2016-06-16 14 views
3

Für die Serverautomatisierung versuchen wir ein Tool zu entwickeln, das viele Aufgaben auf verschiedenen Servern bewältigen und ausführen kann. Wir senden die Aufgabe und den Hostnamen des Servers in eine Warteschlange. Die Warteschlange wird dann von einem Anforderer verbraucht, der die Information an die ansible API liefert. Um das zu erreichen, können wir mehr als eine Aufgabe gleichzeitig ausführen, wir verwenden Threading.Pika: Verbraucht die nächste Nachricht, selbst wenn die letzte Nachricht nicht bestätigt wurde

Jetzt sind wir mit der Bestätigung der Nachricht hängen geblieben ...

Was wir bisher getan haben:
Die requester.py verbraucht die Warteschlange und startet dann einen Thread, in dem die ansible Aufgabe ausgeführt wird. Das Ergebnis wird dann in eine andere Warteschlange gesendet. So erstellt jede neue Nachricht einen neuen Thread. Ist die Aufgabe erledigt, stirbt der Thread.
Aber jetzt kommt schwieriger Teil. Wir müssen die Nachrichten persistent machen, falls unser Server stirbt. So sollte jede Nachricht bestätigt werden nach das Ergebnis von Ansible wurde zurückgesendet.

Unser Problem ist jetzt, wenn wir versuchen, die Nachricht im Thread selbst zu bestätigen, gibt es nicht mehr "gleichzeitig" Arbeit getan, weil die consume von Pika auf die Bestätigung wartet. Wie können wir erreichen, dass die consume Nachrichten konsumiert und nicht auf die Bestätigung wartet? Oder wie können wir unser kleines Programm verbessern oder verbessern?

requester.py

#!/bin/python 

    from worker import * 
    import ansible.inventory 
    import ansible.runner 
    import threading 

    class Requester(Worker): 
     def __init__(self): 
      Worker.__init__(self) 
      self.connection(self.selfhost, self.from_db) 
      self.receive(self.from_db) 

     def send(self, result, ch, method): 
      self.channel.basic_publish(exchange='', 
            routing_key=self.to_db, 
            body=result, 
            properties=pika.BasicProperties(
                delivery_mode=2, 
            )) 

      print "[x] Sent \n" + result 
      ch.basic_ack(delivery_tag = method.delivery_tag) 

     def callAnsible(self, cmd, ch, method): 
      #call ansible api pre 2.0 

      result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': ')) 
      self.send(result, ch, method) 

     def callback(self, ch, method, properties, body): 
      print(" [x] Received by requester %r" % body) 
      t = threading.Thread(target=self.callAnsible, args=(body,ch,method,)) 
      t.start() 

worker.py

import pika 
    import ConfigParser 
    import json 
    import os 

    class Worker(object): 
     def __init__(self): 
      #read some config files 

     def callback(self, ch, method, properties, body): 
      raise Exception("Call method in subclass") 

     def receive(self, queue): 
      self.channel.basic_qos(prefetch_count=1) 
      self.channel.basic_consume(self.callback,queue=queue) 
      self.channel.start_consuming() 

     def connection(self,server,queue): 
      self.connection = pika.BlockingConnection(pika.ConnectionParameters(
       host=server, 
       credentials=self.credentials)) 
      self.channel = self.connection.channel() 
      self.channel.queue_declare(queue=queue, durable=True) 

Wir mit Python arbeiten sind 2.7 und pika 0.10.0.

Und ja, wir bemerkten in der Pika FAQ: http://pika.readthedocs.io/en/0.10.0/faq.html
, dass Pika nicht Thread sicher ist.

Antwort

3

Deaktivieren Sie die automatische Bestätigung und legen Sie die Anzahl der Vorabholungen auf einen Wert größer als 1 fest, je nachdem, wie viele Nachrichten Ihr Kunde annehmen soll.

Hier ist, wie Prefetch channel.basic_qos(prefetch_count=1), gefunden here eingestellt werden.

+0

Super! Vielen Dank! Wie kann ich diese Prefetch-Zählung übersteuern? Das macht all die Magie. – Rumpli

+0

@Rumpli Ich habe dies zur Antwort hinzugefügt. Jetzt werde ich hier meinen eigenen Schaden anrichten, aber da du neu hier bist, werde ich kurz erklären, wie du die Antworten aufhebst und akzeptierst: Wenn dir die Antwort hilft, gib ihr eine Stimme. Wenn es Ihr Problem löst, geben Sie ihm eine positive Bewertung und akzeptieren Sie es. Hier hast du nur ohne upvote akzeptiert, aber du hast es nicht versucht, wenn es funktioniert :) Vielleicht nur upvote für jetzt, und sobald du bestätigst akzeptiere auch. Bitte korrigieren Sie mich, wenn ich diese Abstimmung nicht korrekt erklärt habe. – cantSleepNow

+0

Danke für Ihre Erklärung. Ich habe das ausprobiert und mit dem Setzen von 'channel.basic_qos (prefetch_count = 1)' auf mehr als '1', tut es zu der Zeit mehr als eine Aufgabe. Und ich habe versucht, deine Antwort zu verbessern, aber solange ich nicht 15 Ruf habe, wird es es nicht anzeigen ... :( – Rumpli