2016-06-28 10 views
3

mit dem folgenden Code:CombineLatest und kalt Observablen

 Observable<String> obs1 = Observable.just("1", "2", "3"); 
     Observable<String> obs2 = Observable.just("a", "b"); 
     Observable.combineLatest(obs1, 
           obs2, 
           (s1, s2) -> s1 + ":" + s2 
     ).subscribe(System.out::println); 

ich erwarten würde es so etwas wie die folgenden drucken:

1:a 
2:a 
3:a 
3:b 

Aber es druckt

3:a 
3:b 

Warum ist erste Observable nur die letzte Position emittieren? Vor combinateLatest gibt es keine aktiven Abonnements, daher sollte es kalt sein. Wie kann ich sicherstellen, dass alle Objekte aus beiden Observablen miteinander kombiniert sind?

Antwort

3

Dies geschieht, weil Sie synchrone Quellen haben. Wenn combineLatest seine Quellen abonniert, ruft es standardmäßig 128 Elemente von ihnen ab. Dies bedeutet, dass Ihre erste Quelle synchron zur Fertigstellung ausgeführt wird und das zweite Abonnement nicht einmal bis zu diesem Abschluss stattfindet. Da die erste Quelle alleine ist, werden alle außer dem allerletzten Element gelöscht. Sobald die zweite Quelle abonniert ist, findet sie die allerletzte von der ersten und kombiniert sich damit.

Was haben Sie versucht zu erreichen? Wenn Sie paarweise kombinieren möchten, verwenden Sie zip. Wenn Sie alle Kombinationen von ersten und zweiten möchten, verwenden Sie flatMap:

obs1.flatMap(v -> obs2.map(w -> v + ":" + w)).subscribe(...) 
+0

Naja, eigentlich mein Code funktioniert, weil meine Quelle ist nicht synchron, ich auf diese stolperte, wenn sie versuchen zu schreiben ein Test dafür.
Was ich versuche zu erreichen, ist folgendes:

    * Abfrage Mongodb, um eine Observable mit einem Strom von bestehenden IDs zu bekommen. * Abfrage AWS s3, um eine Observable mit einem Strom von Dateinamen zu erhalten * für jede s3-Datei, überprüfen Sie, ob der Dateiname in Mongo existiert. Wenn nicht, entferne die Datei von s3.
grub

+0

@akarnokd: mit deiner flatMap wird 'obs2' nur abonnieren, nachdem der erste Wert von' obs1' ausgegeben wurde. Bei 'combineLatest' ist das Abonnement sequenziell. Gibt es einen Weg, "obs2" direkt nach dem "obs1" -Abonnement zu abonnieren? – dwursteisen

0

Vielen Dank für Ihre Antwort, akarnokd. Der Prefetch von 128 Elementen ist ein sehr nützlicher Einblick. Ich habe mein Problem durch Umschalten der beiden Observablen gelöst, so

Observable<String> obs1 = Observable.just("1", "2", "3", "4"); 
Observable<String> obs2 = Observable.just("4", "b", "1"); 
Observable.combineLatest(obs1, 
         obs2.toList(), 
         (s1, list) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1) 
      .filter(StringUtils::isNotEmpty) 
      .subscribe(System.out::println); 

arbeitet nun wie folgt:

Observable<String> obs1 = Observable.just("1", "2", "3", "4"); 
Observable<String> obs2 = Observable.just("4", "b", "1"); 
Observable.combineLatest(obs2.toList(), 
         obs1, 
         (list, s1) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1) 
      .filter(StringUtils::isNotEmpty) 
      .subscribe(System.out::println);