Ich habe Python vorher aber nur für Flask-Anwendungen verwendet, aber ich habe noch nie zuvor Sellerie verwendet. Nachdem ich die Dokumente gelesen und alles eingerichtet habe (und es funktioniert, wie ich es mit mehreren Arbeitern getestet habe), versuche ich eine SQL-Abfrage auszuführen und für jede Zeile, die von der Abfrage zurückgegeben wird, sie an Sellery weiterzuleiten Arbeitnehmer.Run Sellery Aufgabe für jede Zeile von MySQL-Abfrage zurückgegeben?
Unten ist ein Beispiel des sehr grundlegenden Codes.
from celery import Celery
import MySQLdb
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
cur = db.cursor()
cur.execute("SELECT * FROM myTable")
for row in cur.fetchall():
print_query_result(row[0])
db.close()
def print_query_result(result):
print result
Im Grunde wählt es alles in der Tabelle 'myTable' aus und gibt es für jede zurückgegebene Zeile aus. Wenn ich den Code nur mit Python aufrufe, funktioniert es einwandfrei und druckt alle Daten aus der MySQL-Tabelle. Wenn ich es mit der Funktion .delay() aufruft, um es an einen Worker zur Verarbeitung zu senden, sendet es es nur an den einen Worker und gibt nur die oberste Zeile in der Datenbank aus.
Ich habe versucht, über Teilaufgaben zu lesen, aber ich bin mir nicht sicher, ob ich damit in die richtige Richtung gehe.
Kurz gesagt, ich will, dass das passiert, aber ich habe keine wo ich anfangen soll. Hat jemand irgendwelche Ideen?
- SQL-Abfrage alle Zeilen in der Tabelle wählen
- jede Zeile senden/zu einem Arbeiter führt einig Code
- Return Code Ergebnis zurück in eine Datenbank
- Pick-up nächsten Punkt in der Warteschlange zu verarbeiten (falls any)
Vielen Dank im Voraus.
EDIT 1:
Ich habe meinen Code aktualisiert SQLAlchemy zu verwenden, anstatt, aber die Ergebnisse sind wieder immer noch wie meine alte Abfrage, die in Ordnung ist.
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
print i.domain
print_query_result.s()
@app.task
def print_query_result():
print "Received job"
print_domain.delay()
Der Arbeiter, wenn die .py Datei zurückkehrt läuft:
[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None
Wie Sie sehen können, der Arbeiter bekommt ‚result1‘ und ‚result2‘ aus der Tabelle ich abfragt, aber es dann doesn Es scheint, als würde ich den Befehl in der Teilaufgabe ausführen, die nur "Auftrag empfangen" drucken soll.
UPDATE: Es sieht so aus, als ob die Teilaufgabe eine .delay() am Ende hätte, wie in den Celery-Dokumenten, so sieht mein Code so aus und verteilt erfolgreich die Jobs auf die Arbeiter.
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
subtask = print_query_result.s(i.domain)
subtask.delay()
@app.task
def print_query_result(domain):
print domain
print_domain.delay()
Sie möchten also eine Aufgabe, die eine Abfrage für einen DB und für jede zurückgegebene Zeile eine andere Aufgabe in die Warteschlange stellen? Oder ist es akzeptabel für die oberste Ebene, die die Abfrage ausführt und die neuen Aufgaben als eine reguläre Funktion absetzt? –
Ja, im Grunde wird es eine Aufgabe sein, die die Abfrage durchführt, und dann wird für jedes Ergebnis ein anderes Task/Queue-Element erzeugt, das ein Worker verarbeiten kann.Der Grund dafür ist, dass ich tausende (wenn nicht zehntausende) Datenzeilen habe, die alle 30 Sekunden oder so abgefragt werden. Meine Gedanken waren also umso mehr Arbeiter, die arbeiten, je schneller die Daten verarbeitet werden. Es lässt auch Raum für die Erweiterung, da immer mehr Arbeitskräfte zur Verfügung stehen, je mehr Daten ich habe. Ich habe mich auch mit dem Thema Threading beschäftigt, aber Sellerie schien einfacher zu sein, und Sie können auf entfernte Mitarbeiter aufsteigen. – mphowarth