2016-08-01 21 views
1

Ich habe Python vorher aber nur für Flask-Anwendungen verwendet, aber ich habe noch nie zuvor Sellerie verwendet. Nachdem ich die Dokumente gelesen und alles eingerichtet habe (und es funktioniert, wie ich es mit mehreren Arbeitern getestet habe), versuche ich eine SQL-Abfrage auszuführen und für jede Zeile, die von der Abfrage zurückgegeben wird, sie an Sellery weiterzuleiten Arbeitnehmer.Run Sellery Aufgabe für jede Zeile von MySQL-Abfrage zurückgegeben?

Unten ist ein Beispiel des sehr grundlegenden Codes.

from celery import Celery 
import MySQLdb 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME") 
    cur = db.cursor() 
    cur.execute("SELECT * FROM myTable") 

    for row in cur.fetchall(): 
     print_query_result(row[0]) 

    db.close() 

def print_query_result(result): 
    print result 

Im Grunde wählt es alles in der Tabelle 'myTable' aus und gibt es für jede zurückgegebene Zeile aus. Wenn ich den Code nur mit Python aufrufe, funktioniert es einwandfrei und druckt alle Daten aus der MySQL-Tabelle. Wenn ich es mit der Funktion .delay() aufruft, um es an einen Worker zur Verarbeitung zu senden, sendet es es nur an den einen Worker und gibt nur die oberste Zeile in der Datenbank aus.

Ich habe versucht, über Teilaufgaben zu lesen, aber ich bin mir nicht sicher, ob ich damit in die richtige Richtung gehe.

Kurz gesagt, ich will, dass das passiert, aber ich habe keine wo ich anfangen soll. Hat jemand irgendwelche Ideen?

  • SQL-Abfrage alle Zeilen in der Tabelle wählen
  • jede Zeile senden/zu einem Arbeiter führt einig Code
  • Return Code Ergebnis zurück in eine Datenbank
  • Pick-up nächsten Punkt in der Warteschlange zu verarbeiten (falls any)

Vielen Dank im Voraus.

EDIT 1:

Ich habe meinen Code aktualisiert SQLAlchemy zu verwenden, anstatt, aber die Ergebnisse sind wieder immer noch wie meine alte Abfrage, die in Ordnung ist.

from celery import Celery 
from models import DBDomains 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    query = DBDomains.query.all() 
    for i in query: 
     print i.domain 
     print_query_result.s() 

@app.task 
def print_query_result(): 
    print "Received job" 

print_domain.delay() 

Der Arbeiter, wenn die .py Datei zurückkehrt läuft:

[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] 
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1 
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2 
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None 

Wie Sie sehen können, der Arbeiter bekommt ‚result1‘ und ‚result2‘ aus der Tabelle ich abfragt, aber es dann doesn Es scheint, als würde ich den Befehl in der Teilaufgabe ausführen, die nur "Auftrag empfangen" drucken soll.

UPDATE: Es sieht so aus, als ob die Teilaufgabe eine .delay() am Ende hätte, wie in den Celery-Dokumenten, so sieht mein Code so aus und verteilt erfolgreich die Jobs auf die Arbeiter.

from celery import Celery 
from models import DBDomains 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    query = DBDomains.query.all() 
    for i in query: 
     subtask = print_query_result.s(i.domain) 
     subtask.delay() 


@app.task 
def print_query_result(domain): 
    print domain 

print_domain.delay() 
+0

Sie möchten also eine Aufgabe, die eine Abfrage für einen DB und für jede zurückgegebene Zeile eine andere Aufgabe in die Warteschlange stellen? Oder ist es akzeptabel für die oberste Ebene, die die Abfrage ausführt und die neuen Aufgaben als eine reguläre Funktion absetzt? –

+0

Ja, im Grunde wird es eine Aufgabe sein, die die Abfrage durchführt, und dann wird für jedes Ergebnis ein anderes Task/Queue-Element erzeugt, das ein Worker verarbeiten kann.Der Grund dafür ist, dass ich tausende (wenn nicht zehntausende) Datenzeilen habe, die alle 30 Sekunden oder so abgefragt werden. Meine Gedanken waren also umso mehr Arbeiter, die arbeiten, je schneller die Daten verarbeitet werden. Es lässt auch Raum für die Erweiterung, da immer mehr Arbeitskräfte zur Verfügung stehen, je mehr Daten ich habe. Ich habe mich auch mit dem Thema Threading beschäftigt, aber Sellerie schien einfacher zu sein, und Sie können auf entfernte Mitarbeiter aufsteigen. – mphowarth

Antwort

1

Jedes Mal, wenn Sie innerhalb einer Aufgabe, eine Aufgabe aufrufen, haben Sie subtasks zu verwenden. Glücklicherweise ist die Syntax einfach.

from celery import Celery 

app = Celery('tasks', broker='redis://127.0.0.1:6379/0') 


@app.task 
def print_domain(): 
    for x in range(20): 
     print_query_result.s(x) 


@app.task 
def print_query_result(result): 
    print(result) 

(Ersatz für x in Bereich (20) mit den Abfrageergebnissen.) Und wenn Sie den Sellerie Ausgabe anschauen, werden Sie die Aufgaben erstellt und verteilt über die Arbeiter sehen.

+0

Leider scheint das nicht zu funktionieren, ich habe mein OP mit dem Code aktualisiert, den ich jetzt habe. Es sieht nicht einmal so aus, als ob die Subtask korrekt aufgerufen wird, obwohl die Ergebnisse der Abfrage definitiv zurückkehren. Danke für die Unterstützung bis jetzt. – mphowarth

+0

Ich habe mein OP wieder aktualisiert, da ich das Problem gefunden zu haben scheint. Danke für Ihre Hilfe und wies mich in die richtige Richtung! – mphowarth

+0

Odd könnte ein Unterschied in den Versionen sein. Die Probe, die ich gepostet habe, hat richtig funktioniert. –