2016-06-01 8 views
1

Ich habe zwei Server, nennen sie A und B. B läuft RabbitMQ, während A über Kombu zu RabbitMQ verbindet. Wenn ich RabbitMQ auf B neu starte, bricht die Kombu-Verbindung ab und die Nachrichten werden nicht mehr zugestellt. Ich muss dann den Prozess auf A zurücksetzen, um die Verbindung wiederherzustellen. Gibt es einen besseren Ansatz, d. H. Gibt es eine Möglichkeit für Kombu, sich automatisch neu zu verbinden, selbst wenn der RabbitMQ-Prozess neu gestartet wird?Kombu nicht wieder an RabbitMQ

Meine grundlegende Code-Implementierung ist unten, danke im Voraus! :)

def start_consumer(routing_key, incoming_exchange_name, outgoing_exchange_name): 
    global rabbitmq_producer 

    incoming_exchange = kombu.Exchange(name=incoming_exchange_name, type='direct') 
    incoming_queue = kombu.Queue(name=routing_key+'_'+incoming_exchange_name, exchange=incoming_exchange, routing_key=routing_key)#, auto_delete=True) 

    outgoing_exchange = kombu.Exchange(name=outgoing_exchange_name, type='direct') 
    rabbitmq_producer = kombu.Producer(settings.rabbitmq_connection0, exchange=outgoing_exchange, serializer='json', compression=None, auto_declare=True) 

    settings.rabbitmq_connection0.connect() 
    if settings.rabbitmq_connection0.connected: 
     callbacks=[] 
     queues=[] 

     callbacks.append(callback) 
     # if push_queue: 
     # callbacks.append(push_message_callback) 
     queues.append(incoming_queue) 

     print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (incoming_exchange.name, incoming_queue.name) 
     incoming_exchange(settings.rabbitmq_connection0).declare() 
     incoming_queue(settings.rabbitmq_connection0).declare() 

     print 'opening a new *outgoing* rabbitmq connection to the %s exchange' % outgoing_exchange.name 
     outgoing_exchange(settings.rabbitmq_connection0).declare() 

     with settings.rabbitmq_connection0.Consumer(queues=queues, callbacks=callbacks) as consumer: 
      while True: 
       settings.rabbitmq_connection0.drain_events() 
+0

Es ist nicht für Kombu, aber ich schrieb vor kurzem ein robustes Verbraucherbeispiel für meine eigene Bibliothek. Wenn überhaupt, könntest du es wahrscheinlich für Kombu/Pika verwenden. https://github.com/eandersson/amqpstorm/blob/stable/examples/scalable_consumer.py – eandersson

Antwort

0

Auf der Verbraucherseite, kombu.mixins.ConsumerMixin Wiederverbindung behandelt, wenn die Verbindung geht weg (und auch tut Herzschlag, etc. und lässt Sie weniger Code schreiben). Es scheint nicht ein ProducerMixin zu sein, aber leider könnten Sie möglicherweise in den Code eindringen und ihn anpassen ...?