2013-02-11 2 views
11

Ich habe eine Java-Anwendung, die Worker-Threads hat, um Jobs zu verarbeiten. Ein Arbeiter erzeugt ein Ergebnisobjekt, so etwas sagen wie:Java Thread-sichere Übergabe von Sammlungsobjekten von einem Thread zum anderen

class WorkerResult{ 
    private final Set<ResultItems> items; 
    public Worker(Set<ResultItems> pItems){ 
     items = pItems; 
    } 
} 

Wenn der Arbeiter beendet, es tut diese Operation:

... 
final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>(); 
for(Item producedItem : ...){ 
     items.add(item); 
} 
passToGatherThread(items); 

Der items Satz ist eine Art "unit-of-Arbeit" hier . Die Methode passToGatherThread übergibt die items-Gruppe an einen Sammel-Thread, von dem zur Laufzeit nur einer existiert.

Synchronisation ist hier nicht erforderlich, da Race-Bedingungen nicht auftreten können, weil nur ein Thread (Gather-Thread) den items-Satz liest. AFAICS, der Gather-Thread sieht möglicherweise nicht alle Elemente, weil das Set nicht Thread-sicher ist, oder?

Angenommen, ich kann passToGatherThread nicht synchronisieren, sagen, weil es eine 3rd-Party-Bibliothek ist. Was ich grundsätzlich befürchte ist, dass der Sammel-Thread nicht alle Elemente wegen Caching, VM-Optimierungen, etc. sieht. Hier kommt die Frage: Wie man die Elemente thread-sicher weitergibt, so dass der Gather-Thread "sieht" die richtige Menge von Gegenständen?

+0

Ich bin mir nicht sicher, aber vielleicht [PipedStreams] (http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html) kann Ihnen helfen? – oliholz

+1

Was ist die Definition von "passToGatherhthread"? Ich denke, dies ist entscheidend für das Verständnis, ob die unten angegebenen Antworten richtig sind. Wie genau werden Items an den Sammel-Thread übergeben? –

+0

Haben Sie wirklich Probleme mit der Sichtbarkeit der Objekte oder vermuten Sie nur ein solches Verhalten? Ihre Sorgen erscheinen unter diesen Umständen ziemlich weit hergeholt. – Dariusz

Antwort

1

Ich habe darüber nachgedacht, (und diskutiert) diese Frage viel und ich habe mit einer anderen Antwort kommen, die, wie ich hoffe, die beste Lösung sein.

Das Überschreiten einer synchronisierten Sammlung ist in Bezug auf die Effizienz nicht gut, da jede nachfolgende Operation in dieser Sammlung synchronisiert wird. Wenn es viele Operationen gibt, kann sich dies als Handicap erweisen.

Auf den Punkt gebracht: lassen Sie uns einige Annahmen getroffen werden (was ich nicht zustimmen):

  • die erwähnte passToGatherThread Verfahren in der Tat unsicher ist jedoch unwahrscheinlich es scheint
  • Compiler können die Ereignisse im Code neu anordnen so dass die passToGatherThread vor der Sammlung gefüllt
  • genannt wird

die einfachste, sauberste und möglicherweise der effizienteste Weg, um sicherzustellen, dass die zu gatherer Methode übergebene Sammlung ist fertig und abgeschlossen ist t o setzen die Sammlung Push in einem synchronisierten Block, wie folgt aus:

synchronized(items) { 
    passToGatherThread(items); 
} 

diese Weise können wir eine Speicher Synchronisation gewährleisten und eine gültige passieren-vor-Sequenz vor der Sammlung übergeben wird, damit sicher, dass alle Objekte korrekt übergeben werden.

+1

Ich denke, du hast Recht, und am Ende bin ich genau auf dieses Problem gestoßen. Es sieht ein bisschen hässlich aus, nicht wahr? Synchronisieren auf einem neu erstellten Set, das für die meisten Leute einfach albern aussieht ... ;-) Danke für deine Antwort. – xSNRG

1

Es scheint hier kein Synchronisationsproblem zu geben. Sie erstellen für jedes passToGatherThread ein neues Set-Objekt, und zwar nach dem Ändern des Sets. Keine Objekte werden verloren gehen.

Auf Set (und die meisten Java-Sammlungen) kann gleichzeitig mit vielen Threads zugegriffen werden, sofern keine Änderungen an der Auflistung vorgenommen werden. Das ist, was Collections.unmodifiableCollection ist.

Da die erwähnte Methode passToGatherThread als Kommunikation mit einem anderen Thread dient, muss sie eine Art von Synchronisation verwenden - und jede Synchronisation gewährleistet die Konsistenz des Speichers zwischen den Threads.

Auch - bitte beachten Sie, dass alle Schreibvorgänge auf die Objekte in der übergebenen Sammlung gemacht werden, bevor es an den anderen Thread übergeben wird. Selbst wenn der Speicher in den lokalen Cache des Threads kopiert wird, hat er denselben unveränderten Wert wie der andere Thread.

+1

AFAIK die JVM darf Daten nach Möglichkeit in den Prozessorkern-Registern zwischenspeichern. Beim Schreiben von Daten ohne "Leeren" (unter Verwendung von synchronisierten/flüchtigen/etc.) Können andere Threads "veraltete" Werte oder sogar gar nicht sehen, da die Semantik "vorgefallen" nur in dem Thread gültig ist, der die Daten erzeugt. – xSNRG

+0

@xSNRG hat einen Absatz hinzugefügt. Vorausgesetzt, die Pass-Methode ist gültig, sollten Sie sich keine Sorgen machen. – Dariusz

+0

Darüber hinaus - wenn das nicht funktionierte, würde eine verdammt viele Anwendungen nicht funktionieren. – Dariusz

1

Sie können einfach eine der Thread-sicheren Implementierungen von Set verwenden, die Java für Ihre WorkerResult bereitstellt. Siehe zum Beispiel:

Eine weitere Option ist Collections.synchronizedSet() zu verwenden.

+0

Warum wird das benötigt? Was ist falsch an der Verwendung einer nicht threadsicheren Set-Implementierung? –

+0

Ich denke, das OP hat Angst vor dem Caching. Threads können nicht-flüchtige Daten zwischenspeichern, und wenn Schreibvorgänge darauf nicht synchronisiert sind, wird niemals ein Update angezeigt. – joergl

+1

Ja joergl ist absolut richtig. Siehe meinen Kommentar unter der Antwort von Dariusz Wawer. – xSNRG

0

Der Arbeiter aufrufbar implementiert und gibt WorkerResult:

class Worker implements Callable<WorkerResult> { 
    private WorkerInput in; 

    public Worker(WorkerInput in) { 
     this.in = in; 
    } 

    public WorkerResult call() { 
     // do work here 
    } 
} 

Dann verwenden wir ein ExecutorService den Thread-Pool zu verwalten, und sammeln Sie die Ergebnisse über zukünftige Verwendung.

public class PooledWorkerController { 

    private static final int MAX_THREAD_POOL = 3; 
    private final ExecutorService pool = 
     Executors.newFixedThreadPool(MAX_THREAD_POOL); 

    public Set<ResultItems> process(List<WorkerInput> inputs) 
      throws InterruptedException, ExecutionException{   
     List<Future<WorkerResult>> submitted = new ArrayList<>(); 
     for (WorkerInput in : inputs) { 
      Future<WorkerResult> future = pool.submit(new Worker(in)); 
      submitted.add(future); 
     } 
     Set<ResultItems> results = new HashSet<>(); 
     for (Future<WorkerResult> future : submitted) { 
      results.addAll(future.get().getItems()); 
     } 
     return results; 
    } 
} 
+0

So erstellen Sie einen Thread jedes Mal, wenn ein Job gestartet wird. -1 dafür. – Dariusz

+0

in Ihrem Beitrag haben Sie eine schlechte Praxis des Erstellens von Thread jedes Mal eingeführt, wenn ein Paket von Jobs aufgerufen wird. Die Erstellung von Threads ist sehr kostenintensiv und wenn möglich, sollten Threads einmal erstellt und wiederverwendet werden. Ihr Code wäre IMO viel, viel besser, wenn Sie einen einzigen statisch initialisierten Thread-Pool hätten, der in 'process' verwendet würde. Das Erstellen eines neuen Thread-Pools für jeden 'Prozess'-Aufruf ändert nicht viel. – Dariusz