2012-08-09 9 views
8

Ich versuche, eine Aufgabe aufzurufen und eine Warteschlange für diese Aufgabe erstellen, wenn es nicht vorhanden ist, dann sofort die aufgerufene Aufgabe in diese Warteschlange einfügen. Ich habe den folgenden Code:Sellerie dynamische Queue-Erstellung und Routing

@task 
def greet(name): 
    return "Hello %s!" % name 


def run(): 
    result = greet.delay(args=['marc'], queue='greet.1', 
     routing_key='greet.1') 
    print result.ready() 

dann habe ich einen benutzerdefinierten Router:

class MyRouter(object): 

    def route_for_task(self, task, args=None, kwargs=None): 
     if task == 'tasks.greet': 
      return {'queue': kwargs['queue'], 
        'exchange': 'greet', 
        'exchange_type': 'direct', 
        'routing_key': kwargs['routing_key']} 
     return None 

dies schafft ein Austausch genannt greet.1 und eine Warteschlange greet.1 genannt, aber die Warteschlange leer ist. Der Austausch sollte nur greet genannt werden, der weiß, wie man einen Leitwegschlüssel wie greet.1 in die Warteschlange mit der Bezeichnung greet.1 leitet.

Irgendwelche Ideen?

Antwort

13

Wenn Sie wie folgt vor:

task.apply_async(queue='foo', routing_key='foobar') 

Dann wird Sellerie Standardwerte aus der ‚foo‘ Warteschlange in CELERY_QUEUES nehmen, oder wenn es nicht dann automatisch existiert schaffen es mit (queue = foo, Austausch = foo, routing_key = foo)

Also, wenn 'foo' existiert nicht in CELERY_QUEUES werden Sie am Ende mit:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo') 

Der Hersteller wird dann erklären, dass Warteschlange, aber da Sie die routing_key außer Kraft setzen, die Nachricht tatsächlich routing_key = 'foobar'

mit senden Diese mag seltsam erscheinen, aber das Verhalten ist für Thema Börsen tatsächlich nützlich, wo Sie zu verschiedenen Themen veröffentlichen.

Es ist schwieriger zu tun, was Sie wollen, obwohl Sie die Warteschlange selbst erstellen und deklarieren können, aber das wird nicht gut mit automatischen Nachrichten veröffentlichen versuchen. Es wäre besser, wenn das Warteschlangenargument für apply_async stattdessen eine benutzerdefinierte kombu.Queue unterstützt, die sowohl deklariert als auch als Ziel verwendet wird. Vielleicht könnten Sie ein Problem für das bei http://github.com/celery/celery/issues

öffnen
+0

Ich hörte auf, sich Sorgen über die Erstellung der Warteschlange manuell, stattdessen nur einen neuen Arbeiter spawn, die automatisch erstellt Warteschlange und Austausch, die mehr Sinn für mein Problem. Wie immer, danke für die Antwort. :) – Marconi