2010-05-03 5 views
73

Ich bin auf der Suche nach einer ExecutorService Implementierung, die mit einem Timeout bereitgestellt werden kann. Aufgaben, die an den ExecutorService gesendet werden, werden unterbrochen, wenn sie länger dauern als das Zeitlimit für die Ausführung. Die Umsetzung eines solchen Biests ist keine so schwierige Aufgabe, aber ich frage mich, ob jemand eine bestehende Implementierung kennt.ExecutorService, die Aufgaben nach einem Timeout unterbricht

Hier ist, was ich basierend auf einigen der folgenden Diskussion kam. Irgendwelche Kommentare?

import java.util.List; 
import java.util.concurrent.*; 

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor { 
    private final long timeout; 
    private final TimeUnit timeoutUnit; 

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); 
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>(); 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    @Override 
    public void shutdown() { 
     timeoutExecutor.shutdown(); 
     super.shutdown(); 
    } 

    @Override 
    public List<Runnable> shutdownNow() { 
     timeoutExecutor.shutdownNow(); 
     return super.shutdownNow(); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 
     if(timeout > 0) { 
      final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit); 
      runningTasks.put(r, scheduled); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     ScheduledFuture timeoutTask = runningTasks.remove(r); 
     if(timeoutTask != null) { 
      timeoutTask.cancel(false); 
     } 
    } 

    class TimeoutTask implements Runnable { 
     private final Thread thread; 

     public TimeoutTask(Thread thread) { 
      this.thread = thread; 
     } 

     @Override 
     public void run() { 
      thread.interrupt(); 
     } 
    } 
} 
+0

Ist das der Timeout-Zeit der Unterwerfung ‚Startzeit‘? Oder die Zeit, zu der die Aufgabe ausgeführt wird? –

+0

Gute Frage. Wenn es mit der Ausführung beginnt. Vermutlich mit dem 'protected void beforeExecute (Thread t, Runnable r)' Haken. –

+0

@ scompt.com verwenden Sie immer noch diese Lösung oder wurde sie ersetzt –

Antwort

69

Sie einen ScheduledExecutorService dafür verwenden können. Zuerst würden Sie es nur einmal einreichen, um sofort zu beginnen und die Zukunft, die erstellt wird, zu behalten. Danach können Sie eine neue Aufgabe einreichen, die die beibehaltene Zukunft nach einiger Zeit aufhebt.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
final Future handler = executor.submit(new Callable(){ ... }); 
executor.schedule(new Runnable(){ 
    public void run(){ 
     handler.cancel(); 
    }  
}, 10000, TimeUnit.MILLISECONDS); 

Dies wird Ihre Handler ausführen (Hauptfunktionalität unterbrochen werden) für 10 Sekunden, dann abbrechen wird (das heißt Interrupt), dass bestimmte Aufgabe.

+9

Interessante Idee, aber was ist, wenn die Aufgabe vor dem Timeout endet (was wird normalerweise)? Ich würde lieber nicht Tonnen von Aufräumarbeiten warten, nur um herauszufinden, dass ihre zugewiesene Aufgabe bereits abgeschlossen ist. Es müsste einen weiteren Thread geben, der die Futures überwacht, wenn sie fertig sind, um ihre Aufräumarbeiten zu entfernen. –

+2

Der Executor plant diesen Abbruch nur einmal. Wenn die Aufgabe abgeschlossen ist, ist der Abbruch ein No-Op und die Arbeit wird unverändert fortgesetzt. Es muss nur eine zusätzliche Thread-Anweisung zum Abbrechen der Tasks und ein Thread zum Ausführen der Tasks geben. Sie könnten zwei Executoren haben, einen, um Ihre Hauptaufgaben einzureichen, und einen, um sie abzubrechen. –

+2

Das stimmt, aber was ist, wenn die Zeitüberschreitung 5 Stunden beträgt und in dieser Zeit 10k Aufgaben ausgeführt werden. Ich möchte vermeiden, dass all diese No-Ops herumliegen, die Speicher belegen und Kontextwechsel verursachen. –

3

Wickeln Sie die Aufgabe in FutureTask und Sie können eine Zeitüberschreitung für die FutureTask angeben. Schauen Sie sich das Beispiel in meiner Antwort auf diese Frage,

java native Process timeout

+0

Ich weiß, es gibt ein paar Möglichkeiten, dies mit den 'java.util.concurrent'-Klassen zu tun, aber ich suche nach einer 'ExecutorService'-Implementierung. –

+0

Wenn Sie sagen, dass Ihr ExecutorService die Tatsache verbergen soll, dass Timeouts aus dem Client-Code hinzugefügt werden, können Sie einen eigenen ExecutorService implementieren, der jeden ausführbaren Handle mit einer FutureTask umschließt, bevor er ausgeführt wird. – erikprice

5

Leider ist die Lösung fehlerhaft. Es gibt eine Art Bug mit ScheduledThreadPoolExecutor, der auch in this question gemeldet wird: Durch das Abbrechen einer übermittelten Aufgabe werden die mit der Aufgabe verknüpften Speicherressourcen nicht vollständig freigegeben; Die Ressourcen werden nur freigegeben, wenn der Task abläuft.

Wenn Sie daher eine TimeoutThreadPoolExecutor mit einer ziemlich langen Ablaufzeit (eine typische Verwendung) erstellen und Aufgaben schnell genug übergeben, füllen Sie den Speicher schließlich aus - obwohl die Aufgaben tatsächlich erfolgreich ausgeführt wurden.

Sie können das Problem mit den folgenden (sehr grob) Testprogramm finden Sie unter:

public static void main(String[] args) throws InterruptedException { 
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
      new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); 
    //ExecutorService service = Executors.newFixedThreadPool(1); 
    try { 
     final AtomicInteger counter = new AtomicInteger(); 
     for (long i = 0; i < 10000000; i++) { 
      service.submit(new Runnable() { 
       @Override 
       public void run() { 
        counter.incrementAndGet(); 
       } 
      }); 
      if (i % 10000 == 0) { 
       System.out.println(i + "/" + counter.get()); 
       while (i > counter.get()) { 
        Thread.sleep(10); 
       } 
      } 
     } 
    } finally { 
     service.shutdown(); 
    } 
} 

Das Programm erschöpft den verfügbaren Speicher, obwohl es für die gelaicht Runnable s wartet abzuschließen.

Ich dachte schon eine Weile darüber nach, aber leider konnte ich keine gute Lösung finden.

EDIT: Ich fand heraus, dass dieses Problem als JDK bug 6602600 gemeldet wurde, und scheint vor kurzem behoben worden zu sein.

1

Wie wäre es mit der ExecutorService.shutDownNow() Methode wie in beschrieben? Es scheint die einfachste Lösung zu sein.

+5

Da es alle geplanten Aufgaben und nicht eine bestimmte Aufgabe wie angefordert beendet wird durch die Frage – MikeL

1

Es scheint Problem ist nicht in JDK Bug 6602600 (es wurde 2010-05-22 gelöst), aber in inkorrekter Aufruf von Schlaf (10) im Kreis. Zusatz Anmerkung, dass der Haupt-Thread direkt CHANCE zu anderen Threads geben muss, um ihre Aufgaben zu realisieren, indem SLEEP (0) in JEDER Zweig des äußeren Kreises aufgerufen wird. Es ist besser, ich denke, Thread.yield() anstelle von Thread zu verwenden.Schlaf (0)

Das Ergebnis korrigiert Teil der bisherigen Problemcode ist so, wie folgt aus:

....................... 
........................ 
Thread.yield();   

if (i % 1000== 0) { 
System.out.println(i + "/" + counter.get()+ "/"+service.toString()); 
} 

//     
//    while (i > counter.get()) { 
//     Thread.sleep(10); 
//    } 

Es funktioniert richtig mit der Menge der äußeren Zähler bis zu 150 000 000 getestet Kreisen.

0

Was über diese alternative Idee:

  • zwei haben zwei Vollstrecker:
    • ein für:
      • die Aufgabe einreichen, ohne
      • über die Zeitüberschreitung der Aufgabe die Pflege Hinzufügen die Zukunft resultierte und die Zeit, wenn es zu einer internen Struktur enden sollte
    • ein interner Job, der die interne Struktur überprüft, wenn einige Tasks Timeout sind und wenn sie abgebrochen werden müssen.

Kleine Probe hier:

public class AlternativeExecutorService 
{ 

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue  = new CopyOnWriteArrayList(); 
private final ScheduledThreadPoolExecutor    scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job 
private final ListeningExecutorService     threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for 
private ScheduledFuture scheduledFuture; 
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; 

public AlternativeExecutorService() 
{ 
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); 
} 

public void pushTask(OwnTask task) 
{ 
    ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable 
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end 
} 

public void shutdownInternalScheduledExecutor() 
{ 
    scheduledFuture.cancel(true); 
    scheduledExecutor.shutdownNow(); 
} 

long getCurrentMillisecondsTime() 
{ 
    return Calendar.getInstance().get(Calendar.MILLISECOND); 
} 

class ListenableFutureTask 
{ 
    private final ListenableFuture<Void> future; 
    private final OwnTask    task; 
    private final long     milliSecEndTime; 

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) 
    { 
     this.future = future; 
     this.task = task; 
     this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); 
    } 

    ListenableFuture<Void> getFuture() 
    { 
     return future; 
    } 

    OwnTask getTask() 
    { 
     return task; 
    } 

    long getMilliSecEndTime() 
    { 
     return milliSecEndTime; 
    } 
} 

class TimeoutManagerJob implements Runnable 
{ 
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() 
    { 
     return futureQueue; 
    } 

    @Override 
    public void run() 
    { 
     long currentMileSecValue = getCurrentMillisecondsTime(); 
     for (ListenableFutureTask futureTask : futureQueue) 
     { 
      consumeFuture(futureTask, currentMileSecValue); 
     } 
    } 

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) 
    { 
     ListenableFuture<Void> future = futureTask.getFuture(); 
     boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; 
     if (isTimeout) 
     { 
      if (!future.isDone()) 
      { 
       future.cancel(true); 
      } 
      futureQueue.remove(futureTask); 
     } 
    } 
} 

class OwnTask implements Callable<Void> 
{ 
    private long  timeoutDuration; 
    private TimeUnit timeUnit; 

    OwnTask(long timeoutDuration, TimeUnit timeUnit) 
    { 
     this.timeoutDuration = timeoutDuration; 
     this.timeUnit = timeUnit; 
    } 

    @Override 
    public Void call() throws Exception 
    { 
     // do logic 
     return null; 
    } 

    public long getTimeoutDuration() 
    { 
     return timeoutDuration; 
    } 

    public TimeUnit getTimeUnit() 
    { 
     return timeUnit; 
    } 
} 
} 
1

Nach Tonne Zeit zu überblicken,
Schließlich verwende ich invokeAll Methode von ExecutorService dieses Problem zu lösen.
Dies wird die Aufgabe während der laufenden Task strikt unterbrechen.
Hier ist Beispiel

ExecutorService executorService = Executors.newCachedThreadPool(); 

try { 
    List<Callable<Object>> callables = new ArrayList<>(); 
    // Add your long time task (callable) 
    callables.add(new VaryLongTimeTask()); 
    // Assign tasks for specific execution timeout (e.g. 2 sec) 
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); 
    for (Future<Object> future : futures) { 
     // Getting result 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

executorService.shutdown(); 

Die Pro ist auch ListenableFuture zur gleichen ExecutorService einreichen können.
Ändern Sie einfach die erste Zeile des Codes.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 

ListeningExecutorService die Listening-Funktion von ExecutorService bei Google Guave Projekt (com.google.guava))

+1

Danke für das Aufzeigen von 'invokeAll'. Das funktioniert sehr gut. Nur ein Wort der Vorsicht für alle, die darüber nachdenken, dies zu verwenden: obwohl "invokeAll" eine Liste von "Future" -Objekten zurückgibt, scheint es tatsächlich eine blockierende Operation zu sein. – mxro

1

Mit John W Antwort, die ich eine Implementierung geschaffen, die das Timeout richtig beginnen, wenn der Task startet seine Ausführung. Ich schreibe sogar einen Komponententest dafür :)

Jedoch entspricht es meinen Bedürfnissen nicht, da einige IO-Operationen nicht unterbrechen, wenn Future.cancel() aufgerufen wird (dh wenn Thread.interrupted() aufgerufen wird).

Wie dem auch sei, wenn jemand interessiert ist, habe ich einen Kern: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf