Ich habe einen Input-Stream von einem RS-232-Port und eine Warteschlange von Befehlen an den seriellen Port mit Rx-Streams. Ich habe meinen Code vereinfacht wie folgt:Anfrage/Antwort mit Streams
void Init()
{
SerialPort srl;
... // open serial port
IObservable<string> obInput =
Observable.FromEventPattern<
SerialDataReceivedEventHandler,
SerialDataReceivedEventArgs>
(
handler => srl.DataReceived += handler,
handler => srl.DataReceived -= handler
).SelectMany(_ =>
{
List<string> ret;
... //extract messages
return ret;
}).Publish().Refcount();
obCommandOk =
obInput
.Where(msg => msg == "OK" || msg == "KO");
var sAction = new Subject<string>();
var sCommandOk = new Subject<Tuple<string,bool>>();
sAction
.Do(srl.WriteLine)
.Zip(obCommandOk, (cmd, result) =>
{
if (result == "OK")
sCommandOk.OnNext(Tuple.Create(cmd, true))
else
sCommandOk.OnNext(Tuple.Create(cmd, false))
});
}
async bool Command(string cmd)
{
sAction.OnNext(cmd);
return
await sCommandOk
.Where(t => t.Item1 == cmd)
.Select(t => t.Item2)
.FirstAsync();
}
kommt vor, dass nach OnNext das Ergebnis bereits nach sCommandOk gedrückt wurde und so habe ich es zu verlieren.
Können Sie mir einen besseren Ansatz vorschlagen, um den Verlust von Antworten zu vermeiden?
Können wir bieten wollen http://stackoverflow.com/hilfe/mcve bitte? Sicher, ich bekomme es ist Pseudo-Code, aber es ist etwa 5 Änderungen weg von echten C#, die kompiliert. –
@LeeCampbell es ist ziemlich schwierig zu extrapolieren, was Sie von meinem Code anfordern, aber ich habe etwas geschrieben, das hier kompiliert https://gist.github.com/zpul/b7a59e559e523a8a3b1077b1ec8013ef (zu groß für SO) – zpul
Sie sollten wirklich nicht verwenden ein externer veränderbarer Puffer von 'SelectMany', das ist wirklich eine wirklich schlechte Idee. Sie können absolut nichts über den Inhalt garantieren. – TheInnerLight