2014-03-06 11 views
20

Ich habe eine REST-API in Django geschrieben, mit und Endpunkt, der eine Sellerie-Aufgabe in die Warteschlange stellt, wenn ich sie posten möchte. Die Antwort enthält die Aufgaben-ID, mit der ich testen möchte, ob die Aufgabe erstellt wurde und das Ergebnis erhält. Also, würde Ich mag so etwas wie zu tun:In-Memory-Broker für Sellerie-Unit-Tests

def test_async_job(): 
    response = self.client.post("/api/jobs/", some_test_data, format="json") 
    task_id = response.data['task_id'] 
    result = my_task.AsyncResult(task_id).get() 
    self.assertEquals(result, ...) 

ich natürlich will keine Sellerie Arbeiter laufen muß die Unit-Tests laufen, ich erwarte, dass es irgendwie lustig zu machen. Ich kann CELERY_ALWAYS_EAGER nicht verwenden, weil das den Broker insgesamt zu umgehen scheint, was mich daran hindert, AsyncResult zu verwenden, um die Aufgabe durch seine ID zu erhalten (wie angegeben here).

Ich gehe durch Sellerie und kombu docs, ich habe festgestellt, dass es einen In-Memory-Transport für Komponententests gibt, die tun würden, was ich suche. Ich habe versucht, das Überschreiben der BROKER_URL Einstellung auf die Tests zu verwenden:

@override_settings(BROKER_URL='memory://') 
def test_async_job(): 

Aber das Verhalten ist das gleiche wie mit dem ampq Broker: Es blockiert den Test auf das Ergebnis warten. Irgendeine Idee wie soll ich diesen Broker konfigurieren, damit er in den Tests funktioniert?

+3

Sie benötigen noch einen Mitarbeiter mit dem In-Memory-Broker. Leider glaube ich nicht, was du machen willst ist möglich. Sie müssen entweder einen Worker für die Verwendung mit Ihren Tests starten oder Tasks synchron mit CELERY_ALWAYS_EAGER ausführen (in diesem Fall erhalten Sie kein AsyncResult, wie Sie festgestellt haben). – jrothenbuhler

+0

Warum müssen Sie auf die Aufgabe über ihre ID zugreifen? Dies sieht nach einem guten Kandidaten für Komponententests aus. Warum nicht die Funktion testen, die die Aufgabe direkt erzeugt, anstatt es über HTTP zu tun? Auf diese Weise erhalten Sie das 'EagerResult' mit der gleichen API wie' AsyncResult'. – patrys

+0

mögliches Duplikat von [Komponententest mit Django-Sellerie?] (Http://stackoverflow.com/questions/4055860/unit-testing-with-django-sellery) –

Antwort

11

Sie können die broker_backend in den Einstellungen festlegen:

if 'test' in sys.argv[1:]: 
    BROKER_BACKEND = 'memory' 
    CELERY_ALWAYS_EAGER = True 
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True 

oder Sie die Einstellungen mit einem Dekorateur

import unittest 
from django.test.utils import override_settings 


class MyTestCase(unittest.TestCase): 

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, 
         CELERY_ALWAYS_EAGER=True, 
         BROKER_BACKEND='memory') 
    def test_mytask(self): 
     ... 
4

Sie können mit den Kombu in-Memory-Broker außer Kraft setzen können direkt in Ihrem Test Führen Sie Komponententests durch. Um dies zu tun, müssen Sie jedoch einen Sellerie-Arbeiter mit dem gleichen Sellerie-App-Objekt wie den Django-Server hochfahren.

Um die In-Memory-Broker, setzen BROKER_URL zu memory://localhost/

verwenden Dann können Sie Folgendes tun einen kleinen Sellerie Arbeiter spin up:

app = <Django Celery App> 

# Set the worker up to run in-place instead of using a pool 
app.conf.CELERYD_CONCURRENCY = 1 
app.conf.CELERYD_POOL = 'solo' 

# Code to start the worker 
def run_worker(): 
    app.worker_main() 

# Create a thread and run the worker in it 
import threading 
t = threading.Thread(target=run_worker) 
t.setDaemon(True) 
t.start() 

Sie müssen Sie die verwenden, um sicherzustellen, gleiche App wie die Django Sellerie App-Instanz.

Beachten Sie, dass beim Starten des Arbeiters viele Dinge gedruckt und Protokollierungseinstellungen geändert werden.

4

Hier ist ein Beispiel eines Django TransactionTestCase, das mit Sellerie 4.x funktioniert.

import threading 

from django.test import TransactionTestCase 
from django.db import connections 

from myproj.celery import app # your Celery app 


class CeleryTestCase(TransactionTestCase): 
    """Test case with Celery support.""" 

    @classmethod 
    def setUpClass(cls): 
     super().setUpClass() 
     app.control.purge() 
     cls._worker = app.Worker(app=app, pool='solo', concurrency=1) 
     connections.close_all() 
     cls._thread = threading.Thread(target=cls._worker.start) 
     cls._thread.daemon = True 
     cls._thread.start() 

    @classmethod 
    def tearDownClass(cls): 
     cls._worker.stop() 
     super().tearDownClass() 

Beachten Sie, dies ändern Namen der Warteschlange zu einem Test Warteschlangen nicht, also, wenn Sie auch die App laufen Sie das auch tun wollen werden.

+0

Sehr gute Antwort! Es hindert uns daran, die Option "task_always_eager" zu verwenden, die Arbeiter fälscht. Für den Zugriff auf die aktuelle Sellerie App ist es in der Tat einfacher, nur von Sellerie Import current_app zu tun. – Raffi

+0

Gibt es einen Grund, ein 'TransactionTestCase' anstelle des normalen Testfalls gewählt zu haben? – Jonathan

+0

@danielle Dies funktioniert leider nicht für mich: Ich bekomme conn_errors = self.channel.connection.client.connection_errors AttributError: 'NoneType' Objekt hat kein Attribut 'Client' – Jonathan