2016-06-30 21 views
1

Wir haben einen 2-Knoten-RabbitMQ-Cluster mit Ha-All-Politik. Wir verwenden Spring AMQP in unserer Anwendung, um mit RabbitMQ zu sprechen. Produzent Teil funktioniert gut, aber Verbraucher arbeitet für einige Zeit und Pausen. Produzent und Konsument laufen als unterschiedliche Anwendungen. Weitere Informationen zum Verbraucherteil.Spring Amqp Consumer pausiert nach einer Laufzeit von

  • wir verwenden SimpleMessageListenerContainer mit ChannelAwareMessageListener, verwenden Handbuch ack Modus und Standard prefetch(1)
  • In unserer Anwendung wir Warteschlange erstellen (on-demand), und fügen Sie den Hörer
  • Wenn wir mit 10 ConcurrentConsumers und 20 gestartet MaxConcurrentConsumers, geschieht der Verbrauch für etwa 15 Stunden und pausiert. Diese Situation tritt innerhalb von 1 Stunde, wenn wir erhöhen die MaxConcurrentConsumers bis 75.

Auf RabbitMQ UI sehen wir Kanäle mit 3/4 un ack ed Nachrichten auf dem Kanal Registerkarte, wenn diese Situation eintritt, bis sie haben dann nur 1 un ack ed Nachricht.

Unser Thread-Dump war ähnlich wie this. Aber mit einem Heartbeat von 60 konnte diese Situation nicht verbessert werden.

Der meiste Thread-Dump weist die folgende Meldung auf. Bei Bedarf werde ich den gesamten Thread-Dump anhängen. Lassen Sie es mich wissen, wenn ich irgendwelche Einstellungen verpasse, die den Verbraucher zum Pausieren veranlassen könnten?

"pool-6-thread-16" #86 prio=5 os_prio=0 tid=0x00007f4db09cb000 nid=0x3b33 waiting on condition [0x00007f4ebebec000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000007b9930b68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) 
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350) 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:660) 
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144) 
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Mehr Infos Wir dynamisch hinzufügen und entfernen Warteschlangen SimpleMessageListenerContainer und wir vermuten, dass dies ein Problem verursacht, weil jedes Mal, wenn wir hinzufügen oder eine Warteschlange von dem Hörer entfernen, alle BlockingQueueConsumer werden entfernt und wieder angelegt. Glauben Sie, dass dies zu diesem Problem führen kann?

+0

Sie müssen den vollständigen Thread Dump irgendwo, wahrscheinlich nicht hier, weil es zu groß ist; vielleicht etwas wie Pastebin oder Github. Wahrscheinlich steckt der Container-Thread irgendwo in Ihrem Code. –

+0

@GaryRussell: http://pastebin.com/UrBLfn2C ist die Einfügemarke, die vollständigen Thread-Dump enthält. – Kot

+0

Der Thread-Dump sieht gut aus - alle Container-Threads warten in 'nextMessage()', so scheint es, dass Ihre aktuelle Theorie korrekt ist - etwas im Netzwerk hat die Verbindung stillgelegt - einige Router tun das für inaktive Verbindungen. Wenn Sie den angeforderten Heartbeat einstellen, sollte die Verbindung erhalten bleiben - Sie müssen einen Netzwerkmonitor (tcpdump, wireshark usw.) verwenden, um es herauszufinden. –

Antwort

0

Ihr Problem ist irgendwo im Ziel-Listener downstream. diese

Schauen Sie, prefetch(1) Ursache:

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount); 

Und weiter, wenn wir nicht, dass die Warteschlange abfragen, was wir hier haben?

BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body)); 

Rechts, Parken auf Schloss.

0

AMQP-621 wird jetzt zu Master zusammengeführt; Wir werden 1.6.1.RELEASE in den nächsten Tagen veröffentlichen.