2012-04-10 24 views
8

Es gibt etwas Seltsames an der Implementierung des BoundedExecutor in dem Buch Java Concurrency in Practice.Java Concurrency in der Praxis: Race-Bedingung in BoundedExecutor?

Es soll die Aufgabenübergabe an den Executor abbremsen, indem der übergebende Thread blockiert wird, wenn genügend Threads entweder in der Warteschlange stehen oder im Executor ausgeführt werden.

Dies ist die Implementierung (nach dem fehlenden rethrow in der catch-Klausel hinzufügen):

public class BoundedExecutor { 
    private final Executor exec; 
    private final Semaphore semaphore; 

    public BoundedExecutor(Executor exec, int bound) { 
     this.exec = exec; 
     this.semaphore = new Semaphore(bound); 
    } 

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { 
     semaphore.acquire(); 

     try { 
      exec.execute(new Runnable() { 
       @Override public void run() { 
        try { 
         command.run(); 
        } finally { 
         semaphore.release(); 
        } 
       } 
      }); 
     } catch (RejectedExecutionException e) { 
      semaphore.release(); 
      throw e; 
     } 
    } 

Als ich die BoundedExecutor mit einem Executors.newCachedThreadPool() und einer Grenze von 4 instanziiert, würde ich die Anzahl der Threads erwartet instanziiert durch der Cache-Thread-Pool sollte nie größer als 4 sein. In der Praxis ist dies jedoch der Fall. Ich habe dieses kleine Testprogramm bekommen, so viel wie 11 Threads zu erstellen:

public static void main(String[] args) throws Exception { 
    class CountingThreadFactory implements ThreadFactory { 
     int count; 

     @Override public Thread newThread(Runnable r) { 
      ++count; 
      return new Thread(r); 
     }   
    } 

    List<Integer> counts = new ArrayList<Integer>(); 

    for (int n = 0; n < 100; ++n) { 
     CountingThreadFactory countingThreadFactory = new CountingThreadFactory(); 
     ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory); 

     try { 
      BoundedExecutor be = new BoundedExecutor(exec, 4); 

      for (int i = 0; i < 20000; ++i) { 
       be.submitTask(new Runnable() { 
        @Override public void run() {} 
       }); 
      } 
     } finally { 
      exec.shutdown(); 
     } 

     counts.add(countingThreadFactory.count); 
    } 

    System.out.println(Collections.max(counts)); 
} 

Ich denke, es ist ein winzig kleiner Zeitrahmen zwischen der Freigabe des Semaphors und die Aufgabe endet, wo ein anderer Thread eine Genehmigung erwerben kann und Aufgabe einreichen, solange der Freigabe-Thread noch nicht beendet ist. Mit anderen Worten, es hat eine Wettlaufbedingung.

Kann jemand das bestätigen?

+1

Ich fügte ein 1ms Thread.sleep direkt nach dem semaphore.release() hinzu, um zu sehen, wie viel schlimmer es würde: Ich habe über 300 Threads erstellt. – toto2

Antwort

2

Ich sehe so viel wie 9 Threads auf einmal erstellt. Ich vermute, dass es eine Race-Bedingung gibt, die dazu führt, dass mehr Thread als erforderlich ist.

Dies könnte sein, weil es vor und nach dem Ausführen der Aufgabe Arbeit zu tun gibt. Das bedeutet, dass, obwohl nur 4 Threads in Ihrem Codeblock vorhanden sind, eine Reihe von Threads eine vorherige Aufgabe beendet oder bereit ist, eine neue Aufgabe zu starten.

d. H. Der Thread macht eine Freigabe(), während es noch läuft. Auch wenn es das letzte ist, was du tust, ist es nicht das letzte, was es macht, bevor du eine neue Aufgabe erwirbst.

+0

Ich stimme Ihrer Aussage zu, die im Grunde auch das ist, was Bossie an erster Stelle vermutet hat. Aber ich bin mir nicht sicher, ob ich das eine Race-Bedingung nennen würde, da dies einen Programmierfehler beinhaltet: Ich denke nicht, dass das Programm eine maximale Anzahl von Threads haben soll, so wie es geschrieben ist. – toto2

+0

@ toto2 Es ist kein regelmäßiger Programmierfehler, aber es gibt eine Wettlaufsituation zwischen dem Semaphor, der veröffentlicht wird, und dem Thread, der eine neue Aufgabe erwirbt. Wenn die Aufgabe länger dauert, wird dieses Verhalten möglicherweise nur selten angezeigt. –

+1

Ich verstehe die Aufgabenlänge (siehe meinen Kommentar zu Bossies Frage). Ich meine nur, dass es kein Bug ist, weil nichts angibt, dass es 4 Threads geben sollte. "Es ist kein Fehler, es ist ein Feature!" In diesem Fall denke ich, dass jemand diesen Anspruch wirklich geltend machen könnte. Entschuldigung ... Ich werde hier nur philosophisch. – toto2

5

Sie haben Ihre Analyse des Race Condition korrekt durchgeführt. Es gibt keine Synchronisationsgarantien zwischen dem ExecutorService & des Semaphore.

Allerdings weiß ich nicht, ob der BoundedExecutor für die Drosselung der Anzahl der Threads verwendet wird. Ich denke, es ist mehr für die Drosselung der Anzahl der Aufgaben an den Dienst übermittelt. Stellen Sie sich vor, Sie haben 5 Millionen Aufgaben, die Sie einreichen müssen, und wenn Sie mehr als 10.000 davon einreichen, haben Sie zu wenig Speicherplatz.

Nun, Sie werden immer nur 4 Threads haben, die zu einer bestimmten Zeit ausgeführt werden, warum sollten Sie versuchen, alle 5 Millionen Tasks anzuordnen? Sie können ein ähnliches Konstrukt verwenden, um die Anzahl der Aufgaben zu begrenzen, die zu einem bestimmten Zeitpunkt in der Warteschlange stehen. Was Sie daraus machen sollten ist, dass zu jedem Zeitpunkt nur 4 Aufgaben ausgeführt werden.

Offensichtlich ist die Auflösung zu diesem einen Executors.newFixedThreadPool(4) zu verwenden.

+1

Genau. Die Task, die an den zugrunde liegenden Executor übergeben wird, der 'command' ausführt, gibt den Semaphor vor dem Abschluss frei, aber dies geschieht im Executor-Thread, der den Task ausführt. Dadurch kann eine andere Aufgabe an den Executor übergeben und einem neuen Thread übergeben werden, wodurch theoretisch doppelt so viele Threads wie "gebunden" mit den richtigen Bedingungen zugelassen werden. –

+1

@John: Es drosselt die Anzahl der eingereichten Aufgaben, aber in einer unvorhersehbaren und unzuverlässigen Art und Weise. Wenn Threads in unglücklicher Weise geplant werden, könnte dies bedeuten, dass sich viele einreichende Threads einschleichen könnten. Auch im Falle eines 'newFixedThreadPool()' können diese Tasks immer noch in der unbegrenzten Warteschlange des Executors anhäufen , riskieren immer noch nicht genügend Speicher. @David: Ich habe so viel wie 11 Threads mit einer Grenze von 4 bekommen. Außerdem finde ich es witzig, dass das Nachschlagewerk über Java-Concurrency noch Race Conditions enthält. –

+0

@Bossie Sie haben Recht, Sie sind OOM immer noch mit einem newFixedThreadPool suspekt, aber das würde verhindern, dass die überschüssigen Threads erstellt werden. Sie verwenden eine Grenze, um zu verhindern, dass, wie bereits erwähnt, die Millionen von Aufgaben gleichzeitig eingereicht werden. –

10

BoundedExecutor war in der Tat als eine Veranschaulichung der Drohung Aufgabe Aufgabe, nicht als eine Möglichkeit, eine Grenze auf Thread-Pool-Größe zu platzieren gedacht. Es gibt direktere Wege, Letzteres zu erreichen, wie zumindest ein Kommentar herausstellte.

Aber auch die anderen Antworten nicht den Text in dem Buch erwähnt, das sagt eine unbegrenzte Warteschlange zu verwenden und zu

set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution. [JCiP, end of section 8.3.3]

Durch unbeschränkte Warteschlangen und Poolgröße zu nennen, wurden implizieren wir (offenbar nicht ganz klar) die Verwendung eines Thread-Pools begrenzter Größe.

Was mich bei BoundedExecutor immer gestört hat, ist, dass es die ExecutorService-Schnittstelle nicht implementiert. Ein moderner Weg, um eine ähnliche Funktionalität zu erreichen und dennoch die Standardschnittstellen zu implementieren, wäre die Verwendung der Guava-Methode listeningDecorator und der ForwardingListeningExecutorService-Klasse.

+0

Eine Antwort von einem der Experten, das ist großartig. Danke, dass du die Dinge ein wenig klarer gemacht hast, Tim. Diese 'ListenableFutures' sehen interessant aus. Also sollte ich ein paar Aufgaben einreichen, und wenn man fertig ist, reiche ich einen neuen im Callback ein, richtig? –

+0

+1 aus der Quelle –

+0

@Bossie - Sie könnten das tun, sicher, aber ich meinte, dass Sie weiterhin die Semaphore Ansatz von BoundedExecutorService verwenden können, indem Sie die Ausführung mit einem Erwerb und dem Rückruf mit einem Release dekorieren. –