2016-05-18 13 views
0

Ich stelle ein Producer/Consumer-Setup mit Kombu auf Redis zusammen, aber ich stoße auf ein Problem. Wenn ich einen Verbraucher starte und dann den Producer mit der Reichweite (10000) starte, kann ich bestätigen, dass der Produzent alle 10k Artikel in die Warteschlange gestellt hat, aber nicht alle 10k Artikel vom Konsumenten erhalten werden. Gibt es eine Einschränkung, die mir nicht bekannt ist, weder mit Kombu noch mit Redis? Es scheint korrekt mit Bereich (9000) zu funktionieren, und alle Schlüssel/Acks sind ordnungsgemäß geleert.Kombu beim Redisken von Nachrichten

class ProduceConsume(object): 
    def __init__(self, exchange_name): 
     exchange = Exchange(exchange_name, type='fanout', durable=False) 
     self.queue_name = 'test_queue' 
     self.queue = Queue(self.queue_name, exchange) 

    def producer(self, inp): 
     with BrokerConnection("redis://localhost:6379/15") as conn: 
      with conn.SimpleQueue(self.queue) as queue: 
       for payload in inp: 
        queue.put(str(payload).zfill(5)) 
        print(str(payload).zfill(5)) 

    def consumer(self): 
     with BrokerConnection("redis://localhost:6379/15") as conn: 
      with conn.SimpleQueue(self.queue) as queue: 
       while True: 
        message = queue.get() 
        message.ack() 
        print(message.payload) 

Antwort

0

Keine vollständige Antwort, aber die Verwendung von RabbitMQ anstelle von Redis hatte nicht das Problem, dass Nachrichten verloren gingen. Möglicherweise aufgrund von Problemen mit nicht direktem Austausch? Fühlen Sie sich frei, eine informiertere Antwort zu senden, aber hoffentlich kann dies jemandem helfen.

Update: Dies ist ein Bug in Kombu; siehe https://github.com/celery/kombu/issues/593