2012-09-14 7 views
11

Ich habe einen Python-Worker-Client, der einen 10 Arbeiter spinnt, die jeweils in eine RabbitMQ-Warteschlange einhaken. Ein bisschen wie folgt aus:Pika + RabbitMQ: basic_qos auf prefetch = 1 scheint immer noch alle Nachrichten in der Warteschlange zu konsumieren

#!/usr/bin/python 
worker_count=10 

def mqworker(queue, configurer): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost')) 
    channel = connection.channel() 
    channel.queue_declare(queue=qname, durable=True) 
    channel.basic_consume(callback,queue=qname,no_ack=False) 
    channel.basic_qos(prefetch_count=1) 
    channel.start_consuming() 


def callback(ch, method, properties, body): 
    doSomeWork(); 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

if __name__ == '__main__': 
    for i in range(worker_count): 
     worker = multiprocessing.Process(target=mqworker) 
     worker.start() 

Das Problem, das ich habe ist, dass basic_qos auf dem Kanal, der erste Arbeiter zu beginnen nimmt alle Nachrichten aus der Warteschlange, während die anderen sitzen untätig trotz Einstellung. Ich kann dies in der Rabbitmq-Schnittstelle sehen, selbst wenn ich worker_count auf 1 setze und 50 Nachrichten in der Warteschlange ablege, gehen alle 50 in den 'unbestätigten' Bucket, während ich erwarte, dass 1 unbestätigt wird und die anderen 49 zu sei bereit.

Warum funktioniert das nicht?

Antwort

14

Ich habe das durch Verschieben gelöst, wo basic_qos aufgerufen wird.

Platzieren Sie es direkt nach channel = connection.channel() scheint das Verhalten zu ändern, was ich erwarten würde.

+0

danke! Das hat das Problem gelöst. und btw das ist sehr schwer zu debuggen .. – Sajuuk

+0

@Hiagara yeah rannte gerade in das heute selbst hinein. Erstaunlich, dass dies fast 5 Jahre später immer noch nicht klar oder in der API dokumentiert ist. – Jordan