2016-06-14 5 views
2

In der aktuellen Akka-Dokumentation gibt es eine nice example of creating a client server architecture. Ich erstelle einen Akka-Actor, der Nachrichten auf dem Bitcoin-Protokoll senden und empfangen kann. Bisher konnte ich Nachrichten senden & erhalten Antworten auf die Nachricht, die ich gesendet habe, aber ich war nicht in der Lage, unerwünschte Nachrichten wie erforderlich auf dem Peer-to-Peer-Protokoll zu empfangen.Akka Tcp erstellen Peer-to-Peer-Architektur anstelle von Client-Server

Ich habe versucht, Tcp.Bind und Tcp.Connect zu verwenden, um die Lage sein, unerwünschte Nachrichten auf Port 18333 pfeifen hören auch zum Senden von Nachrichten in der Lage zu einem Peer im Netzwerk. Ich stehe jedoch auf dieses Problem, wo es sagen wird, dass der Port bereits gebunden ist (durch das Ereignis Tcp.Connect) oder es wird nicht in der Lage sein, Nachrichten von diesem Port zu senden (aufgrund des Ereignisses Tcp.Bind).

Wie kann ich Nachrichten senden und unerwünschte Nachrichten auf dem gleichen Port empfangen? Fehle ich hier etwas?

sealed trait Client extends Actor with BitcoinSLogger { 

    /** 
    * The address of the peer we are attempting to connect to 
    * on the p2p network 
    * @return 
    */ 
    def remote: InetSocketAddress 

    /** 
    * The actor that is listening to all communications between the 
    * client and its peer on the network 
    * @return 
    */ 
    def listener : ActorRef 


    def actorSystem : ActorSystem 
    /** 
    * The manager is an actor that handles the underlying low level I/O resources (selectors, channels) 
    * and instantiates workers for specific tasks, such as listening to incoming connections. 
    */ 
    def manager : ActorRef = IO(Tcp)(actorSystem) 


    /** 
    * This actor signifies the node we are connected to on the p2p network 
    * This is set when we received a [[Tcp.Connected]] message 
    */ 
    private var peer : Option[ActorRef] = None 

    def receive = { 
    case message : Tcp.Message => message match { 
     case event : Tcp.Event => 
     logger.debug("Event: " + event) 
     handleEvent(event) 
     case command : Tcp.Command => 
     logger.debug("Command: " + command) 
     handleCommand(command) 
    } 
    case unknownMessage => throw new IllegalArgumentException("Unknown message for client: " + unknownMessage) 
    } 

    /** 
    * This function is responsible for handling a [[Tcp.Event]] algebraic data type 
    * @param event 
    */ 
    private def handleEvent(event : Tcp.Event) = event match { 
    case Tcp.Bound(localAddress) => 
     logger.debug("Actor is now bound to the local address: " + localAddress) 
    case Tcp.CommandFailed(w: Tcp.Write) => 
     logger.debug("Client write command failed: " + Tcp.CommandFailed(w)) 
     logger.debug("O/S buffer was full") 
     // O/S buffer was full 
     //listener ! "write failed" 
    case Tcp.CommandFailed(command) => 
     logger.debug("Client Command failed:" + command) 
    case Tcp.Received(data) => 
     logger.debug("Received data from our peer on the network: " + BitcoinSUtil.encodeHex(data.toArray)) 
     //listener ! data 
    case Tcp.Connected(remote, local) => 
     logger.debug("Tcp connection to: " + remote) 
     logger.debug("Local: " + local) 
     peer = Some(sender) 
     peer.get ! Tcp.Register(listener) 
     listener ! Tcp.Connected(remote,local) 
    case Tcp.ConfirmedClosed => 
     logger.debug("Client received confirmed closed msg: " + Tcp.ConfirmedClosed) 
     peer = None 
     context stop self 
    } 
    /** 
    * This function is responsible for handling a [[Tcp.Command]] algebraic data type 
    * @param command 
    */ 
    private def handleCommand(command : Tcp.Command) = command match { 
    case Tcp.ConfirmedClose => 
     logger.debug("Client received connection closed msg: " + Tcp.ConfirmedClose) 
     listener ! Tcp.ConfirmedClose 
     peer.get ! Tcp.ConfirmedClose 
    } 

} 


case class ClientImpl(remote: InetSocketAddress, network : NetworkParameters, 
         listener: ActorRef, actorSystem : ActorSystem) extends Client { 

    manager ! Tcp.Bind(listener, new InetSocketAddress(network.port)) 
    //this eagerly connects the client with our peer on the network as soon 
    //as the case class is instantiated 
    manager ! Tcp.Connect(remote) 

} 

object Client { 


    def props(remote : InetSocketAddress, network : NetworkParameters, listener : ActorRef, actorSystem : ActorSystem) : Props = { 
    Props(classOf[ClientImpl], remote, network, listener, actorSystem) 
    } 

    def apply(remote : InetSocketAddress, network : NetworkParameters, listener : ActorRef, actorSystem : ActorSystem) : ActorRef = { 
    actorSystem.actorOf(props(remote, network, listener, actorSystem)) 
    } 

    def apply(network : NetworkParameters, listener : ActorRef, actorSystem : ActorSystem) : ActorRef = { 
    //val randomSeed = ((Math.random() * 10) % network.dnsSeeds.size).toInt 
    val remote = new InetSocketAddress(network.dnsSeeds(0), network.port) 
    Client(remote, network, listener, actorSystem) 
    } 

EDIT: Hinzufügen von Testfall, meine Schauspieler verwendet

"Client" must "connect to a node on the bitcoin network, " + 
    "send a version message to a peer on the network and receive a version message back, then close that connection" in { 
    val probe = TestProbe() 
    val client = Client(TestNet3, probe.ref, system) 

    val conn : Tcp.Connected = probe.expectMsgType[Tcp.Connected] 

    val versionMessage = VersionMessage(TestNet3, conn.localAddress.getAddress,conn.remoteAddress.getAddress) 
    val networkMessage = NetworkMessage(TestNet3, versionMessage) 
    client ! networkMessage 
    val receivedMsg = probe.expectMsgType[Tcp.Received](5.seconds) 

    //~~~~~~~~THIS IS WHERE THE TEST IS FAILING~~~~~~~~~~~~~~~~~~ 
    //the bitcoin protocol states that after exchanging version messages a verack message is sent if the version message is accepted 
    //this is appearing on wireshark, but not being found by my actor 
    val verackMessage = probe.expectMsgType[Tcp.Received](2.seconds) 

    } 

EDIT2:

Ausgang Wireshark zeigt, dass ich diese Nachrichten erhalte, und akka ist sie nicht

Registrierung enter image description here

Antwort

1

Die Hauptabstraktion von Akka ist Actors, also Peers in Tcp sind nur Actors, die Sie Nachrichten von UND Nachrichten senden können.

In diesem Fall können Sie die ActorRef Ihres Kollegen erhalten, indem Sie sender() aufrufen, sobald Sie eine Tcp.Connected Nachricht erhalten haben. In Ihrem Code speichern Sie diese Referenz bereits in peer. Es sollte so einfach wie sein, um beliebige Daten zurück zu diesem Peer zu senden.

Da die Verbindung an jeder beliebigen Stelle brechen könnte, erscheinen die docs dies mit Schauspieler Aufsicht zu handhaben:

(Das ist mir

class SimpleClient(connection: ActorRef, remote: InetSocketAddress) 
    extends Actor with ActorLogging { 

    import Tcp._ 

    // sign death pact: this actor terminates when connection breaks 
    context watch connection 
    ... 
} 

aktualisiert viel zu lange gedauert hat, zu realisieren.) Das Problem, das Sie haben, ist, dass Sie nicht explizit mit Message Framing umgehen: dh die Mechanismen der Pufferakkumulation und der Nachrichtenrekonstruktion. Akka TCP reicht Ihnen nur Rohpuffer.Diese Puffer unterbrechen NICHT notwendigerweise die Nachrichtengrenzen oder wissen überhaupt etwas über die Nachrichten der Protokolle höherer Ebene, wie BitCoin, die TCP benutzen.

Wenn Sie den Testfall ausführen, empfängt der Listener eine Tcp.Receive-Nachricht mit 1244 Byte Daten. Davon extrahiert der Komponententest eine NetworkHeader und eine VersionMessage, aber es ist durchaus möglich, dass mehr Nachrichten in diesem Puffer extrahiert und verarbeitet werden, abhängig von den Besonderheiten des Bitcoin-Protokolls, aber dass es nicht behandelt wird. Statt dessen wird der Puffer verworfen, und der Testfall wartet auf einen zweiten Puffer (der möglicherweise nicht ankommen kann) und konstruiert aus diesem Puffer eine weitere Nachricht mit der versteckten Erwartung, dass er zufällig perfekt byte-ausgerichtet ist.

Architektonisch würde ich empfehlen, einen neuen Akteur speziell für den Nachrichtenrahmen zu erstellen. Dieser Akteur würde die rohen Bits empfangen und vervollständigte Nachrichten rekonstruieren, um sie an den Zuhörer zu senden.

+0

Dies hat nicht funktioniert. In meinem Testfall bin ich 'Probe.expectMsgType [Tcp.Received] (2.seconds)' und entsprechend wireshark erhalte ich eine Nachricht von der gleichen IP-Adresse, aber akka registriert nicht –

+0

Ihr Handler für Tcp.Connected registriert a verschiedener Akteur, der auf eingehende Daten wartet, in Ihrem Beispiel an den val Listener gebunden. Gibt es diesen Akteur? Warum verwenden Sie den Selbstdarsteller nicht, damit der Client selbst auf diese Nachrichten reagiert? –

+0

Der 'Listener'-Aktor in diesem Beispiel ist eigentlich eine' TestProbe' in meinem Testfall. Ich werde den Testfall veröffentlichen, damit Sie sehen können –

0

TCP-Sockets haben eine Eigenschaftermöglichen kann hier, die ich Sie glauben entweder

.reuseAddress(true) 

auf Socket-Objekt

oder

here ich ein socket-options Array sehen, die diese Eigenschaft beinhaltet:

socket-options { 
    so-receive-buffer-size = undefined 
    so-send-buffer-size = undefined 
    so-reuse-address = undefined 
    so-traffic-class = undefined 
    tcp-keep-alive = undefined 
    tcp-oob-inline = undefined 
    tcp-no-delay = undefined 
} 

I Ich denke, das ist das, wonach Sie gesucht haben, aber ich habe die Frage vielleicht falsch verstanden.