Ich habe einen Python-Worker-Client, der einen 10 Arbeiter spinnt, die jeweils in eine RabbitMQ-Warteschlange einhaken. Ein bisschen wie folgt aus:Pika + RabbitMQ: basic_qos auf prefetch = 1 scheint immer noch alle Nachrichten in der Warteschlange zu konsumieren
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue=qname, durable=True)
channel.basic_consume(callback,queue=qname,no_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
def callback(ch, method, properties, body):
doSomeWork();
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
for i in range(worker_count):
worker = multiprocessing.Process(target=mqworker)
worker.start()
Das Problem, das ich habe ist, dass basic_qos auf dem Kanal, der erste Arbeiter zu beginnen nimmt alle Nachrichten aus der Warteschlange, während die anderen sitzen untätig trotz Einstellung. Ich kann dies in der Rabbitmq-Schnittstelle sehen, selbst wenn ich worker_count
auf 1 setze und 50 Nachrichten in der Warteschlange ablege, gehen alle 50 in den 'unbestätigten' Bucket, während ich erwarte, dass 1 unbestätigt wird und die anderen 49 zu sei bereit.
Warum funktioniert das nicht?
danke! Das hat das Problem gelöst. und btw das ist sehr schwer zu debuggen .. – Sajuuk
@Hiagara yeah rannte gerade in das heute selbst hinein. Erstaunlich, dass dies fast 5 Jahre später immer noch nicht klar oder in der API dokumentiert ist. – Jordan