Ich möchte Nachrichten aus einer RabbitMq-Warteschlange parallel verarbeiten. Die Warteschlange ist so konfiguriert, dass sie autoAck = false ist. Ich verwende die camel-rabbitMQ-Unterstützung für camel endpoints, die Unterstützung für einen threadPoolSize-Parameter hat, aber dies hat nicht den gewünschten Effekt. Nachrichten werden weiterhin seriell aus der Warteschlange verarbeitet, auch wenn threadpoolsize = 20 ist.Rabbit Mq Java-Client parallelen Verbrauch
Vom Debugging über den Code kann ich sehen, dass der Parameter threadpoolsize verwendet wird, um einen ExecutorService zu erstellen, der verwendet wird, um wie in here beschrieben an die Kaninchen-confactfactor übergeben zu werden. Das alles sieht gut aus bis Sie in den Hasen ConsumerWorkService
gelangen. Hier werden Nachrichten in Blöcken von maximal 16 Nachrichten verarbeitet. Jede Nachricht in einem Block wird seriell verarbeitet und dann, wenn mehr Arbeit zu erledigen ist, wird der Executor-Dienst mit dem nächsten Block aufgerufen. Ein Codeschnipsel davon ist unten. Von dieser Verwendung des Executor-Dienstes kann ich nicht sehen, wie die Nachrichten parallel verarbeitet werden können. Der Executorservice hat immer nur eine Aufgabe zu erledigen.
Was fehlt mir?
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
Können Sie ConsumerWorkService so konfigurieren, dass eine andere Blockgröße verwendet wird? –
Hi Claus, ich habe einige Änderungen an der Camel-rabbitmq-Komponente über Github als Fergus Nelson vorgenommen. Ich habe Änderungen am RabbitMqConsumer vorgenommen, um für jeden benötigten Konkurenten einen Kanal einzurichten. Ich werde eine Jira + Pull-Anfrage erstellen, wenn ich alles getestet habe. –
@ mR_fr0g, wie ich verstehe, haben Sie das Problem behoben, indem Sie mehrere Kanäle in der Camel-RabbitMQ-Komponente erstellen. Könnten Sie den Link zu Ihrem Jira-Ticket angeben, eine Anfrage stellen und angeben, in welcher Camel-Version der Fix vorhanden ist? – wheleph