Wir haben ein Projekt, das Spring-AMQP verwendet, um Nachrichten von unserem RabbitMQ-Broker zu konsumieren. Wir möchten die Nebenläufigkeit auf der konsumierenden Seite erhöhen, so dass mehrere Worker-Threads Nachrichten parallel verarbeiten können. Ich begann mit dem Lesen der Dokumentation für den nativen RabbitMQ-Client, und das führte mich zu einem Design, bei dem ein einzelner Consumer und ein Prefetch-Count> 1 zur Steuerung der Parallelität verwendet wurden. Wenn Sie den RabbitMQ-Client direkt verwenden, scheint das ziemlich natürlich zu sein. Die handleDelivery
Methode der DefaultConsumer
kann eine neue Runnable
spawnen, die die Arbeit erledigt und die Nachricht am Ende der Arbeit bestätigt. Die Vorabrufzählung steuert effektiv die maximale Anzahl von Runnable
s, die der Verbraucher erzeugt.Spring AMQP Single Consumer-Parallelität mit Prefetch
Dieses Design scheint jedoch nicht in der Spring-AMQP-Welt übersetzbar. In der SimpleMessageListenerContainer
werden für jeden AMQP-Verbraucher alle Nachrichten in einen einzigen BlockingQueueConsumer
geliefert, und ein einzelner Thread liefert Nachrichten aus der blockierenden Warteschlange BlockingQueueConsumer
an die MessageListener
. Obwohl die SimpleMessageListenerContainer
eine TaskExecutor
unterstützt, wird die TaskExecutor
nur verwendet, um eine Aufgabe pro Verbraucher auszuführen. Um also mehrere Nachrichten parallel zu verarbeiten, müssen mehrere Benutzer verwendet werden.
Dies führt mich zu ein paar Fragen über Parallelität mit Spring-AMQP. Erstens, ist mein anfängliches Design mit Einzelverbraucher und hohem Prefetch ein gültiger Weg, Parallelität mit AMQP zu erreichen? Wenn ja, warum verzichtet Spring-AMQP auf dieses Design zugunsten eines Threads-pro-Consumer-Designs? Und ist es möglich, Spring-AMQP so anzupassen, dass Single-Consumer-Parallelität möglich ist?