2016-04-30 14 views
5

Meine Anwendung verfügt über eine Akka-Websocket-Schnittstelle. Der Web-Socket besteht aus einem Actor-Subscriber und einem Actor-Publisher. Der Abonnent behandelt Befehle, indem er sie an den entsprechenden Akteur sendet. Der Publisher hört den Event-Stream ab und veröffentlicht Update-Informationen zurück zum Stream (und schließlich zum Client). Das funktioniert gut.So senden Sie eine Nachricht in einem reaktiven Stream von einem Abonnenten an einen Publisher in einer Web-Socket-Verbindung

Meine Frage: Wie ist es für den Abonnenten möglich, ein Ereignis zurück an den Stream zu senden? Zum Beispiel, um die Ausführung eines empfangenen Befehls zu bestätigen.

public class WebSocketApp extends HttpApp { 

    private static final Gson gson = new Gson(); 

    @Override 
    public Route createRoute() { 
    return get(
     path("metrics").route(handleWebSocketMessages(metrics())) 
     ); 
    } 

    private Flow<Message, Message, ?> metrics() { 
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props()); 
    Source<Message, ActorRef> metricsSource = 
     Source.actorPublisher(WebSocketDataPublisherActor.props()) 
     .map((measurementData) -> TextMessage.create(gson.toJson(measurementData))); 
    return Flow.fromSinkAndSource(metricsSink, metricsSource); 
    } 
} 

Eine schöne Lösung könnte sein, dass der zeichn Schauspieler (die WebSocketCommandSubscriber Schauspieler im Code oben) eine Nachricht zurück an den Strom wie sender().tell(...) ... No

Antwort

4

schicken konnte, es nicht möglich ist, zumindest nicht direkt. Streams sind immer unidirektional - alle Nachrichten fließen in eine Richtung, während die Nachfrage in die entgegengesetzte Richtung fließt. Sie müssen Ihre Bestätigungsnachrichten von der Senke an die Quelle weiterleiten, damit die letztere sie an den Client zurücksendet, z. B. indem der Quellaktor im Sink-Actor registriert wird. Es könnte wie folgt aussehen:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> { 
    sinkActor.tell(new RegisterSource(sourceActor), null); 
}) 

Dann, nach dem Waschbecken Schauspieler RegisterSource Nachricht empfängt, kann er die Bestätigungsmeldungen an den angegebenen ActorRef, die sie in den Ausgabestrom weiterleiten werden dann senden.