2016-07-14 14 views
2

Ich habe zwei Anwendungen (nennen wir sie A und B) in einer Pipeline.Umgang mit Strömung während der Skalierung rabbitmq

App A ist eine Spring Boot/Spring Integration App, die Nachrichten X aus der Warteschlange 1 liest, einige Arbeit, und sendet eine große Anzahl von Nachrichten Y in Warteschlange 2 basierend - für jedes X aus Warteschlange 1, etwa 300 Y wird in 2 veröffentlicht. Ein einzelner Thread behandelt die Arbeit in jedem X und veröffentlicht die einzelnen Nachrichten. Das Optimieren von App A hat mich an einen Ort gebracht, wo eine einzelne Instanz etwa 50 X pro Sekunde bestätigen kann; So veröffentlicht etwa 15.000 Y pro Sekunde zu Warteschlange 2.

App B ist auch eine Spring Boot/Spring Integration App, Ys aus der Warteschlange 2 liest, und aggregiert sie, die Pipeline zu beenden. Eine einzelne Instanz von B behandelt etwa 7-8k Y pro Sekunde.

Also, zusammenfassend, mit 1 A, 2 Bs und einem ziemlich großen RabbitMQ-Server (ein AWS r3.4xlarge), traf ich etwa 50 X und 15000 Y pro Sekunde.

Ich habe versucht, diesen Prozess zu vergrößern; zumindest möchte ich 100 X/30000 Y pro Sekunde treffen. Da die Logik in diesen Apps horizontal skaliert werden kann, habe ich versucht, die Bereitstellung zu verdoppeln. d.h. 2 As und 4 Bs.

Das Hochskalieren des As hat jedoch nicht den erwarteten Effekt; Bestätigungen von Xs bleiben ungefähr gleich bei 50/s und Ys bleiben auch bei ungefähr 15k pro Sekunde stabil, wobei Warteschlange 2 mehr oder weniger leer bleibt.

Eine genauere Überprüfung zeigt, dass A's Veröffentlichungskanäle im flow Modus sind, vermutlich weil 15k eine Sekunde viel in die Warteschlange 2 passen. Die Begrenzung scheint jedoch nicht nur eine Warteschlange zu sein, die 15k Y pro Sekunde empfängt; Wenn ich die Bindungen so ändere, dass die Ys stattdessen in einer Warteschlange ohne irgendwelche Konsumenten veröffentlicht werden, dann treffe ich die erwarteten 100 X/30k Y eine Sekunde.

Warum kann ich nicht scheinen, 30k Y eine Sekunde in Warteschlange 2 zu bekommen?

Weitere Details:

  • Warteschlange 2 wird als dauerhaft erklärt mit DLX
  • Y-Nachrichten werden veröffentlicht als nicht-persistent

Der Verlag Erklärung wie folgt aussieht:

@Bean 
public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory, MessageConverter jsonNodeMessageConverter, 
            AmqpHeaderMapper headerMapper) { 
    RabbitTemplate outboundTemplate = new RabbitTemplate(connectionFactory); 
    outboundTemplate.setMessageConverter(jsonNodeMessageConverter); 

    return IntegrationFlows 
      .from(BeanNames.OUTPUT_CHANNEL) 
      .split() 
      .handle(Amqp.outboundAdapter(outboundTemplate) 
        .exchangeName(OUTBOUND_EXCHANGE_NAME) 
        .headerMapper(headerMapper) 
        .routingKeyExpression("headers." + ApplicationHeaders.DESTINATION_ROUTING_KEY)) 
      .get(); 
} 

@Bean 
public AmqpHeaderMapper headerMapper() { 
    return new DefaultAmqpHeaderMapper() { 
     @Override 
     protected void populateStandardHeaders(Map<String, Object> headers, MessageProperties amqpMessageProperties) { 
      super.populateStandardHeaders(headers, amqpMessageProperties); 
      amqpMessageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); 
     } 
    }; 
} 

Aktualisierung

Ich kann hier mehr Einstellungen vornehmen.

Zur Applikation A:

  • Verbraucher/Themen: 16
  • Prefetch für X in Anwendung A: 50
  • kein Verlag Bestätigungen
  • non-persistent

Zur Applikation B:

  • Verbraucher/Themen: 16
  • Prefetch für Y in Anwendung B: 250
  • tx Größe: 250

In RabbitMQ (was ich auf ein r3.8xlarge aktualisiert haben):

Ich habe sogar die Flüsse an diesem Punkt aufgeteilt; zwei Anwendungen A lesen eine Warteschlange 1 ab, veröffentlichen sie jedoch in zwei Warteschlangen 2s - 2a und 2b. Die acks/s in der Warteschlange 1 haben sich überhaupt nicht geändert, und die Summe von acks/s auf 2a und 2b ist die gleiche wie zu der Zeit, als es nur eine Warteschlange 2 gab. Tatsächlich sind die Verbindungen von Anwendung A zu dem Hasen und Die Kanäle von Anwendung A zu den Warteschlangen 2a und 2b gehen routinemäßig immer noch in flow.

Alles, was sich geändert hat, ist, dass ich noch weniger Hardware-Ressourcen nutze als zuvor (weil ich den Hasen-Server aufgerüstet habe) - der Hase spuckt nie über 30% CPU und der Speicherverbrauch bleibt lächerlich niedrig.

Antwort

0

Ich nehme an, Sie verwenden keine Transaktionskanäle auf der Herstellerseite.

Versuchen Sie, die prefetchCount und txSize auf den Y Consumer Listener Container zu erhöhen; der erste puffert Nachrichten im Container; die zweite wird den Ack-Verkehr reduzieren.

EDIT

Für RabbitMQ Besonderheiten finden Finding bottlenecks with RabbitMQ.

+0

Ich hatte bereits prefetchCount auf etwa 250 gesetzt, aber ich werde das aufbocken und versuchen, die TX-Größe einen Haufen zu erhöhen. – jwilner

+0

Dieser Link wurde gerade als Antwort auf eine andere Frage an die Google-Gruppe von rabbitmq-users gesendet. https://www.rabbitmq.com/blog/2014/04/14/finding-bottlenecks-with-rabbitmq-3-3/ –

+0

Hier ist ein Link zum vollständigen Thread: https://groups.google.com/forum/#! topic/rabbitmq-users/DjRtCaqNNuI –