2013-02-19 4 views
11

Ich habe eine Situation ähnlich der skizzierten here, mit der Ausnahme, dass anstelle Aufgaben mit mehreren Argumenten von Verkettungs möchte ich Kette Aufgaben, die ein Wörterbuch mit mehreren Einträgen zurück.Sellerie Aufgabe Kette und Zugriff ** kwargs

Dies ist - sehr locker und abstrakt --- was ich versuche zu tun:

tasks.py von ipython

@task() 
def task1(item1=None, item2=None): 
    item3 = #do some stuff with item1 and item2 to yield item3 
    return_object = dict(item1=item1, item2=item2, item3=item3) 
    return return_object 

def task2(item1=None, item2=None, item3=None): 
    item4 = #do something with item1, item2, item3 to yield item4 
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4) 
    return return_object 

Arbeiten, ich bin in der Lage task1 einzeln und asynchron aufgerufen , ohne Probleme.

Ich kann task2 auch anrufen individuell mit dem Ergebnis von task1 als Argument Doppelstern zurückgekehrt:

>>res1 = task1.s(item1=something, item2=something_else).apply_async() 
>>res1.status 
'SUCCESS' 
>>res2 = task2.s(**res1.result).apply_async() 
>>res2.status 
'SUCCESS 

Doch was ich erreichen schließlich will, ist das gleiche Endergebnis wie oben, aber über eine Kette, und hier kann ich nicht herausfinden, wie task2 haben instanziiert nicht mit (Positions) Argumente von task1 zurückgekehrt, aber mit task1.result als ** kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK! 

ich vermute, dass ich gehen kann zurück und schreibe meine Aufgaben so um, dass t hey Rückkehr Positionsargumente statt einem Wörterbuch, und diese Dinge klären können, aber es scheint mir, dass es einen Weg geben, um task1 Rückkehr Objekt in task2 mit der äquivalenten Funktionalität des ** Doppelstern zugreifen. Ich vermute auch, dass ich hier etwas ziemlich offensichtliches über Sellerie-Subtask-Implementierung oder * Args vs. ** Kwargs verpasse.

Hope das macht Sinn. Und danke im Voraus für irgendwelche Tipps.

Antwort

1

chain und die anderen Canvas-Grundelemente sind in der Familie von funktionale Dienstprogramme wie map und reduce.

z. wo map(target, items) ruft target(item) für jedes Element in der Liste, Python hat eine selten verwendete Version der Karte namens itertools.starmap, , die stattdessen ruft target(*item).

Während wir starchain und sogar kwstarchain die Toolbox hinzufügen könnten, diese wären sehr spezialisiert und wahrscheinlich nicht so oft verwendet.

Interessanterweise hat Python diese mit den Listen- und Generatorausdrücken überflüssig gemacht, so dass die Karte durch [target(item) for item in item] und starmap mit [target(*item) for item in item] ersetzt wird.

Also anstatt mehrere Alternativen für jedes Primitiv zu implementieren, denke ich, sollten wir Fokus auf die Suche nach einem flexibleren Weg, dies zu unterstützen, z. Sellerie angetriebene Generator Ausdrücke wie mit (wenn möglich, und wenn nicht etwas ähnlich mächtig)

+0

verstanden. Vielen Dank. Ich löste dies, indem ich die Eingaben/Rückgaben leicht in meine Aufgabe änderte. T2 sucht jetzt nur nach einem einzelnen dict-Objekt als Eingabe und ruft dann die erwarteten k/value-Paare aus dem dict ab, um die Aufgabe auszuführen. –

+0

@BenjaminWhite ich verstehe es immer noch nicht. Kannst du mir sagen, wie du das gemacht hast? – ashim888

1

Da dies nicht in Sellerie gebaut ist, schrieb ich eine Dekorateur Funktion etwas ähnliches selbst.

# Use this wrapper with functions in chains that return a tuple. The 
# next function in the chain will get called with that the contents of 
# tuple as (first) positional args, rather than just as just the first 
# arg. Note that both the sending and receiving function must have 
# this wrapper, which goes between the @task decorator and the 
# function definition. This wrapper should not otherwise interfere 
# when these conditions are not met. 

class UnwrapMe(object): 
    def __init__(self, contents): 
     self.contents = contents 

    def __call__(self): 
     return self.contents 

def wrap_for_chain(f): 
    """ Too much deep magic. """ 
    @functools.wraps(f) 
    def _wrapper(*args, **kwargs): 
     if type(args[0]) == UnwrapMe: 
      args = list(args[0]()) + list(args[1:]) 
     result = f(*args, **kwargs) 

     if type(result) == tuple and current_task.request.callbacks: 
      return UnwrapMe(result) 
     else: 
      return result 
    return _wrapper 

-Mine auspackt wie das starchain Konzept, aber man konnte es leicht ändern kwargs statt auszupacken.

5

Dies ist mein nehmen auf das Problem, eine abstrakte Aufgabe-Klasse:

from __future__ import absolute_import 
from celery import Task 
from myapp.tasks.celery import app 


class ChainedTask(Task): 
    abstract = True  

    def __call__(self, *args, **kwargs): 
     if len(args) == 1 and isinstance(args[0], dict): 
      kwargs.update(args[0]) 
      args =() 
     return super(ChainedTask, self).__call__(*args, **kwargs) 

@app.task(base=ChainedTask) 
def task1(x, y): 
    return {'x': x * 2, 'y': y * 2, 'z': x * y}  


@app.task(base=ChainedTask) 
def task2(x, y, z): 
    return {'x': x * 3, 'y': y * 3, 'z': z * 2} 

Sie können nun Ihre Kette als solche definieren und auszuführen:

from celery import chain 

pipe = chain(task1.s(x=1, y=2) | task2.s()) 
pipe.apply_async()