2016-06-29 14 views
6

Ich bin ein Neuling in Sellerie und ich versuche diese Aufgabenwarteschlange in mein Projekt zu integrieren, aber ich verstehe immer noch nicht, wie Sellerie die gescheiterten Aufgaben behandelt und ich würde gerne alle in einer Warteschlange für ambiente Briefe behalten.Sellerie: Wie kann ich eine fehlgeschlagene Aufgabe in eine Warteschlange für unzustellbare Nachrichten leiten

Laut dem Dokument here scheint es, dass die Erhöhung von Reject in einem Task mit acks_late den gleichen Effekt wie das Ackern der Nachricht erzeugt und dann haben wir ein paar Worte über Warteschlangen für unzustellbare Nachrichten.

So habe ich eine benutzerdefinierte Standardwarteschlange zu meiner Sellerie Config

celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'], 
         CELERY_TASK_SERIALIZER='json', 
         CELERY_QUEUES=[CELERY_QUEUE, 
             CELERY_DLX_QUEUE], 
         CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME, 
         CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE 
         ) 

und meine Kombu Objekte wie

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct') 
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE, 
          routing_key='celery-dlq') 

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME, 
          'x-dead-letter-routing-key': 'celery-dlq'} 

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME, 
           arguments=DEAD_LETTER_CELERY_OPTIONS, 
           type='direct') 

CELERY_QUEUE = Queue(CELERY_QUEUE_NAME, 
         exchange=CELERY_EXCHANGE, 
         routing_key='celery-q') 

suchen und die Aufgabe, die ich bin ausgeführt werden:

class HookTask(Task): 
    acks_late = True 

def run(self, ctx, data): 
    logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self)) 
    self.hook_process(ctx, data) 


def on_failure(self, exc, task_id, args, kwargs, einfo): 
    logger.error('task_id %s failed, message: %s', task_id, exc.message) 

def hook_process(self, t_ctx, body): 
    # Build context 
    ctx = TaskContext(self.request, t_ctx) 
    logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id) 
    raise Reject('no_reason', requeue=False) 

Ich machte einen kleinen Test mit ihm, aber keine Ergebnisse beim Auslösen einer Ausnahme von Ausfällen.

Jetzt frage ich mich, ob es eine gute Idee ist, die fehlgeschlagene Task-Route in die Warteschlange für nicht zustellbare Nachrichten zu zwingen, indem Sie Task.on_failure überschreiben. Ich denke, das würde funktionieren, aber ich denke auch, dass diese Lösung nicht so sauber ist, denn nach dem, was ich rot Sellerie sollte das ganz alleine tun.

Danke für Ihre Hilfe.

+1

Traurig, dass niemand geantwortet hat. Hast du eine Lösung gefunden? @onizukaek –

Antwort

1

Ich denke, Sie sollten in CELERY_EXCHANGE nicht hinzufügen. Sie sollten es zu CELERY_QUEUE mit queue_arguments=DEAD_LETTER_CELERY_OPTIONS hinzufügen.

Das folgende Beispiel ist das, was ich tat, und es funktioniert gut:

from celery import Celery 
from kombu import Exchange, Queue 
from celery.exceptions import Reject 

app = Celery(
    'tasks', 
    broker='amqp://[email protected]:5672//', 
    backend='redis://localhost:6379/0') 

dead_letter_queue_option = { 
    'x-dead-letter-exchange': 'dlx', 
    'x-dead-letter-routing-key': 'dead_letter' 
} 

default_exchange = Exchange('default', type='direct') 
dlx_exchange = Exchange('dlx', type='direct') 

default_queue = Queue(
    'default', 
    default_exchange, 
    routing_key='default', 
    queue_arguments=dead_letter_queue_option) 
dead_letter_queue = Queue(
    'dead_letter', dlx_exchange, routing_key='dead_letter') 

app.conf.task_queues = (default_queue, dead_letter_queue) 

app.conf.task_default_queue = 'default' 
app.conf.task_default_exchange = 'default' 
app.conf.task_default_routing_key = 'default' 


@app.task 
def add(x, y): 
    return x + y 


@app.task(acks_late=True) 
def div(x, y): 
    try: 
     z = x/y 
     return z 
    except ZeroDivisionError as exc: 
     raise Reject(exc, requeue=False) 

Nach der Erstellung der Warteschlange, Sie, dass auf der ‚Eigenschaften‘ Spalte sehen soll, zeigt es DLX (unzustellbare-Austausch) und DLK (dead-letter-routing-key) Etiketten.

enter image description here

Hinweis: Sie sollten die vorherigen Warteschlangen löschen, wenn Sie sie bereits in RabbitMQ erstellt haben. Dies liegt daran, dass Sellerie die vorhandene Warteschlange nicht löscht und eine neue erstellt.

+0

Ihr Beispiel funktioniert nicht für mich. Sie müssen 'dead_letter_queue' aus den' task_queues' entfernen, sonst wird sich der Sellerie-Arbeiter mit dieser Warteschlange verbinden und die Nachrichten konsumieren (ohne zu verarbeiten). Zum Erstellen dieser Warteschlangen ist jedoch ein alternativer Weg erforderlich. –

+0

@ K.P. Ich habe das gleiche Problem auch getroffen. Du hast absolut recht. Danke für deine Lösung. Ich habe es getestet und es funktioniert super! :-) –

1

Ich habe einen ähnlichen Fall und ich hatte die gleichen Probleme. Ich wollte auch eine Lösung, die auf Konfiguration und nicht auf fest codierten Werten basiert. Die vorgeschlagene Lösung von Hengfeng Li war sehr hilfreich und half mir, den Mechanismus und die Konzepte zu verstehen. Aber es gab ein Problem mit der Deklaration von Warteschlangen. Speziell, wenn Sie die DLQ in die task_default_queues injiziert haben, nahm der Sellerie die Warteschlange und es war immer leer. Daher wurde eine manuelle Deklaration von DL (X/Q) benötigt.

Ich habe Sellerys Bootsteps verwendet, da sie eine gute Kontrolle auf der Bühne, dass der Code ausgeführt wurde, bieten. Mein erstes Experiment war es, sie genau nach der App-Erstellung zu erstellen, aber dies erzeugte eine blockierte Verbindung nach dem Abzweigen von Prozessen und erzeugte eine hässliche Ausnahme. Mit einem Bootschritt, der exakt nach dem Schritt ausgeführt wird, können Sie sicherstellen, dass es am Anfang jedes Worker ausgeführt wird, nachdem es gespalten wurde und der Verbindungspool bereit ist.

Schließlich habe ich einen Dekorator erstellt, der nicht erfasste Ausnahmen in Aufgabenabweisungen konvertiert, indem er mit Reject von Sellerie reraised.Besondere Sorgfalt wird auf Fälle angewendet, in denen eine Aufgabe bereits entschieden wurde, z. B. Versuche.

Hier ist ein voll funktionsfähiges Beispiel. Versuchen Sie, die Aufgabe div.delay(1, 0) auszuführen und zu sehen, wie es funktioniert.

from celery import Celery 
from celery.exceptions import Reject, TaskPredicate 
from functools import wraps 
from kombu import Exchange, Queue 

from celery import bootsteps 


class Config(object): 

    APP_NAME = 'test' 

    task_default_queue = '%s_celery' % APP_NAME 
    task_default_exchange = "%s_celery" % APP_NAME 
    task_default_exchange_type = 'direct' 
    task_default_routing_key = task_default_queue 
    task_create_missing_queues = False 
    task_acks_late = True 

    # Configuration for DLQ support 
    dead_letter_exchange = '%s_dlx' % APP_NAME 
    dead_letter_exchange_type = 'direct' 
    dead_letter_queue = '%s_dlq' % APP_NAME 
    dead_letter_routing_key = dead_letter_queue 


class DeclareDLXnDLQ(bootsteps.StartStopStep): 
    """ 
    Celery Bootstep to declare the DL exchange and queues before the worker starts 
     processing tasks 
    """ 
    requires = {'celery.worker.components:Pool'} 

    def start(self, worker): 
     app = worker.app 

     # Declare DLX and DLQ 
     dlx = Exchange(
      app.conf.dead_letter_exchange, 
      type=app.conf.dead_letter_exchange_type) 

     dead_letter_queue = Queue(
      app.conf.dead_letter_queue, 
      dlx, 
      routing_key=app.conf.dead_letter_routing_key) 

     with worker.app.pool.acquire() as conn: 
      dead_letter_queue.bind(conn).declare() 


app = Celery('tasks', broker='pyamqp://[email protected]//') 
app.config_from_object(Config) 


# Declare default queues 
# We bypass the default mechanism tha creates queues in order to declare special queue arguments for DLX support 
default_exchange = Exchange(
    app.conf.task_default_exchange, 
    type=app.conf.task_default_exchange_type) 
default_queue = Queue(
     app.conf.task_default_queue, 
     default_exchange, 
     routing_key=app.conf.task_default_routing_key, 
     queue_arguments={ 
      'x-dead-letter-exchange': app.conf.dead_letter_exchange, 
      'x-dead-letter-routing-key': app.conf.dead_letter_routing_key 
     }) 

# Inject the default queue in celery application 
app.conf.task_queues = (default_queue,) 

# Inject extra bootstep that declares DLX and DLQ 
app.steps['worker'].add(DeclareDLXnDLQ) 


def onfailure_reject(requeue=False): 
    """ 
    When a task has failed it will raise a Reject exception so 
    that the message will be requeued or marked for insertation in Dead Letter Exchange 
    """ 

    def _decorator(f): 
     @wraps(f) 
     def _wrapper(*args, **kwargs): 

      try: 
       return f(*args, **kwargs) 
      except TaskPredicate: 
       raise # Do not handle TaskPredicate like Retry or Reject 
      except Exception as e: 
       print("Rejecting") 
       raise Reject(str(e), requeue=requeue) 
     return _wrapper 

    return _decorator 


@app.task() 
@onfailure_reject() 
def div(x, y): 
    return x/y 

Edit: ich den Code aktualisiert, um die neue Konfigurationsschema von Sellerie zu verwenden (Kleinbuchstaben) als ich einige Kompatibilitätsprobleme in Sellerie 4.1.0 gefunden.