2009-08-30 1 views

Antwort

2

Es gibt einen Fehler, der True FIFO verhindert.
Gelesen here.

Eine alternative Möglichkeit zum Aufbau einer Prioritätswarteschlangen-Multiprocessing-Einrichtung wäre sicherlich großartig!

+0

Ja, aber gibt es eine gute Möglichkeit, die Warteschlange zu tun? ist es möglich? (es sei denn, ich muss einen Prozess speziell schreiben, der noch schwieriger und fehleranfälliger ist) –

+0

Ich habe noch keinen Weg gefunden, werde euch aber auf dem Laufenden halten. –

5

Ach, ist es bei weitem nicht so einfach wie das Ändern Warteschlangendisziplin eines guten alten Queue.Queue: letztere in der Tat ist so konzipiert, subclassed nach einem Template-Methode Muster werden, und überschreiben Sie einfach die Haken Methoden _put und/oder _get leicht können erlauben, die Warteschlangendisziplin zu ändern (in 2.6 werden explizite LIFO- und Prioritätsimplementierungen angeboten, aber sie waren sogar in früheren Versionen von Python leicht zu machen).

Für Multiprocessing, im allgemeinen Fall (mehrere Leser, mehrere Autoren), sehe ich keine Lösung für die Implementierung von Prioritätswarteschlangen, außer auf die verteilte Natur der Warteschlange aufzugeben; einen speziellen Hilfsprozess bestimmen, der nur Warteschlangen bearbeitet, (im Wesentlichen) RPCs an ihn sendet, um eine Warteschlange mit einer bestimmten Disziplin zu erstellen, puts ausführt und dazu gelangt, Informationen darüber erhält, & c. So würde man die üblichen Probleme bekommen, um sicherzustellen, dass jeder Prozess über den Standort des aux proc (Host und Port, sagen wir), usw. informiert ist (einfacher, wenn der Prozess beim Start immer durch den Hauptproc erzeugt wird). Ein ziemlich großes Problem, besonders wenn man es mit guter Leistung machen will, schützt vor Abstürzen von aux proc (erfordert Replikation von Daten zu Slave-Prozessen, verteilte "Master-Wahl" unter Slaves, wenn Master abstürzt, & c), und so fort. Es von Grund auf neu zu machen, klingt nach einem Doktortitel. Man könnte von Johnson's arbeiten, oder huckepack auf einige sehr allgemeine Ansatz wie ActiveMQ.

Einige Spezialfälle (z. B. einzelner Leser, einzelner Schreiber) könnten einfacher sein und sich für ihren begrenzten Anwendungsbereich als schneller erweisen; aber dann sollte eine sehr spezifisch eingeschränkte Spezifikation für diesen begrenzten Bereich erstellt werden - und die Ergebnisse würden keine "Mehrzweckwarteschlange" für den allgemeinen Zweck darstellen, sondern nur eine, die nur für den gegebenen eingeschränkten Satz von Anforderungen gilt.

1

Obwohl dies keine Antwort ist, kann es Ihnen vielleicht helfen, eine Multiprocessing-Warteschlange zu entwickeln.

Hier ist eine einfache Prioritätswarteschlange Klasse I mit Array Python geschrieben:

class PriorityQueue(): 
    """A basic priority queue that dequeues items with the smallest priority number.""" 
    def __init__(self): 
     """Initializes the queue with no items in it.""" 
     self.array = [] 
     self.count = 0 

    def enqueue(self, item, priority): 
     """Adds an item to the queue.""" 
     self.array.append([item, priority]) 
     self.count += 1 

    def dequeue(self): 
     """Removes the highest priority item (smallest priority number) from the queue.""" 
     max = -1 
     dq = 0 
     if(self.count > 0): 
      self.count -= 1 

      for i in range(len(self.array)): 
       if self.array[i][1] != None and self.array[i][1] > max: 
        max = self.array[i][1] 

      if max == -1: 
       return self.array.pop(0) 
      else: 
       for i in range(len(self.array)): 
        if self.array[i][1] != None and self.array[i][1] <= max: 
         max = self.array[i][1] 
         dq = i 
       return self.array.pop(dq) 

    def requeue(self, item, newPrio): 
     """Changes specified item's priority.""" 
     for i in range(len(self.array)): 
      if self.array[i][0] == item: 
       self.array[i][1] = newPrio 
       break 

    def returnArray(self): 
     """Returns array representation of the queue.""" 
     return self.array 

    def __len__(self): 
     """Returnes the length of the queue.""" 
     return self.count 
0

Je nach Bedarf das Betriebssystem und das Dateisystem in eine Reihe von Möglichkeiten nutzen könnten. Wie groß wird die Schlange wachsen und wie schnell muss sie sein? Wenn die Warteschlange groß ist, Sie aber für jeden Zugriff auf die Warteschlange mehrere Dateien öffnen möchten, können Sie eine BTree-Implementierung zum Speichern der Warteschlangen- und Dateisperrung verwenden, um den exklusiven Zugriff zu erzwingen. Langsam aber robust.

Wenn die Warteschlange relativ klein bleiben und Sie müssen es schnell sein, Sie auf einigen Betriebssystemen gemeinsam genutzten Speicher zu verwenden, möglicherweise in der Lage ...

Wenn die Warteschlange klein sein wird (1000s Einträge) und Sie nicht brauchen es wirklich schnell zu sein Sie könnten etwas so einfach wie ein Verzeichnis mit Dateien verwenden, die die Daten mit Dateisperrung enthalten. Dies wäre meine Präferenz, wenn klein und langsam in Ordnung ist.

Wenn die Warteschlange groß sein kann und Sie möchten, dass sie im Durchschnitt schnell ist, sollten Sie wahrscheinlich einen dedizierten Serverprozess verwenden, wie Alex es vorschlägt. Dies ist jedoch ein Schmerz im Nacken.

Was sind Ihre Leistungs- und Größenanforderungen?

1

Ich hatte den gleichen Anwendungsfall. Aber mit einer endlichen Anzahl von Prioritäten.

Was ich am Ende mache, ist eine Warteschlange pro Priorität zu erstellen, und meine Process Worker versuchen, die Elemente aus diesen Warteschlangen zu bekommen, beginnend mit der wichtigsten Warteschlange zu der weniger wichtigen (von einer Warteschlange zur anderen) Ist fertig, wenn die Warteschlange leer ist)

+0

Dies scheint die vernünftigste mögliche Antwort auf der Seite zu sein. – speedplane

+1

Hmm ... auf den zweiten Gedanken, das ist immer noch nicht einfach, richtig zu machen. Wenn zum Beispiel die Warteschlange mit der höchsten Priorität leer ist und Sie mit der nächsten fortfahren, wie können Sie eine Race-Bedingung verhindern, bei der die Warteschlange mit der höchsten Priorität gefüllt wird, während Sie die nächsthöchste Warteschlange überprüfen? – speedplane

0

Inspiriert von @ user211505 Vorschlag, habe ich etwas schnell und schmutzig zusammen.

Beachten Sie, dass dies keine vollständige Lösung für das schwierige Problem von Prioritätswarteschlangen in Multiprocessing-Produktionsumgebungen darstellt. Allerdings, wenn Sie nur herumspielen oder etwas für ein kurzes Projekt benötigen, wird dies wahrscheinlich die Rechnung passen:

from time import sleep 
from datetime import datetime 
from Queue import Empty 
from multiprocessing import Queue as ProcessQueue 

class SimplePriorityQueue(object): 
    ''' 
    Simple priority queue that works with multiprocessing. Only a finite number 
    of priorities are allowed. Adding many priorities slow things down. 

    Also: no guarantee that this will pull the highest priority item 
    out of the queue if many items are being added and removed. Race conditions 
    exist where you may not get the highest priority queue item. However, if 
    you tend to keep your queues not empty, this will be relatively rare. 
    ''' 
    def __init__(self, num_priorities=1, default_sleep=.2): 
     self.queues = [] 
     self.default_sleep = default_sleep 
     for i in range(0, num_priorities): 
      self.queues.append(ProcessQueue()) 

    def __repr__(self): 
     return "<Queue with %d priorities, sizes: %s>"%(len(self.queues), 
        ", ".join(map(lambda (i, q): "%d:%d"%(i, q.qsize()), 
           enumerate(self.queues)))) 

    qsize = lambda(self): sum(map(lambda q: q.qsize(), self.queues)) 

    def get(self, block=True, timeout=None): 
     start = datetime.utcnow() 
     while True: 
      for q in self.queues: 
       try: 
        return q.get(block=False) 
       except Empty: 
        pass 
      if not block: 
       raise Empty 
      if timeout and (datetime.utcnow()-start).total_seconds > timeout: 
       raise Empty 

      if timeout: 
       time_left = (datetime.utcnow()-start).total_seconds - timeout 
       sleep(time_left/4) 
      else: 
       sleep(self.default_sleep) 

    get_nowait = lambda(self): self.get(block=False) 

    def put(self, priority, obj, block=False, timeout=None): 
     if priority < 0 or priority >= len(self.queues): 
      raise Exception("Priority %d out of range."%priority) 
     # Block and timeout don't mean much here because we never set maxsize 
     return self.queues[priority].put(obj, block=block, timeout=timeout)