2012-08-28 7 views
12

Vielleicht bin ich dumm, die Frage zu stellen, aber ich muss meinen Kopf um die grundlegenden Konzepte wickeln, bevor ich weiterarbeite.Sellerie - Kann eine Nachricht in RabbitMQ von zwei oder mehr Arbeitern gleichzeitig konsumiert werden?

Ich verarbeite ein paar tausend RSS-Feeds, mit mehreren Sellery Worker-Knoten und einem RabbitMQ-Knoten als Broker. Die URL jedes Feeds wird als Nachricht in die Warteschlange geschrieben. Ein Worker liest nur die URL aus der Warteschlange und beginnt mit der Verarbeitung. Ich muss sicherstellen, dass ein einzelner RSS-Feed nicht von zwei Arbeitern gleichzeitig verarbeitet wird.

Der Artikel Ensuring a task is only executed one at a time schlägt eine Memcahced-basierte Lösung zum Sperren des Feeds vor, wenn es verarbeitet wird.

Aber was ich versuche zu verstehen, ist, warum muss ich Memcached (oder etwas anderes) verwenden, um sicherzustellen, dass eine Nachricht in einer RabbitMQ-Warteschlange nicht von mehreren Arbeitern zur gleichen Zeit konsumiert werden. Gibt es einige Konfigurationsänderungen in RabbitMQ (oder Sellerie), die ich tun kann, um dieses Ziel zu erreichen?

+1

Es besteht ein Unterschied zwischen dem Sperren der Nachrichten und Sperren der Feeds. Was musst du tun? –

+0

@PlatinumAzure - etwas erklären?Ich muss die Nachricht sperren (wenn das bedeutet sicherzustellen, dass sie nicht von mehreren Arbeitern konsumiert wird). – rubayeet

Antwort

4

Wie von anderen erwähnt, mischen Sie Äpfel und Orangen.

Als Sellerie Aufgabe und eine MQ-Nachricht.

Sie können sicherstellen, dass eine Nachricht von nur einem Mitarbeiter gleichzeitig verarbeitet wird.

z.

@task(...) 
def my_task(

my_task.apply(1) 

die .apply veröffentlicht eine Nachricht an den Nachrichten-Broker Sie verwenden (Kaninchen, redis ...). Dann wird die Nachricht an eine Warteschlange weitergeleitet und von einem Mitarbeiter zu der Zeit verbraucht. Sie brauchen das nicht zu sperren, Sie haben es kostenlos :)

Das Beispiel auf dem Sellerie-Kochbuch zeigt, wie zwei solche Meldungen (my_task.apply (1)) gleichzeitig ausgeführt werden können, das ist etwas Sie müssen innerhalb der Aufgabe selbst sicherstellen.

Sie benötigen etwas, auf das Sie von allen Arbeitern natürlich zugreifen können (memcached, redis ...), da sie auf verschiedenen Rechnern laufen könnten.

2

Erwähntes Beispiel, das normalerweise für andere Ziele verwendet wird: Es verhindert, dass Sie mit verschiedenen Nachrichten mit der gleichen Bedeutung (nicht die gleiche Nachricht) arbeiten. ZB habe ich zwei Prozesse: erstens legt man einige URLs in die Warteschlange, und zweitens nimmt man die URL aus der Warteschlange und holt sie. Was ist, wenn der erste Prozess eine URL zweimal (oder noch öfter) in die Warteschlange stellt?

P.S. Ich verwende für diesen Zweck Redis Speicher und setnx Betrieb (die Schlüssel nur einmal einstellen).

5

Eine einzelne MQ-Nachricht wird sicherlich nicht von mehreren Benutzern in einem normalen Arbeitssetup gesehen werden. Sie müssen etwas für die Fälle tun, in denen Mitarbeiter scheitern/abstürzen, sich über Auto-Acks und Nachrichtenabweisungen informieren, aber der Grundfall ist Sound.

Ich sehe keine synchronisierte Warteschlange (lesen: MQ) in dem Artikel, den Sie verknüpft haben, so dass (wie ich es sagen kann) sie den Sperrmechanismus verwenden (lesen: Memcache) zu synchronisieren eine Alternative. Und ich kann mir ein paar Probleme vorstellen, die bei einem richtigen MQ-Setup nicht vorhanden wären.