2015-01-15 12 views
5

Ich möchte eine Client-Anwendung implementieren, die zunächst eine Anfrage an den Server senden dann für ihre Antwort warten (ähnlich http)scalaz-Stream, wie `ask-dann-wait-reply` zu implementieren TCP-Client

Mein Klient Prozess kann sein

val topic = async.topic[ByteVector] 
val client = topic.subscribe 

Hier wird die api ist

trait Client { 
    val incoming = tcp.connect(...)(client) 
    val reqBus = topic.pubsh() 
    def ask(req: ByteVector): Task[Throwable \/ ByteVector] = { 
     (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus) 
     ??? 
    } 
} 

Dann, wie das bleiben Teil ask zu implementieren?

Antwort

6

Normalerweise ist die Implementierung mit der Veröffentlichung der Nachricht über Senke und dann warten auf irgendeine Art von Antwort auf einige Quelle, wie Ihr Thema.

Eigentlich haben wir eine Menge von Idiomen dies in unserem Code:

def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = { 
merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf) 
} 

Im Wesentlichen diese erste Haken Strom antworten jede resultierende O erwarten unsere Anfrage gesendet bestätigt. Dann veröffentlichen wir die Nachricht I und konsultieren pf für alle eingehenden O, um schließlich in O2 übersetzt werden und dann zu beenden.

+0

ist 'TSource [O2]' das gleiche wie 'Process1 [O, O2]'? – ahjohannessen

+0

Eigentlich nicht, es ist Typ Alias ​​aus unserer Code-Basis für 'Prozess [Task, O2]', Entschuldigung für Verwirrung. –

+2

Ah, danke @pavel :) Es wäre schön einige weitere Beispiele zu sehen, wie dieses hier in der freien Wildbahn verwendet wird :) Das Hinzufügen von Beispielen für gebräuchliche Muster zum scalaz-stream Wiki würde uns helfen, die in der Bibliothek neu sind :) – ahjohannessen