2016-05-03 5 views
1

Wir haben ein Projekt, das Spring-AMQP verwendet, um Nachrichten von unserem RabbitMQ-Broker zu konsumieren. Wir möchten die Nebenläufigkeit auf der konsumierenden Seite erhöhen, so dass mehrere Worker-Threads Nachrichten parallel verarbeiten können. Ich begann mit dem Lesen der Dokumentation für den nativen RabbitMQ-Client, und das führte mich zu einem Design, bei dem ein einzelner Consumer und ein Prefetch-Count> 1 zur Steuerung der Parallelität verwendet wurden. Wenn Sie den RabbitMQ-Client direkt verwenden, scheint das ziemlich natürlich zu sein. Die handleDelivery Methode der DefaultConsumer kann eine neue Runnable spawnen, die die Arbeit erledigt und die Nachricht am Ende der Arbeit bestätigt. Die Vorabrufzählung steuert effektiv die maximale Anzahl von Runnable s, die der Verbraucher erzeugt.Spring AMQP Single Consumer-Parallelität mit Prefetch

Dieses Design scheint jedoch nicht in der Spring-AMQP-Welt übersetzbar. In der SimpleMessageListenerContainer werden für jeden AMQP-Verbraucher alle Nachrichten in einen einzigen BlockingQueueConsumer geliefert, und ein einzelner Thread liefert Nachrichten aus der blockierenden Warteschlange BlockingQueueConsumer an die MessageListener. Obwohl die SimpleMessageListenerContainer eine TaskExecutor unterstützt, wird die TaskExecutor nur verwendet, um eine Aufgabe pro Verbraucher auszuführen. Um also mehrere Nachrichten parallel zu verarbeiten, müssen mehrere Benutzer verwendet werden.

Dies führt mich zu ein paar Fragen über Parallelität mit Spring-AMQP. Erstens, ist mein anfängliches Design mit Einzelverbraucher und hohem Prefetch ein gültiger Weg, Parallelität mit AMQP zu erreichen? Wenn ja, warum verzichtet Spring-AMQP auf dieses Design zugunsten eines Threads-pro-Consumer-Designs? Und ist es möglich, Spring-AMQP so anzupassen, dass Single-Consumer-Parallelität möglich ist?

Antwort

1

Spring AMQP wurde gegen eine frühere Version der Hase-Client-Bibliothek entwickelt, die nur einen Thread pro Verbindung hatte.

Die Methode handleDelivery des DefaultConsumer kann ein neues Runnable generieren, das die Arbeit ausführt und die Nachricht am Ende der Arbeit bestätigt.

, die nicht wirklich Sie viel mehr kaufen, als einfach die concurrentConsumers Vergrößern - der einzige Unterschied ist für jeden Thread ein Verbraucher gibt es, aber es gibt nicht eine ganze Menge Aufwand gibt.

können Sie tun, was Sie wollen, aber eine ChannelAwareMessageListener verwenden und die acknowledgeMode auf MANUAL eingestellt - dann Hörer für acking die Nachricht (en) verantwortlich ist.

In 2.0 (nächstes Jahr) werden wir einen alternativen Listener-Container haben, der den Listener direkt im Thread der Client-Bibliothek aufruft. Aber es ist eine Menge Arbeit. This (closed) pull request has an initial PoC, aber es ist noch kein voll funktionsfähiger Container.