2014-03-12 5 views
30

Siehe einfaches Beispiel unten, dass die Anzahl des Auftretens jedes Wortes in einer Liste zählt:parallele Ströme, Sammler und Thread-Sicherheit

Stream<String> words = Stream.of("a", "b", "a", "c"); 
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1, 
                 (i, j) -> i + j)); 

Am Ende wordsCount ist {a=2, b=1, c=1}.

Aber mein Strom ist sehr groß, und ich will den Job parallelisieren, so dass ich schreiben:

Map<String, Integer> wordsCount = words.parallel() 
             .collect(toMap(s -> s, s -> 1, 
                 (i, j) -> i + j)); 

aber ich habe bemerkt, dass wordsCount ist ein einfaches HashMap so frage ich mich, wenn ich ausdrücklich für eine Notwendigkeit zu fragen, Concurrent Karte Thread-Sicherheit gewährleisten:

Map<String, Integer> wordsCount = words.parallel() 
             .collect(toConcurrentMap(s -> s, s -> 1, 
                   (i, j) -> i + j)); 

können nicht gleichzeitige Sammler sicher mit einem Parallelstrom verwendet werden oder soll ich nur die gleichzeitigen Versionen verwenden, wenn sie von einem parallelen Strom zu sammeln?

Antwort

32

Können nicht gleichzeitig ablaufende Kollektoren sicher mit einem parallelen Strom verwendet werden oder sollte ich nur die parallelen Versionen beim Sammeln aus einem parallelen Strom verwenden?

Es ist sicher, einen nicht-gleichzeitigen Kollektor in einem collect Betrieb eines parallelen Stroms zu verwenden.

Im specification der Collector Schnittstelle, in dem Abschnitt mit einem halben Dutzend Aufzählungspunkten ist dies:

Für nicht gleichzeitige Kollektoren kehrte jedes Ergebnis von den Ergebnissen Lieferanten, Akkumulator oder Kombinierer-Funktionen muss seriell eingegrenzt sein. Dadurch kann die Erfassung parallel erfolgen, ohne dass der Collector eine zusätzliche Synchronisierung implementieren muss. Die Reduzierungsimplementierung muss verwalten, dass die Eingabe ordnungsgemäß partitioniert ist, dass Partitionen isoliert verarbeitet werden und das Kombinieren erst dann erfolgt, wenn die Akkumulation abgeschlossen ist. Diese

bedeutet, dass die verschiedenen Implementierungen von der Collectors-Klasse können mit parallelen Strömen verwendet werden, obwohl einige dieser Implementierungen nicht gleichzeitig Sammler sein könnten. Dies gilt auch für Ihre eigenen nicht gleichzeitig ablaufenden Kollektoren, die Sie möglicherweise implementieren. Sie können sicher mit parallelen Streams verwendet werden, vorausgesetzt, Ihre Collectors stören nicht die Stream-Quelle, sind frei von Nebenwirkungen, unabhängig von der Reihenfolge usw.

Ich empfehle auch den Abschnitt Mutable Reduction von java.util.stream zu lesen Paketdokumentation. In der Mitte dieses Abschnitts befindet sich ein Beispiel, das als parallelisierbar bezeichnet wird, das jedoch Ergebnisse in eine ArrayList sammelt, die nicht threadsicher ist.

Die Art und Weise, wie dies funktioniert, ist, dass ein paralleler Stream, der in einem nicht-gleichzeitigen Kollektor endet, dafür sorgt, dass verschiedene Threads immer auf verschiedenen Instanzen der Zwischenergebnis-Collections arbeiten. Aus diesem Grund verfügt ein Collector über eine Supplier-Funktion, um so viele Zwischensammlungen zu erstellen, wie Threads vorhanden sind, sodass sich jeder Thread in einem eigenen Thread ansammeln kann. Wenn Zwischenergebnisse zusammengeführt werden sollen, werden sie sicher zwischen den Threads übergeben, und zu jedem gegebenen Zeitpunkt führt nur ein einzelner Thread ein Paar von Zwischenergebnissen zusammen.

8

Es ist sicher, nicht-gleichzeitige Sammlungen und nicht-atomare Zähler mit parallelen Streams zu verwenden.

Wenn Sie einen Blick auf die Dokumentation von Stream::collect nehmen Sie den folgenden Absatz finden:

Wie reduce(Object, BinaryOperator), können Sie Operationen ohne zusätzliche Synchronisation parallelisiert werden.

Und für die Methode Stream::reduce:

Während dies einen Umweg zu sein scheint eine laufende Summe in einer Schleife eine Aggregation im Vergleich zu durchführen, um einfach mutiert, parallelisieren Reduktionsoperationen eleganten, ohne zusätzliche zu benötigen Synchronisation und mit stark reduziertem Risiko von Datenrennen.

Dies könnte ein wenig überraschend sein. Beachten Sie jedoch, dass parallele Streams auf einem Fork-Join-Modell basieren. Das bedeutet, dass die gleichzeitige Ausführung funktioniert wie folgt:

  • Split-Sequenz in zwei Teile mit etwa der gleichen Größe
  • Prozess jedes Teil einzeln
  • sammeln die Ergebnisse der beiden Teile und kombinieren sie zu einem Ergebnis

Im zweiten Schritt werden die drei Schritte rekursiv auf die Untersequenzen angewendet.

Ein Beispiel sollte das klar machen. Die

IntStream.range(0, 4) 
    .parallel() 
    .collect(Trace::new, Trace::accumulate, Trace::combine); 

Der einzige Zweck der Klasse Trace ist der Konstruktor und Methodenaufrufe protokollieren. Wenn Sie diese Anweisung ausführen, druckt er die folgenden Zeilen:

thread: 9/operation: new 
thread: 10/operation: new 
thread: 10/operation: accumulate 
thread: 1/operation: new 
thread: 1/operation: accumulate 
thread: 1/operation: combine 
thread: 11/operation: new 
thread: 11/operation: accumulate 
thread: 9/operation: accumulate 
thread: 9/operation: combine 
thread: 9/operation: combine 

Sie können sehen, dass vier Trace Objekte erstellt wurden, akkumulieren hat einmal auf jedes Objekt aufgerufen wurde, und kombinieren hat dreimal verwendet, um die vier Objekte zu einem zu kombinieren. Jedes Objekt kann nur auf jeweils einen Thread zugreifen. Das macht den Code threadsicher, und das gleiche gilt für die Methode Collectors :: toMap.

16

Alle Kollektoren können, wenn sie den Regeln der Spezifikation entsprechen, sicher parallel oder sequenziell ausgeführt werden. Parallelität ist hier ein wesentlicher Bestandteil des Designs.

Die Unterscheidung zwischen parallelen und nicht-parallelen Kollektoren hat mit dem Ansatz der Parallelisierung zu tun.

Ein gewöhnlicher (nicht gleichzeitiger) Kollektor arbeitet, indem er Unterergebnisse zusammenführt. Daher wird die Quelle in eine Menge von Blöcken aufgeteilt, jeder Block wird in einem Ergebniscontainer (wie einer Liste oder einer Karte) gesammelt und dann werden die Unterergebnisse in einem größeren Ergebniscontainer zusammengeführt. Dies ist sicher und Ordnung-Erhaltung, aber für einige Arten von Containern - vor allem Karten - kann teuer sein, da das Zusammenführen von zwei Karten per Schlüssel oft teuer ist.

Ein Concurrent Collector erstellt stattdessen einen Ergebniscontainer, dessen Einfügevorgänge garantiert threadsicher sind, und fügt Elemente aus mehreren Threads in ihn ein. Mit einem hochgradig gleichzeitigen Ergebniscontainer wie ConcurrentHashMap kann dieser Ansatz möglicherweise bessere Ergebnisse erzielen als das Zusammenführen gewöhnlicher HashMaps.

So sind die Concurrent Collectors streng Optimierungen gegenüber ihren gewöhnlichen Gegenstücken. Und sie kommen nicht ohne Kosten; Da Elemente aus vielen Threads gesprengt werden, können gleichzeitige Kollektoren die Reihenfolge der Begegnungen im Allgemeinen nicht beibehalten. (Aber oft ist es Ihnen egal - wenn Sie ein Wortzählungs-Histogramm erstellen, ist es Ihnen egal, welche Instanz von "foo" Sie zuerst gezählt haben.)