2016-07-17 13 views
0

Ich habe eine Route wie unten erwähnt. Die Route fragt in regelmäßigen Abständen ein Verzeichnis ab und liest eine CSV-Datei großer Größe. Es teilt dann die Dateien in einem Block von 1000 Zeilen und sendet es an die seda queue(firstQueue). Ich habe 15 gleichzeitige Verbraucher in dieser Warteschlange.Concurrent Consumer von Seda in Apache Camel

route.split().tokenize("\n", 1000).streaming().to("seda:firstQueue?concurrentConsumers=15").process(myProcessor).to("seda:secondQueue?concurrentConsumers=15").process(anotherMyProcessor); 

1) Was 15 gleichzeitige Verbraucher tut bedeutet - tut es bedeutet, 15 Threads Daten aus dem seda lesen und es auf eine Instanz von myProcessor passieren? Oder 15 separate Instanzen von myProcessor erstellt werden jeweils auf die gleiche Kopie der Daten handeln? Beachten Sie, dass myProcessor ein Singleton ist, was passieren wird, wenn ich es zum Prototyp ändere.

2) Ist es möglich, dass zwei oder mehr Threads die gleichen Daten auswählen und an die myProcessor übergeben? Oder ist garantiert, dass keine zwei Threads die gleichen Daten haben?

Schätzen Sie eine schnelle Antwort. Vielen Dank!

+0

Ich benutze eine ziemlich alte Version von Camel - 2.10.1. Ist jemandem bekannt, ob es in dieser Version Probleme mit seda und concurrentConsumers gab und in der neueren Version behoben wurde? –

Antwort

1

Mein Kamel ist ein bisschen rostig, aber ich bin mir ziemlich sicher, dass

  1. Es gibt 15 laufenden Threads. Jeder liest eine Nachricht aus der Warteschlange und ruft myProcessor auf. Es gibt nur eine Instanz meines Prozessors, also müssen Sie sicherstellen, dass es threadsicher ist. Ich habe es nie ausprobiert, aber ich glaube nicht, dass die Änderung des Umfangs auf einen Prototyp einen Unterschied machen wird.

  2. Zwei Threads sollten nicht die gleiche Nachricht aus der Warteschlange aufnehmen. Im normalen Betrieb sollte jede Nachricht nur einmal verarbeitet werden. Es gibt jedoch Fehlerbedingungen, die dazu führen, dass die gleiche Nachricht zweimal verarbeitet wird. Die offensichtlichste ist, dass Sie die App während der Verarbeitung der Datei neu starten.

+0

Danke Matt für die Antwort. Also, da SEDA grundsätzlich eine "BlockingQueue" ist, ist sichergestellt, dass keine zwei Threads die gleiche Kopie der Nachricht haben? Sobald ein Thread eine Nachricht aus der Warteschlange empfängt, wird diese Nachricht sofort aus der Warteschlange entfernt. –

+1

Ja, eine blockierende Warteschlange ist genau das, was sie ist. Sobald ein Thread eine Nachricht empfängt, wird sie sofort aus der Warteschlange entfernt, sodass kein anderer Thread die gleiche Nachricht abruft. –

+0

Eine weitere Frage, obwohl ich 15 ConcurrentConsumer definiert habe, sehe ich nur 1 Thread-Verarbeitung (von myProcessor) die Nachrichten. Irgendeine Idee, warum das passiert? –