Ich habe einen Anwendungsfall, wo ich Nachrichten konsumieren, speichern sie und dann Erfolg oder fehlschlagen. Der Mongo-Insert gibt ein Observable zurück, sodass ich eine Chainmap verwenden kann. Das Problem ist, dass die Einfügung Observable das Ergebnis der Einfügung ausgibt, aber ich brauche die ursprüngliche Nachricht, die von der ersten Observablen ausgegeben wird, um darauf zu antworten. Um dies zu erreichen, führe ich die Einfügung innerhalb des Abonnements des ersten Observablen und antworte innerhalb des zweiten Abonnements.Chaining Observable & Emitting/Passing Original Emit zu abonnieren Call
Ich hatte gehofft, dies auf eine reaktivere Weise mit einer Art Operator wie Flatmap zu erreichen. Ich suchte die Liste der Operatoren ab, und nichts kommt auf, wonach ich suche.
eb.consumer("persister.save.event").toObservable()
.subscribe(msg -> {
mongo.insertObservable("event", (JsonObject) msg.body())
.subscribe(
res -> msg.reply(new JsonObject().put("success", true)),
error -> msg.fail(500, "failed to save event"));
});
Ist der obige Code die Art und Weise, wie es getan werden sollte oder gibt es einen besseren Ansatz? Die beiden Abonnenten fühlen sich nicht richtig.
Ah, perfekt! Vielen Dank. – zylum