2012-04-17 5 views
37

Wenn ich eine Funktion wie folgt definiert:Wie dynamisch hinzufügen/entfernen periodische Aufgaben Sellerie (celerybeat)

def add(x,y): 
    return x+y 

Gibt es eine Möglichkeit, um dynamisch diese Funktion als Sellerie PeriodicTask hinzufügen und es kick off at Laufzeit? Ich möchte in der Lage sein, etwas zu tun wie (Pseudo-Code):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) 
celery.beat.start(some_unique_task_id) 

Ich würde auch diese Aufgabe dynamisch mit so etwas wie (Pseudo-Code) stoppen oder entfernen mag:

celery.beat.remove_task(some_unique_task_id) 

oder

celery.beat.stop(some_unique_task_id) 

FYI Ich verwende keine Djcellery, mit der Sie regelmäßige Aufgaben über den Django-Administrator verwalten können.

Antwort

18

Nein, tut mir leid, das ist mit dem normalen Selleriebeet nicht möglich.

Aber es ist leicht erweiterbar zu tun, was Sie wollen, z. Der Django-Sellerie Scheduler ist nur eine Unterklasse Lesen und Schreiben des Zeitplans in die Datenbank (mit einigen Optimierungen oben).

Sie können den django-sellery Scheduler auch für Nicht-Django-Projekte verwenden.

Etwas wie folgt aus:

  • django Install + django-Sellerie:

    pip $ installieren -U django django-Sellerie

  • die folgenden Einstellungen Ihres celeryconfig hinzufügen:

    DATABASES = { 
        'default': { 
         'NAME': 'celerybeat.db', 
         'ENGINE': 'django.db.backends.sqlite3', 
        }, 
    } 
    INSTALLED_APPS = ('djcelery',) 
    
  • Erstellen Sie die Datenbanktabellen :

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
    
  • starten celerybeat mit der Datenbank-Scheduler:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ 
        -S djcelery.schedulers.DatabaseScheduler 
    

Auch gibt es die djcelerymon Befehl, der für nicht-Django verwendet werden können Projekte celerycam und Django Admin Webserver starten in der gleiche Prozess, können Sie verwenden, um auch Ihre regelmäßigen Aufgaben in einem netten Web-Interface zu bearbeiten:

(Hinweis aus irgendeinem Grund djcelerymon kann nicht mit Strg + C gestoppt werden, Sie Strg müssen + Z + töten% 1)

+1

Können Sie bitte Code zum Hinzufügen von Aufgaben und Entfernen angeben? Tut mir leid, ich verstehe nicht. –

+6

Irgendwelche Änderungen in diesem von 2012 bis 2016? – Tanay

32

Diese Frage auf google groups beantwortet wurde.

ich nicht der Autor AM, geht alle Kredit Jean Mark

Hier ist eine richtige Lösung.Bestätigtes Arbeiten, In meinem Szenario habe ich Periodic Task unterklassifiziert und ein Modell daraus erstellt, da ich weitere Felder zum Modell hinzufügen kann, wie ich brauche und auch so könnte ich die "terminate" -Methode hinzufügen. Sie müssen die Eigenschaft der periodischen Aufgabe auf False setzen und sie speichern, bevor Sie sie löschen. Die ganze Unterklasse ist kein Muss, die Methode schedule_every ist die, die wirklich die Arbeit macht. Wenn Sie bereit sind, Ihre Aufgabe zu beenden (wenn Sie nicht abgeleitet haben), können Sie einfach PeriodicTask.objects.filter (name = ...) verwenden, um nach Ihrer Aufgabe zu suchen, deaktivieren und dann löschen.

Hoffe, das hilft!

from djcelery.models import PeriodicTask, IntervalSchedule 
from datetime import datetime 

class TaskScheduler(models.Model): 

    periodic_task = models.ForeignKey(PeriodicTask) 

    @staticmethod 
    def schedule_every(task_name, period, every, args=None, kwargs=None): 
    """ schedules a task by name every "every" "period". So an example call would be: 
     TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
     that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """ 
     permissible_periods = ['days', 'hours', 'minutes', 'seconds'] 
     if period not in permissible_periods: 
      raise Exception('Invalid period specified') 
     # create the periodic task and the interval 
     ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task 
     interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) 
     if interval_schedules: # just check if interval schedules exist like that already and reuse em 
      interval_schedule = interval_schedules[0] 
     else: # create a brand new interval schedule 
      interval_schedule = IntervalSchedule() 
      interval_schedule.every = every # should check to make sure this is a positive int 
      interval_schedule.period = period 
      interval_schedule.save() 
     ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) 
     if args: 
      ptask.args = args 
     if kwargs: 
      ptask.kwargs = kwargs 
     ptask.save() 
     return TaskScheduler.objects.create(periodic_task=ptask) 

    def stop(self): 
     """pauses the task""" 
     ptask = self.periodic_task 
     ptask.enabled = False 
     ptask.save() 

    def start(self): 
     """starts the task""" 
     ptask = self.periodic_task 
     ptask.enabled = True 
     ptask.save() 

    def terminate(self): 
     self.stop() 
     ptask = self.periodic_task 
     self.delete() 
     ptask.delete() 
+1

Dies sollte die akzeptierte Antwort sein. – kai

+1

@kai 'IntervalSchedule',' PeriodicTask' usw. sind 'djcellery' Klassen und das OP sagt, dass er' djcellery' nicht benutzt. Auf jeden Fall nützlich. – Chris

2

Sie können diese flask-djcelery überprüfen, die Kolben und djcelery konfiguriert und bietet auch browseable Rest api

2

Es gibt eine Bibliothek namens django-Sellerie-Beat, die die Modelle ein Bedarf zur Verfügung stellt. Um neue periodische Aufgaben dynamisch zu laden, muss man einen eigenen Scheduler erstellen.

from django_celery_beat.schedulers import DatabaseScheduler 


class AutoUpdateScheduler(DatabaseScheduler): 

    def tick(self, *args, **kwargs): 
     if self.schedule_changed(): 
      print('resetting heap') 
      self.sync() 
      self._heap = None 
      new_schedule = self.all_as_schedule() 

      if new_schedule: 
       to_add = new_schedule.keys() - self.schedule.keys() 
       to_remove = self.schedule.keys() - new_schedule.keys() 
       for key in to_add: 
        self.schedule[key] = new_schedule[key] 
       for key in to_remove: 
        del self.schedule[key] 

     super(AutoUpdateScheduler, self).tick(*args, **kwargs) 

    @property 
    def schedule(self): 
     if not self._initial_read and not self._schedule: 
      self._initial_read = True 
      self._schedule = self.all_as_schedule() 

     return self._schedule 
+0

Danke. Hat nicht sofort funktioniert, aber mit 'to_add = [key für key in new_schedule.keys() wenn key nicht in self.schedule.keys()]' und ähnlich für 'to_remove' ist der Trick. Warum ist das keine Standardoption? Bis jetzt musste ich bei Sellerie Aufgaben andere Sellerie Aufgaben mit einem Countdown erledigen lassen. Das klingt nicht sehr gut für mich. – freethebees