Wie können WebSocket-Streams mit RxJS modelliert werden?Modellierung von WebSocket-Streams mit RxJS
Die offensichtlichen Dinge, die ich sehe, sind Ströme von Sockets, die Ströme von Nachrichten ausstrahlen.
Wenn ich einen Stream von Sockets erzeuge, wie kann ich Streams ihrer Nachrichten erstellen und trotzdem bewahren, wer diese Nachrichten gesendet hat?
Der Socket-Stream war mein erster Schritt:
const socket$ = Observable.create(({complete, next}) => {
const server = new WebSocketServer({server: someHttpServer})
server.on('connection', next)
return() => {
server.close()
complete()
}
})
Aber der Nachrichtenstrom ein bisschen schwieriger ist, weil ich die Steckdosen muß ich aus den Nachrichten bekam.
Das war meine Faust naiver Versuch auf der Modellierung:
const message$ = socket$.flatMap(socket => Observable.create(({complete, next}) => {
socket.on('message', next)
socket.on('close', complete)
return() => socket.close()
})).share()
A beobachtbar, die alle Socket-Nachrichten von allen Steckdosen Ströme. Aber wenn ich es abonniere, habe ich keine Sockets mehr, was dieses unidirektional macht.
Ich möchte
socket$ -> message$ -> server-processing -> socket$
Aber es gibt mehrere Anwendungsfälle für die Antworten, Broadcast-, Multicast- und Unicast.
Vielen Dank, aber dies scheint für Kunden zu sein. Ich habe mehr Details zu meiner Frage hinzugefügt :) –
Diese Bibliothek ist jetzt veraltet. Es wurde direkt in RxJS gefaltet: https://github.com/ReactiveX/rxjs/tree/master/src/internal/observable/dom – Sawtaytoes