2015-10-01 12 views
9

Normalerweise, wenn man Java 8's parallelStream() verwendet, ist das Ergebnis die Ausführung über den standardmäßigen gemeinsamen Fork-Join-Pool (d. H. ForkJoinPool.commonPool()).Java parallelStream() mit benutzerdefinierten Pool mit Anrufer Arbeit stiehlt?

Dies ist jedoch eindeutig unerwünscht, wenn eine Arbeit vorliegt, die weit von der CPU entfernt ist, z. kann viel Zeit auf IO warten. In solchen Fällen möchte man einen separaten Pool verwenden, der nach anderen Kriterien bemessen ist (z. B. wie viel Zeit die Aufgaben wahrscheinlich tatsächlich mit der CPU verbringen).

Es gibt keine offensichtlich Mittel, um parallelStream() einen anderen Pool zu nutzen, aber es gibt einen Weg, wie detailliert here.

Leider führt dieser Ansatz dazu, die Terminal-Operation auf dem parallelen Stream von einem Fork-Join-Pool-Thread aufzurufen. Der Nachteil davon ist, dass, wenn der Target-Fork-Join-Pool vollständig mit vorhandener Arbeit beschäftigt ist, die gesamte Ausführung darauf wartet, während überhaupt nichts getan wird. Somit kann der Pool zu einem Engpass werden, der schlimmer ist als eine Einzel-Thread-Ausführung. Im Gegensatz dazu wird, wenn man parallelStream() in der "normalen" Art verwendet, ForkJoinPool.common.externalHelpComplete() oder ForkJoinPool.common.tryExternalUnpush() verwendet, um den aufrufenden Thread von außerhalb des Pools bei der Verarbeitung zu unterstützen.

Kennt jemand eine Möglichkeit, zu beide bekommt parallelStream() Pool einen Nicht-Standard-fork-join und von außerhalb der Gabel-join Pool Hilfe bei der Verarbeitung dieser Arbeit eine Berufung zu verwenden Thread (aber nicht der Rest der Gabel-Join-Pool-Arbeit)?

+3

Ich verstehe nicht Ihre _Der Nachteil davon ist, dass wenn der Ziel-Gabel-Join-Pool vollständig mit vorhandenen Arbeit_ beschäftigt ist. Würden Sie nicht nur für diesen parallelen Stream-Aufruf einen neuen Pool erstellen? –

+1

Es ist noch schlimmer. Wenn Sie bei Ihren Tasks 'get' aufrufen, die nicht im gemeinsamen Pool sind, wird' ForkJoinPool.common.tryExternalUnpush() 'aufgerufen, aber die Task wird nicht in der Warteschlange des gemeinsamen Pools gefunden. – Holger

+0

Um die Frage zu beantworten, nein, ich würde nicht nur für diesen Aufruf einen neuen Thread-Pool erstellen. Eher würde ich diesen anderen Thread-Pool über viele ähnliche Aufrufe teilen, von denen einige sich überlappen könnten, von denen einige viel längere/größere Aufgaben haben könnten als andere usw. –

Antwort

2

Sie können awaitQuiescence auf den Pool zu helfen. Sie können jedoch nicht auswählen, welche Aufgabe (n) Ihnen helfen wird, es wird nur das nächste ausstehende aus dem Pool übernehmen. Wenn also weitere ausstehende Aufgaben vorhanden sind, können Sie diese ausführen, bevor Sie zu Ihren eigenen gelangen.

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) // use zero timeout to execute one task only 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

wird true drucken.

während

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// overload: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

hängen wird für immer, da es die zweite Sperr Aufgabe auszuführen versucht.

Nichtsdestoweniger wird es dem initiierenden Thread helfen, die ausstehenden Aufgaben des Pools zu verarbeiten, was die Chance erhöht, dass seine eigene Aufgabe ausgeführt wird, solange es keine unendlichen Aufgaben gibt (das obige Beispiel ist extrem und wird nur zur Demonstration gewählt).


Aber beachten Sie, dass die gesamte Beziehung zwischen dem Fork/Join-Framework und die Stream API ist eine Implementierung Detail sowieso.

+0

Ich war zu dem Schluss gekommen, dass ich das tun könnte, aber wie du feststellst, könnte das sehr wohl bedeuten, dass ich am Ende einem anderen Thread mit seinen Aufgaben helfe, anstatt mir selbst zu helfen. Das ist eine Art Nichtstarter. Auch ich bekomme, dass Gabel-Join ist ein Implementierungsdetail, aber es muss eine bessere Kontrolle über ParallelStream(), z. etwas so einfaches wie parallelStream (forkJoinPool). –

+1

Nun, mit einer Methode wie 'parallelStream (ForkJoinPool)' wäre es kein Implementierungsdetail mehr ... – Holger

+0

Nein, aber was parallelStream() mit keinen Argumenten tut, wäre immer noch ein Implementierungsdetail. Dies würde Ihnen nur die Möglichkeit geben, ein wenig Kontrolle zu übernehmen, wenn Sie es brauchen. –