In der aktuellen Akka-Dokumentation gibt es eine nice example of creating a client server architecture. Ich erstelle einen Akka-Actor, der Nachrichten auf dem Bitcoin-Protokoll senden und empfangen kann. Bisher konnte ich Nachrichten senden & erhalten Antworten auf die Nachricht, die ich gesendet habe, aber ich war nicht in der Lage, unerwünschte Nachrichten wie erforderlich auf dem Peer-to-Peer-Protokoll zu empfangen.Akka Tcp erstellen Peer-to-Peer-Architektur anstelle von Client-Server
Ich habe versucht, Tcp.Bind
und Tcp.Connect
zu verwenden, um die Lage sein, unerwünschte Nachrichten auf Port 18333
pfeifen hören auch zum Senden von Nachrichten in der Lage zu einem Peer im Netzwerk. Ich stehe jedoch auf dieses Problem, wo es sagen wird, dass der Port bereits gebunden ist (durch das Ereignis Tcp.Connect
) oder es wird nicht in der Lage sein, Nachrichten von diesem Port zu senden (aufgrund des Ereignisses Tcp.Bind
).
Wie kann ich Nachrichten senden und unerwünschte Nachrichten auf dem gleichen Port empfangen? Fehle ich hier etwas?
sealed trait Client extends Actor with BitcoinSLogger {
/**
* The address of the peer we are attempting to connect to
* on the p2p network
* @return
*/
def remote: InetSocketAddress
/**
* The actor that is listening to all communications between the
* client and its peer on the network
* @return
*/
def listener : ActorRef
def actorSystem : ActorSystem
/**
* The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
* and instantiates workers for specific tasks, such as listening to incoming connections.
*/
def manager : ActorRef = IO(Tcp)(actorSystem)
/**
* This actor signifies the node we are connected to on the p2p network
* This is set when we received a [[Tcp.Connected]] message
*/
private var peer : Option[ActorRef] = None
def receive = {
case message : Tcp.Message => message match {
case event : Tcp.Event =>
logger.debug("Event: " + event)
handleEvent(event)
case command : Tcp.Command =>
logger.debug("Command: " + command)
handleCommand(command)
}
case unknownMessage => throw new IllegalArgumentException("Unknown message for client: " + unknownMessage)
}
/**
* This function is responsible for handling a [[Tcp.Event]] algebraic data type
* @param event
*/
private def handleEvent(event : Tcp.Event) = event match {
case Tcp.Bound(localAddress) =>
logger.debug("Actor is now bound to the local address: " + localAddress)
case Tcp.CommandFailed(w: Tcp.Write) =>
logger.debug("Client write command failed: " + Tcp.CommandFailed(w))
logger.debug("O/S buffer was full")
// O/S buffer was full
//listener ! "write failed"
case Tcp.CommandFailed(command) =>
logger.debug("Client Command failed:" + command)
case Tcp.Received(data) =>
logger.debug("Received data from our peer on the network: " + BitcoinSUtil.encodeHex(data.toArray))
//listener ! data
case Tcp.Connected(remote, local) =>
logger.debug("Tcp connection to: " + remote)
logger.debug("Local: " + local)
peer = Some(sender)
peer.get ! Tcp.Register(listener)
listener ! Tcp.Connected(remote,local)
case Tcp.ConfirmedClosed =>
logger.debug("Client received confirmed closed msg: " + Tcp.ConfirmedClosed)
peer = None
context stop self
}
/**
* This function is responsible for handling a [[Tcp.Command]] algebraic data type
* @param command
*/
private def handleCommand(command : Tcp.Command) = command match {
case Tcp.ConfirmedClose =>
logger.debug("Client received connection closed msg: " + Tcp.ConfirmedClose)
listener ! Tcp.ConfirmedClose
peer.get ! Tcp.ConfirmedClose
}
}
case class ClientImpl(remote: InetSocketAddress, network : NetworkParameters,
listener: ActorRef, actorSystem : ActorSystem) extends Client {
manager ! Tcp.Bind(listener, new InetSocketAddress(network.port))
//this eagerly connects the client with our peer on the network as soon
//as the case class is instantiated
manager ! Tcp.Connect(remote)
}
object Client {
def props(remote : InetSocketAddress, network : NetworkParameters, listener : ActorRef, actorSystem : ActorSystem) : Props = {
Props(classOf[ClientImpl], remote, network, listener, actorSystem)
}
def apply(remote : InetSocketAddress, network : NetworkParameters, listener : ActorRef, actorSystem : ActorSystem) : ActorRef = {
actorSystem.actorOf(props(remote, network, listener, actorSystem))
}
def apply(network : NetworkParameters, listener : ActorRef, actorSystem : ActorSystem) : ActorRef = {
//val randomSeed = ((Math.random() * 10) % network.dnsSeeds.size).toInt
val remote = new InetSocketAddress(network.dnsSeeds(0), network.port)
Client(remote, network, listener, actorSystem)
}
EDIT: Hinzufügen von Testfall, meine Schauspieler verwendet
"Client" must "connect to a node on the bitcoin network, " +
"send a version message to a peer on the network and receive a version message back, then close that connection" in {
val probe = TestProbe()
val client = Client(TestNet3, probe.ref, system)
val conn : Tcp.Connected = probe.expectMsgType[Tcp.Connected]
val versionMessage = VersionMessage(TestNet3, conn.localAddress.getAddress,conn.remoteAddress.getAddress)
val networkMessage = NetworkMessage(TestNet3, versionMessage)
client ! networkMessage
val receivedMsg = probe.expectMsgType[Tcp.Received](5.seconds)
//~~~~~~~~THIS IS WHERE THE TEST IS FAILING~~~~~~~~~~~~~~~~~~
//the bitcoin protocol states that after exchanging version messages a verack message is sent if the version message is accepted
//this is appearing on wireshark, but not being found by my actor
val verackMessage = probe.expectMsgType[Tcp.Received](2.seconds)
}
EDIT2:
Ausgang Wireshark zeigt, dass ich diese Nachrichten erhalte, und akka ist sie nicht
Dies hat nicht funktioniert. In meinem Testfall bin ich 'Probe.expectMsgType [Tcp.Received] (2.seconds)' und entsprechend wireshark erhalte ich eine Nachricht von der gleichen IP-Adresse, aber akka registriert nicht –
Ihr Handler für Tcp.Connected registriert a verschiedener Akteur, der auf eingehende Daten wartet, in Ihrem Beispiel an den val Listener gebunden. Gibt es diesen Akteur? Warum verwenden Sie den Selbstdarsteller nicht, damit der Client selbst auf diese Nachrichten reagiert? –
Der 'Listener'-Aktor in diesem Beispiel ist eigentlich eine' TestProbe' in meinem Testfall. Ich werde den Testfall veröffentlichen, damit Sie sehen können –