2015-05-13 6 views
95

Sind Java 8-Streams RxJava-Observablen ähnlich?Unterschied zwischen Java 8-Streams und RxJava-Observablen

Java 8 Stream Definition:

Klassen in dem neuen Paket java.util.stream bietet einen Stream-API zu Unterstützung funktionellen Stil Operationen auf Ströme von Elementen.

+8

FYI gibt es Vorschläge, um mehr RxJava wie Klassen in JDK 9. http vorstellen: // jsr166 -concurrency.10961.n7.nabble.com/jdk9-Candidate-classes-Flow-and-SubmissionPublisher-td11967.html –

+0

@JohnVint Was ist der Status dieses Vorschlags? Wird es tatsächlich fliegen? –

+2

@IgorGanapolsky Oh ja, es sieht definitiv so aus, als würde es in jdk9 landen. http://cr.openjdk.java.net/~martin/webrevs/openjdk9/jsr166-jdk9-integration/. Es gibt sogar einen Port für RxJava zum Fließen https://github.com/akarnokd/RxJavaUtilConcurrentFlow. –

Antwort

93

TL; DR: Alle Sequenz/Stream-Processing-Bibliotheken bieten eine sehr ähnliche API für den Pipelinebau. Die Unterschiede bestehen in der API für die Handhabung von Multi-Threading und die Zusammensetzung von Pipelines.

RxJava ist ganz anders als Stream. Von allen JDK-Dingen ist das am nächsten zu rx.Observable vielleicht java.util.stream.Collector Stream + CompleableFuture-Combo (was zu einem Preis des Umgangs mit einer zusätzlichen Monad-Schicht führt, d.h. die Umwandlung zwischen Stream<CompletableFuture<T>> und CompletableFuture<Stream<T>> bewältigen muss).

Es gibt erhebliche Unterschiede zwischen beobachtbaren und Stream:

  • Streams sind Pull-basierte, Observable sind Push-basiert. Das hört sich vielleicht zu abstrakt an, hat aber erhebliche Konsequenzen, die sehr konkret sind.
  • -Stream kann nur einmal verwendet werden kann beobachtbar oft
  • Stream#parallel() Splits Sequenz in Partitionen abonniert werden, Observable#subscribeOn() und Observable#observeOn() nicht; Es ist schwierig, Stream#parallel() Verhalten mit Observable zu emulieren, es hatte einmal .parallel() Methode, aber diese Methode verursacht so viel Verwirrung, dass .parallel() Unterstützung wurde auf separaten Repository auf Github, RxJavaParallel verschoben. Weitere Details sind in another answer.
  • Stream#parallel() lässt nicht zu, einen Threadpool zu verwenden, anders als die meisten RxJava-Methoden, die optionalen Scheduler akzeptieren. Da alle Stream-Instanzen in einer JVM verwenden die gleiche fork-join Pool, das Hinzufügen .parallel() kann versehentlich das Verhalten in einem anderen Modul des Programms beeinflussen
  • Streams fehlen zeitbezogene Operationen wie Observable#interval(), Observable#window() und viele andere; Dies liegt vor allem daran, dass die Streams Pull-basiert sind
  • Streams bieten eingeschränkte Reihe von Operationen im Vergleich zu RxJava. Z.B. Die Ströme haben keine Abschneideoperationen (takeWhile(), takeUntil()); Abhilfe mit Stream#anyMatch() begrenzt: es ist Terminalbetrieb, so dass Sie nicht mehr als einmal pro Stream 8
  • Ab JDK verwenden können, gibt es keinen Strom # Zip-Operation, die manchmal ganz nützlich ist
  • Streams schwer um selbst zu konstruieren, Observable kann durch viele Möglichkeiten konstruiert werdenEDIT: Wie in Kommentaren erwähnt, gibt es Möglichkeiten, um Stream zu konstruieren. Da es jedoch keinen nicht-terminalen Kurzschluss gibt, können Sie nicht e. G. leicht generieren Stream von Zeilen in Datei (JDK bietet Dateien # Zeilen und BufferedReader # Linien out of the Box und andere ähnliche Szenarien können verwaltet werden, indem Stream aus Iterator).
  • Observable bietet Ressourcen-Management-Einrichtung (Observable#using()); Sie können IO-Stream oder Mutex damit umhüllen und sicherstellen, dass der Benutzer nicht vergessen wird, die Ressource freizugeben - er wird automatisch bei Beendigung des Abonnements entsorgt; Stream hat onClose(Runnable) Methode, aber Sie müssen es manuell oder über Try-mit-Ressourcen aufrufen. Z.B. Sie müssen daran denken, dass Files # lines() in try-with-resources blockieren müssen.
  • Observables sind vollständig synchronisiert (ich habe nicht wirklich überprüft, ob das gleiche für Streams gilt). Dies erspart Ihnen das Denken, ob grundlegende Operationen threadsicher sind (die Antwort ist immer "Ja", es sei denn, es gibt einen Fehler), aber der Nebenläufigkeits-Overhead wird da sein, egal ob Ihr Code es benötigt oder nicht.

Zusammenfassung: RxJava unterscheidet sich erheblich von Streams. Echte RxJava-Alternativen sind andere Implementierungen von ReactiveStreams, z. G. relevanter Teil von Akka.

Aktualisieren. Es gibt Trick zu verwenden Nicht-Standard-fork-join Pool für Stream#parallel finden Custom thread pool in Java 8 parallel stream

aktualisieren. All dies basiert auf den Erfahrungen mit RxJava 1.x. Jetzt, dass RxJava 2.x is here, diese Antwort möglicherweise nicht mehr aktuell ist.

+2

Warum sind Streams schwer zu konstruieren? Laut diesem Artikel scheint es einfach zu sein: http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html –

+1

Es gibt eine ganze Reihe von Klassen, die 'stream' haben "Methode: Sammlungen, Eingabe-Streams, Verzeichnis-Dateien, etc. Aber was ist, wenn Sie einen Stream aus einer benutzerdefinierten Schleife erstellen möchten - sagen wir, über Datenbank-Cursor iterieren? Der beste Weg, den ich bisher gefunden habe, ist einen Iterator zu erstellen, ihn mit Spliterator zu umhüllen und schließlich StreamSupport # fromSpliterator aufzurufen. Zu viel Kleber für einen einfachen Fall IMHO. Es gibt auch Stream.iterate, aber es erzeugt unendlichen Stream. Die einzige Möglichkeit, sream in diesem Fall zu deaktivieren, ist Stream # anyMatch, aber es handelt sich um eine Terminaloperation. Daher können Stream Producer und Consumer nicht getrennt werden. –

+1

RxJava hat Observable.fromCallable, Observable.create und so weiter. Oder du kannst unbedenklich Observable erzeugen, dann sag '. TakeWhile (condition)', und du bist in Ordnung, diese Sequenz an die Kunden zu senden. –

44

Java 8 Stream und RxJava sieht sehr ähnlich aus. Sie haben ähnliche Operatoren (Filter, Map, FlatMap ...), sind aber nicht für die gleiche Verwendung gebaut.

Mit RxJava können Sie asynchonus-Aufgaben ausführen.

Mit Java 8 streamen Sie Elemente Ihrer Sammlung.

Sie können so ziemlich dasselbe in RxJava (traverse Elemente einer Sammlung) tun, aber, wie RxJava auf gleichzeitige Aufgabe konzentriert ist, ..., verwenden Sie Synchronisation, Latch, ... Also die gleiche Aufgabe mit RxJava möglicherweise langsamer als mit Java 8-Stream.

RxJava kann mit CompletableFuture verglichen werden, aber das kann in der Lage sein, mehr als nur einen Wert zu berechnen.

+12

Es ist erwähnenswert, dass die Aussage über Stream Traversal nur für einen nicht parallelen Stream gilt. 'parallelStream' unterstützt ähnliche Synchronisation von einfachen Traversalen/Karten/Filterung etc .. –

+1

Ich denke nicht "Also die gleiche Aufgabe mit RxJava kann langsamer sein als mit Java 8 Stream." gilt universell, es hängt stark von der jeweiligen Aufgabe ab. – daschl

+1

Ich bin froh, dass Sie gesagt haben ** gleiche Aufgabe mit RxJava möglicherweise langsamer als mit Java 8 Stream **. Dies ist ein sehr wichtiger Unterschied, den viele RxJava-Benutzer nicht kennen. –

35

Es gibt einige technische und konzeptionelle Unterschiede, zum Beispiel Java 8-Streams sind single-use, Pull-basierte, synchrone Folgen von Werten, während RxJava Observables sind wiederbeobachtbare, adaptive Push-Pull-basierte, möglicherweise asynchrone Folgen von Werten. RxJava ist auf Java 6+ ausgerichtet und funktioniert auch auf Android.

+4

Typischer Code, der RxJava mit einbezieht, benutzt stark lambdas, die nur von Java 8 an verfügbar sind. So können Sie Rx mit Java 6 verwenden, aber der Code wird laut sein. –

+1

Eine ähnliche Unterscheidung ist, dass Rx Observables unbegrenzt am Leben bleiben können, bis sie abgemeldet werden. Java 8-Streams werden standardmäßig mit Operationen beendet. –

+2

@KirillGamazkov können Sie [retroLambda] (https://github.com/orfjackal/retrolambda) verwenden, um Ihren Code hübscher zu machen, wenn Sie auf Java 6 zielen. –

27

Java 8 Streams sind Pull-basiert. Sie durchlaufen einen Java 8-Stream, der jedes Element verbraucht. Und es könnte ein endloser Strom sein.

RXJava Observable ist standardmäßig Push-basiert. Sie abonnieren ein Observable und Sie werden benachrichtigt, wenn das nächste Element eintrifft (onNext) oder wenn der Stream abgeschlossen ist (onCompleted) oder wenn ein Fehler aufgetreten ist (onError). Denn mit Observable Sie erhalten onNext, onCompleted, onError Ereignisse, können Sie wie die Kombination verschiedener Observable s auf einen neuen einige leistungsstarke Funktionen tun (zip, merge, concat). Andere Dinge, die Sie tun könnten, ist Caching, Throttling, ... Und es verwendet mehr oder weniger die gleiche API in verschiedenen Sprachen (RxJava, RX in C#, RxJS, ...)

Standardmäßig RxJava ist single-threaded. Wenn Sie keine Scheduler verwenden, geschieht alles auf demselben Thread.

+0

in Stream Sie haben forEach, das ist ziemlich viel das gleiche als onNext – paul

+0

Eigentlich sind Streams in der Regel terminal. "Operationen, die eine Stream-Pipeline schließen, werden Terminal-Operationen genannt. Sie erzeugen ein Ergebnis aus einer Pipeline wie einer Liste, einer Ganzzahl oder sogar void (beliebiger Nicht-Stream-Typ)." ~ http: //www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html –

3

RxJava ist auch eng mit der reactive streams initiative verbunden und betrachtet es als eine einfache Implementierung der reaktiven Ströme API (z. B. im Vergleich zu der Akka streams implementation). Der Hauptunterschied besteht darin, dass die reaktiven Ströme so ausgelegt sind, dass sie den Gegendruck beherrschen, aber wenn Sie sich die Seite mit den reaktiven Datenströmen ansehen, werden Sie auf die Idee kommen. Sie beschreiben ihre Ziele ziemlich gut und die Ströme sind auch eng verwandt mit the reactive manifesto.

Die Java 8-Streams sind so ziemlich die Implementierung einer unbegrenzten Sammlung, ziemlich ähnlich der Scala Stream oder der Clojure lazy seq.

1

Java 8-Streams ermöglichen die effiziente Verarbeitung sehr großer Sammlungen bei gleichzeitiger Nutzung von Multicore-Architekturen. Im Gegensatz dazu ist RxJava standardmäßig single-threaded (ohne Scheduler). Daher wird RxJava Multi-Core-Rechner nicht nutzen, wenn Sie diese Logik nicht selbst programmieren.

+3

Stream ist standardmäßig auch single-threaded, es sei denn, Sie rufen .parallel() auf. Außerdem gibt Rx mehr Kontrolle über Nebenläufigkeit. –

9

Die vorhandenen Antworten sind umfassend und korrekt, aber ein klares Beispiel für Anfänger fehlt. Gestatten Sie mir, einige Begriffe wie "push/pull-based" und "re-observable" zu hinterfragen. Hinweis: Ich hasse den Begriff Observable (es ist ein Strom um Himmels Willen), so wird einfach auf J8 vs RX-Streams beziehen.

Betrachten wir eine Liste, wenn ganze Zahlen,

digits = [1,2,3,4,5] 

Ein J8-Stream ist ein Dienstprogramm, um die Sammlung zu ändern. Zum Beispiel können auch Ziffern extrahiert werden,

evens = digits.stream().filter(x -> x%2) 

Diese im Grunde das ist Python map, filter, reduce, ein sehr schönes (und längst überfälligen) zusätzlich zu Java. Was aber, wenn die Ziffern nicht vorher gespeichert werden? Kann man die geraden Ziffern noch filtern?

Stellen ein separater Thread Prozess ganze Zahlen zufällig Ausgeben (--- bezeichnet die Zeit)

digits = 12345---6------7--8--9-10--------11--12 

In RX, even auf jede neue digit reagieren kann und die Filter in Echtzeit

even = -2-4-5---6---------8----10------------12 
gilt

Es ist nicht erforderlich, Eingabe- und Ausgabelisten zu speichern. Wenn Sie eine Ausgabeliste wollen, kein Problem, das auch streamable ist. In der Tat, everything is a stream.

evens_stored = even.collect() 

Aus diesem Grund Begriffe wie "stateless" und "funktional" sind mehr mit RX