2014-06-24 7 views
5

Wie kann ich das Ergebnis einer Aufgabe ziehen, wenn ich vorher nicht weiß, welche Aufgabe ausgeführt wurde? Hier ist das Setup: die folgende Quelle ('tasks.py') Gegeben:Abrufen des Ergebnisses von 'task_id' in Sellerie von unbekannter Task

from celery import Celery 

app = Celery('tasks', backend="db+mysql://u:[email protected]/db", broker = 'amqp://guest:[email protected]:5672//') 

@app.task 
def add(x,y): 
    return x + y 


@app.task 
def mul(x,y): 
    return x * y 

mit RabbitMQ 3.3.2 lokal ausgeführt:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server 

       RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc. 
    ## ##  Licensed under the MPL. See http://www.rabbitmq.com/ 
    ## ## 
    ########## Logs: /usr/local/var/log/rabbitmq/[email protected] 
    ###### ##  /usr/local/var/log/rabbitmq/[email protected] 
    ########## 
       Starting broker... completed with 10 plugins. 

mit Sellerie 3.1.12 lokal ausgeführt:

-------------- [email protected] v3.1.12 (Cipater) 
---- **** ----- 
--- * *** * -- Darwin-13.2.0-x86_64-i386-64bit 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   tasks:0x105dea3d0 
- ** ---------- .> transport: amqp://guest:**@localhost:5672// 
- ** ---------- .> results:  disabled 
- *** --- * --- .> concurrency: 8 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 

ich kann dann die Methode importieren und das Ergebnis mit dem ‚task_id‘ abrufen:

from tasks import add, mul 
from celery.result import AsyncResult 

result = add.delay(2,2) 
task_id = result.task_id 
result.get() # 4 

result = AsyncResult(id=task_id) 
result.get() # 4 

result = add.AsyncResult(id=task_id) 
result.get() # 4 

# and the same for the 'mul' task. Just imagine I put it here 

Im nächsten Beispiel habe ich diese Schritte zwischen den Prozessen aufgeteilt. In einem Verfahren abrufen ich den ‚task_id‘ wie folgt:

from tasks import add 

result = add.delay(5,5) 
task_id = result.task_id 

Und in einem anderen Prozess, wenn ich die gleiche ‚task_id‘ verwende (kopiert und auf einem anderen REPL geklebt oder in einer anderen HTTP-Anfrage) wie folgt:

from celery.result import AsyncResult 

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:[email protected]/db") 
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta' 
result.state # AttributeError: 'str' object has no attribute 'get_task_meta' 
result.status # AttributeError: 'str' object has no attribute 'get_task_meta' 

Und in einem anderen Prozess, wenn ich tun:

from task import add # in this instance I know that an add task was performed 

result = add.AsyncResult(id="copied_task_id") 
result.status # "SUCCESSFUL" 
result.state # "SUCCESSFUL" 
result.get() # 10 

ich das Ergebnis in der Lage sein möchten, ohne vorher zu wissen, welche Aufgabe das Ergebnis erzeugt. In meiner realen Umgebung plane ich, diese task_id an den Client zurückzugeben und sie den Status ihres Jobs über eine HTTP-Anfrage abfragen zu lassen.

Antwort

11

Ok - so für eine lange Zeit nach einer Lösung gesucht, ich habe und jetzt, da ich dies schließlich formell geschrieben habe und sah die über documentation ich this gem gefunden:

Klasse celery.result .AsyncResult (ID, Back-End = Keine, Task-Name = Keine, App = Keine, Parent = Keine)

Taskstatus abfragen.

Parameter:

id - siehe id.

Backend - siehe backend.

AusnahmeTimeouterror

Fehler für Timeouts erhöht.

AsyncResult.App = Keine

Also anstatt den Back-End-Parameter ich das "App" Argument statt wie bisher in etwa so:

from celery.result import AsyncResult 
from task import app 

# Assuming add.delay(10,10) was called in another process 
# and that I'm using a 'task_id' I retrieved from that process 

result = AsyncResult(id='copied_task_id', app=app) 
result.state # 'SUCCESSFUL' 
result.get() # 20 

Dies ist wohl offensichtlich zu viele. Es war nicht für mich. Im Moment kann ich nur sagen, dass diese Lösung "einfach funktioniert", aber ich würde mich wohler fühlen, wenn ich wüsste, dass dies der sanktionierte Weg ist. Wenn Sie einen Abschnitt in der Dokumentation kennen, der das deutlicher macht, schreiben Sie ihn bitte in die Kommentare oder als Antwort und ich werde ihn als Antwort auswählen, wenn ich kann.

+0

Genau das, was ich gesucht habe; Ich teile Ihre Ansicht, dass dies aus der Dokumentation sehr unklar ist, so dass Ihr Beitrag mir sehr geholfen hat. – markjan

+0

Möglicherweise möchten Sie eine kleine Zeitüberschreitung für diesen Aufruf festlegen, da einige 'Get'-Aufrufe von Sellery im Falle einer ungültigen Aufgaben-ID oder einer Aufgabe, die dem Broker nicht mehr bekannt ist, möglicherweise sehr lange nicht mehr zurückkommen. Siehe http://stackoverflow.com/a/10074280/992887 – RichVel

+0

Vielen Dank für dieses Update. Es brachte mich dazu, meine Haare für 2 Wochen zu ziehen. – Pant