2013-07-31 8 views
21

Ich habe mit CompletionStage/CompletableFuture in Java 8 gespielt, um eine asynchrone Verarbeitung durchzuführen, was ziemlich gut funktioniert. Manchmal möchte ich jedoch, dass eine Stufe eine asynchrone Verarbeitung eines Iterators/Stroms von Elementen durchführt, und es scheint keine Möglichkeit dafür zu geben.Ist es möglich, Java 8 Streams API für die asynchrone Verarbeitung zu verwenden?

Insbesondere hat Stream.forEach() die Semantik, dass nach dem Aufruf alle Elemente verarbeitet wurden. Ich würde das gleiche will, aber mit einem CompletionStag statt, z.B .:

CompletionStage<Void> done = stream.forEach(...); 
done.thenRun(...); 

Wenn der Strom durch ein asynchrones Streaming Ergebnis gesichert wird dies wäre viel besser, als warten, dass es mich in dem obigen Code auf Vollständigkeit.

Ist es irgendwie möglich, dies mit der aktuellen Java 8 API zu konstruieren? Problemumgehungen?

+0

Hallo! Ich habe meine Antwort aktualisiert. Wenn es dich zufrieden stellen würde, stimme bitte ab Ich versuche, mehr Ansehen zu erlangen;) –

+2

@Rickard Mag es zu spät sein :) aber ich habe etwas gefunden was du willst wenn ich dich richtig verstehe. http://www.reactive-streams.org/. https://github.com/reactor/reactor –

Antwort

20

Soweit ich weiß, unterstützt die Stream-API keine asynchrone Ereignisverarbeitung. Klingt, als ob Sie etwas wie Reactive Extensions für .NET möchten, und es gibt einen Java-Port namens RxJava, der von Netflix erstellt wurde.

RxJava unterstützt viele der gleichen High-Level-Operationen wie Java 8-Streams (z. B. Karte und Filter) und ist asynchron.

aktualisieren: Es gibt jetzt eine reactive streams Initiative in den Arbeiten, und es sieht aus wie JDK 9 wird zumindest ein Teil davon, obwohl die Flow Klasse Support.

+19

Tatsächlich ist RxJava, was Sie wollen. Beim Design Center für Streams handelt es sich hauptsächlich um Daten, auf die ohne Latenz zugegriffen werden kann (entweder von Datenstrukturen oder von Funktionen). Das Design Center für Rx ist über unendliche Ströme von Ereignissen, die asynchron ankommen können. –

7

Als @KarolKrol angedeutet, können Sie es mit einem Stream von CompletableFuture tun.

Es gibt eine Bibliothek, die auf JDK8-Streams aufbaut, um das Arbeiten mit Streams von CompletableFuture namens cyclops-react zu erleichtern.

Um Ihre Streams zu komponieren, können Sie cyclics-reacts flüssiges Versprechen ike API verwenden oder Sie können simple-reacts Stages verwenden.

+0

Ich vergesse diese Tage zu wählen :) –

+0

@ KarolKról statt Lobbying für upvotes, einfach die bestmöglichen Antworten auf weitere Fragen schreiben. –

2

cyclops-react (Ich bin der Autor dieser Bibliothek), bietet eine StreamUtils Klasse für die Verarbeitung von Streams. Eine der Funktionen, die es bietet, ist futureOperations, die Zugriff auf die Standard-Stream-Terminal-Operationen (und dann einige) mit einer Wendung bietet - der Stream wird asynchron ausgeführt und das Ergebnis wird innerhalb einer CompleableFuture zurückgegeben. .eg

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) 
             .map(i->i+2); 
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, 
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

Es gibt auch eine convience Klasse ReactiveSeq, die Stream-wickelt und bietet die gleiche Funktionalität, mit einem schönen fließend API

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) 
             .map(i->i+2) 
             .futureOperations(
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

Wie Adam bemerkt hat cyclops-react FutureStreams ausgelegt sind, Daten asynchron zu verarbeiten (indem Futures und Streams zusammengemischt werden) - es ist besonders geeignet für Multithread-Operationen, bei denen E/A blockiert wird (z. B. Dateien lesen, db-Aufrufe machen, Ruheaufrufe machen usw.).

1

Es ist möglich, einen Stream zu erzeugen, jedes Element auf CompletionStage abzubilden und die Ergebnisse unter Verwendung von CompletionStage.thenCombine() zu sammeln, aber der resultierende Code wird nicht lesbarer sein, als wenn man einfach für so verwendet.

Dieses Beispiel kann leicht in funktional forEach umgewandelt werden, ohne die Lesbarkeit zu verlieren. Aber die Verwendung von Streams reduce oder collect erfordert zusätzlichen, nicht so feinen Code.

Update: CompletableFuture.allOf und CompletableFuture.join bieten eine andere, lesbarere Möglichkeit der Transformation der Sammlung von zukünftigen Ergebnissen zu zukünftigen Sammlung von Ergebnissen.