2012-06-07 4 views
25

Meine Frage: Wie eine Reihe von Thread-Objekte auf einem ThreadPoolExecutor ausführen und warten auf sie alle zu beenden, bevor Sie weitermachen?Wie warten, bis ein ThreadPoolExecutor fertig ist

Ich bin neu in ThreadPoolExecutor. Dieser Code ist also ein Test, um zu lernen, wie es funktioniert. Im Moment fülle ich nicht einmal die BlockingQueue mit den Objekten, weil ich nicht verstehe, wie man die Warteschlange ohne Aufruf execute() mit einem anderen RunnableObject startet. Jedenfalls rufe ich gerade awaitTermination() an, aber ich denke, dass mir noch etwas fehlt. Irgendwelche Tipps wären toll! Vielen Dank.

public void testThreadPoolExecutor() throws InterruptedException { 
    int limit = 20; 
    BlockingQueue q = new ArrayBlockingQueue(limit); 
    ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
    for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
    } 
    ex.awaitTermination(2, TimeUnit.SECONDS); 
    System.out.println("finished"); 
} 

Die RunnableObject Klasse:

package playground; 

public class RunnableObject implements Runnable { 

    private final int id; 

    public RunnableObject(int id) { 
    this.id = id; 
    } 

    @Override 
    public void run() { 
    System.out.println("ID: " + id + " started"); 
    try { 
     Thread.sleep(2354); 
    } catch (InterruptedException ignore) { 
    } 
    System.out.println("ID: " + id + " ended"); 
    } 
} 
+1

Ich glaube nicht, es ist eine gute Idee, um Ihre Frage sofort, nachdem Sie es schreiben und vorschlagen zu beantworten, dass Ihr eigene Antwort ist nicht ganz passend. Im besten Fall garantiert und aktualisiert sie Ihre ursprüngliche Frage, warum diese Antwort nicht ausreicht. – Kiril

+1

@Link, du hast Recht. Ich werde es reparieren. – kentcdodds

+0

Ich habe gerade meine Antwort bearbeitet, um eine Möglichkeit zu zeigen, auf alle Jobs zu warten, bevor ich den Executor herunterfahre. – Gray

Antwort

43

Sie hinzugefügt werden sollte Schleife auf awaitTermination

ExecutorService threads; 
// ... 
// Tell threads to finish off. 
threads.shutdown(); 
// Wait for everything to finish. 
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) { 
    log.info("Awaiting completion of threads."); 
} 
+0

Das hat es total geschafft. Ich habe nicht bemerkt, dass 'awaistTermination' irgendetwas zurückgibt. Niemand sonst hat erwähnt, dass ich es durchlaufen müsste, um zu blockieren. Vielen Dank! – kentcdodds

+9

Warum in einer Schleife? Warum nicht einfach die Anzahl der Sekunden erhöhen, um darauf zu warten. Normalerweise mache ich eine 'awaitTermination (Long.MAX_VALUE, TimeUnit.SECONDS)' oder so. – Gray

+1

@Gray - Ich habe es anfangs nur einmal in einem Stück Code von mir aufgerufen und es passierten entweder wirklich seltsame Dinge beim Beenden manchmal oder alles wurde gesperrt. Irgendwann habe ich es aufgespürt, also schleife ich jetzt und zeichne eine "wartende" Nachricht auf, damit ich weiß, dass etwas Seltsames vor sich geht. – OldCurmudgeon

4

Ihr Problem scheint zu sein, dass Sie nicht shutdown fordern, nachdem Sie alle Jobs zu Ihrem Pool eingereicht haben. Ohne shutdown() wird Ihr awaitTermination immer false zurückgeben.

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 
// you are missing this line!! 
ex.shutdown(); 
ex.awaitTermination(2, TimeUnit.SECONDS); 

Sie können auch so etwas wie das für alle Ihre Jobs warten folgende tun zu beenden:

List<Future<Object>> futures = new ArrayList<Future<Object>>(); 
for (int i = 0; i < limit; i++) { 
    futures.add(ex.submit(new RunnableObject(i + 1), (Object)null)); 
} 
for (Future<Object> future : futures) { 
    // this joins with the submitted job 
    future.get(); 
} 
... 
// still need to shutdown at the end 
ex.shutdown(); 

Auch, weil Sie für 2354 Millisekunden schlafen, aber nur für die Beendigung aller der Warte Jobs für 2SECONDS, awaitTermination werden immer false zurückgeben.

Schließlich klingt es so, als ob Sie sich Sorgen um eine neue ThreadPoolExecutor erstellt haben und stattdessen möchten Sie die erste wiederverwenden. Sei nicht. Der GC-Overhead wird im Vergleich zu jedem Code, den Sie schreiben, um festzustellen, ob die Jobs beendet sind, extrem minimal sein.


von dem javadocs zitieren, ThreadPoolExecutor.shutdown():

ein ordnungsgemäßes Abschalten Leitet, in dem zuvor eingereichten Aufgaben ausgeführt werden, aber keine neue Aufgaben akzeptiert werden. Die Aktivierung hat keine zusätzlichen Auswirkungen, wenn sie bereits heruntergefahren ist.

Im ThreadPoolExecutor.awaitTermination(...) Verfahren wird für den Zustand des Testamentsvollstreckers warten TERMINATED zu gehen. Aber zuerst muss der Zustand zu SHUTDOWN gehen, wenn shutdown() aufgerufen wird oder STOP wenn shutdownNow() aufgerufen wird.

3

Es hat nichts mit dem Executor selbst zu tun. Verwenden Sie einfach die Schnittstelle java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>). Es wird blockiert, bis alle Callable s beendet sind.

Executors sollen langlebig sein; über die Lebensdauer einer Gruppe von Aufgaben hinaus. shutdown ist, wenn die Anwendung beendet und aufgeräumt ist.

+0

Können Sie diese Antwort näher erläutern? Ich mag die Antwort, aber es fällt mir schwer, sie umzusetzen. Danke – kentcdodds

-1

die Sie interessieren,

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 

Lines

ex.shutdown(); 
ex.awaitTermination(timeout, unit) 
+0

Sie sollten auf den zurückgegebenen Boolean von warten auf Beendigung überprüfen und weiter versuchen, bis Sie wahr werden. –

1

Ein weiterer Ansatz ist CompletionService zu bedienen, sehr nützlich, wenn Sie müssen versuchen Sie ein Task-Ergebnis:

//run 3 task at time 
final int numParallelThreads = 3; 

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor 
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads); 
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 

int numTaskToStart = 15; 

for(int i=0; i<numTaskToStart ; i++){ 
    //task class that implements Callable<String> (or something you need) 
    MyTask mt = new MyTask(); 

    completionService.submit(mt); 
} 

executor.shutdown(); //it cannot be queued more task 

try { 
    for (int t = 0; t < numTaskToStart ; t++) { 
     Future<String> f = completionService.take(); 
     String result = f.get(); 
     // ... something to do ... 
    } 
} catch (InterruptedException e) { 
    //termination of all started tasks (it returns all not started tasks in queue) 
    executor.shutdownNow(); 
} catch (ExecutionException e) { 
    // ... something to catch ... 
} 
2

Hier ist eine Variante auf der akzeptierte Antwort, die Wiederholungen behandelt, wenn/falls InterruptedException geworfen wird:

executor.shutdown(); 

boolean isWait = true; 

while (isWait) 
{ 
    try 
    {    
     isWait = !executor.awaitTermination(10, TimeUnit.SECONDS); 
     if (isWait) 
     { 
      log.info("Awaiting completion of bulk callback threads."); 
     } 
    } catch (InterruptedException e) { 
     log.debug("Interruped while awaiting completion of callback threads - trying again..."); 
    } 
}