2016-05-12 11 views
1

Ich habe einige Aussetzer bei den Verbrauchern, die ich mit der pika-Bibliothek für rabbitmq eingerichtet habe. Zusammen mit Pika verwende ich die Twisted-Implementierung, um Async-Konsumenten einzurichten. Ich bin nicht sicher, warum das passiert, aber ich möchte eine Wiederverbindung implementieren, wenn der Verbraucher ausfällt und nicht sicher ist, wie man das macht. Hier ist meine aktuelle ImplementierungSo starten Sie den Verbraucher neu rabbitmq pika python

class Consumer(object): 
def __init__(self, queue, exchange, routingKey, medium, signalRcallbackFunc): 
    self._queue_name = queue 
    self.exchange = exchange 
    self.routingKey = routingKey 
    self.medium = medium 
    print "client on" 
    self.channel = None 
    self.medium.client.on(signalRcallbackFunc, self.callback) 

def on_connected(self, connection): 
    d = connection.channel() 
    d.addCallback(self.got_channel) 
    d.addCallback(self.queue_declared) 
    d.addCallback(self.queue_bound) 
    d.addCallback(self.handle_deliveries) 
    d.addErrback(log.err) 

def got_channel(self, channel): 
    self.channel = channel 
    self.channel.basic_qos(prefetch_count=500) 
    return self.channel.queue_declare(queue=self._queue_name, durable=True) 

def queue_declared(self, queue): 
    self.channel.queue_bind(queue=self._queue_name, 
          exchange=self.exchange, 
          routing_key=self.routingKey) 

def queue_bound(self, ignored): 
    return self.channel.basic_consume(queue=self._queue_name) 

def handle_deliveries(self, queue_and_consumer_tag): 
    queue, consumer_tag = queue_and_consumer_tag 
    self.looping_call = task.LoopingCall(self.consume_from_queue, queue) 

    return self.looping_call.start(0) 

def consume_from_queue(self, queue): 
    d = queue.get() 
    return d.addCallback(lambda result: self.handle_payload(*result)) 

def handle_payload(self, channel, method, properties, body): 
    print(body) 
    print(properties.headers) 
    channel.basic_ack(method.delivery_tag) 
    print "#####################################" + method.delivery_tag + "###################################" 

def callback(self, data): 
    #self.channel.basic_ack(data, multiple=True) 
    pass 

Antwort

1

Sie könnten einen ‚on-close‘ Handler mit der Verbindung innerhalb des on_connected Rückrufs registrieren. Dies wird aufgerufen, wenn die Verbindung unterbrochen wird. Hier können Sie eine neue Verbindung wiederherstellen.

Das folgende Beispiel relativ nützlich ist, und es ist eine Strategie, die ich für eine gute Wirkung verwendet haben ... http://pika.readthedocs.io/en/latest/examples/asynchronous_consumer_example.html

Für die Twisted pika Bibliothek der add_on_close_callback Methode werden Sie wahrscheinlich weit kommen ganz (obwohl ich nicht getestet) . https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html

+0

wie würde ich mich über das Tun dies, mit dem verdrehten Protokoll – Johnathon64

+0

Es scheint, die verdrehte Umsetzung der pika Bibliothek hat ähnliche Methoden zur Verfügung registrieren Hörer für die Verbindung geschlossenen Veranstaltung. Sie könnten versuchen, die 'add_on_close_callback' Methode zu verwenden. Siehe [link] https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html – nkuro

+0

Ich habe das gelesen, bin mir aber nicht sicher, wie ich das in meiner Lösung implementieren soll – Johnathon64

0

Gibt es einen Grund, warum Sie die Verbindung nicht einfach schließen und wieder öffnen können?

@contextmanager 
def with_pika_connection(): 
    credentials = pika.PlainCredentials(worker_config.username, worker_config.password) 
    connection = pika.BlockingConnection(pika.ConnectionParameters(
     host=worker_config.host, 
     credentials=credentials, 
     port=worker_config.port, 
    )) 

    try: 
     yield connection 
    finally: 
     connection.close() 


@contextmanager 
def with_pika_channel(queuename): 
    with with_pika_connection() as connection: 
     channel = connection.channel() 


while True: 
    while not stopping: 
     try: 
       with with_pika_channel(queuename) as (connection, channel): 
        consumer_tag = channel.basic_consume(
         callback, 
         queue=queuename, 
        ) 
        channel.start_consuming() 
     except Exception as e: 
       reportException(e) 
       # Continue