2016-05-22 5 views
4

Wie können WebSocket-Streams mit RxJS modelliert werden?Modellierung von WebSocket-Streams mit RxJS

Die offensichtlichen Dinge, die ich sehe, sind Ströme von Sockets, die Ströme von Nachrichten ausstrahlen.

Wenn ich einen Stream von Sockets erzeuge, wie kann ich Streams ihrer Nachrichten erstellen und trotzdem bewahren, wer diese Nachrichten gesendet hat?

Der Socket-Stream war mein erster Schritt:

const socket$ = Observable.create(({complete, next}) => { 

    const server = new WebSocketServer({server: someHttpServer}) 

    server.on('connection', next) 

    return() => { 
    server.close() 
    complete() 
    } 

}) 

Aber der Nachrichtenstrom ein bisschen schwieriger ist, weil ich die Steckdosen muß ich aus den Nachrichten bekam.

Das war meine Faust naiver Versuch auf der Modellierung:

const message$ = socket$.flatMap(socket => Observable.create(({complete, next}) => { 

    socket.on('message', next) 
    socket.on('close', complete) 

    return() => socket.close() 

})).share() 

A beobachtbar, die alle Socket-Nachrichten von allen Steckdosen Ströme. Aber wenn ich es abonniere, habe ich keine Sockets mehr, was dieses unidirektional macht.

Ich möchte

socket$ -> message$ -> server-processing -> socket$ 

Aber es gibt mehrere Anwendungsfälle für die Antworten, Broadcast-, Multicast- und Unicast.

Antwort

2

Ich fand heraus, dass flatMap eine zweite Funktion übernimmt, dass die das Argument Wert von flatMap und die (abgeflachte) Rückgabewert von flatMap empfängt. Diese Funktion kann einen neuen Wert zurückgeben, der für alle späteren Operatoren verwendet wird.

const socketMessage$ = socket$.flatMap(

    socket => Observable.create(({complete, next}) => { 

    socket.on('message', next) 
    socket.on('close', complete) 

    return() => socket.disconnect() 

    }), 

    (socket, message) => ({socket, message}) 

).share() 
1

Wissen Sie nicht, ob das ist, was Sie suchen (was meinst du mit still preserve who sent these messages), aber es gibt eine Bibliothek zum Einwickeln von Websockets in Observable. Werfen Sie einen Blick in die Dokumentation hier: https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/doc/operators/fromwebsocket.md

+0

Vielen Dank, aber dies scheint für Kunden zu sein. Ich habe mehr Details zu meiner Frage hinzugefügt :) –

+0

Diese Bibliothek ist jetzt veraltet. Es wurde direkt in RxJS gefaltet: https://github.com/ReactiveX/rxjs/tree/master/src/internal/observable/dom – Sawtaytoes