Ich arbeite mit Python2.7, Sellerie und cx_Oracle für den Zugriff auf die Oracle-Datenbank.Gemeinsame Nutzung einer Oracle-Datenbankverbindung zwischen simultanen Sellerie-Aufgaben
Ich erstelle eine Menge Aufgaben. Jeder Task führt eine Abfrage über cx_Oracle aus. Viele dieser Aufgaben werden gleichzeitig ausgeführt. Alle Aufgaben sollten dieselbe Datenbankverbindung haben.
Wenn ich nur eine Aufgabe starte, wird die Abfrage korrekt ausgeführt. Wenn ich jedoch mehrere Abfragen starte, bekomme ich folgende Fehlermeldung:
[2016-04-04 17:12:43,846: ERROR/MainProcess] Task tasks.run_query[574a6e7f-f58e-4b74-bc84-af4555af97d6] raised unexpected: DatabaseError('<cx_Oracle._Error object at 0x7f9976635580>',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/home/ric/workspace/dbw_celery/tasks.py", line 39, in run_query
column_names = get_column_names(oracle_conn, table_info["table_name"])
File "/home/ric/workspace/dbw_celery/utilities.py", line 87, in get_column_names
cursor.execute(query_str)
DatabaseError: <cx_Oracle._Error object at 0x7f9976635580>
Lassen Sie uns nun meinen Code ansehen.
Dies ist meine tasks.py
Datei, wo ich die Oracle-Datenbankverbindung, Sellerie-Instanz erstellen und meine Aufgaben definieren, welche Benutzer die werden die Datenbankverbindung:
# tasks.py
import celeryconfig
from celery import Celery
from utilities import connect_to_db, get_new_rows, write_output_rows
# Define a Celery instance
dbwapp = Celery('tasks')
dbwapp.config_from_object(celeryconfig)
dbwapp.conf["CELERYBEAT_SCHEDULE"] = {}
# Define an Oracle connection as a global variable to be used by all tasks
oracle_conn = connect_to_db(db_user, db_pass, db_host, db_port, db_name)
# Define the task function that each Celery worker will run
@dbwapp.task()
def run_query(table_info, output_description):
"""Run a query on a given table. Writes found rows to output file."""
global oracle_conn
column_names = get_column_names(oracle_conn, table_info["table_name"])
new_rows, last_check_timestamp = get_new_rows(oracle_conn, table_info)
write_result_to_output_file(output_file, new_rows)
def load_celerybeat_schedule():
"""Loads the CELERYBEAT_SCHEDULE dictionary with the tasks to run."""
new_task_dict = {
"task": "tasks.run_query",
"schedule": timedelta(seconds=table_config["check_interval"]),
"args": (table_config, output_description)
}
new_task_name = "task-" + table_config["table_name"]
dbwapp.conf["CELERYBEAT_SCHEDULE"][new_task_name] = new_task_dict
Dies ist, wie ich in die Datenbank in der utilities.py
Datei verbinden :
# utilities.py
def connect_to_db(db_user, db_password, db_host, db_port, db_name):
"""Connect to DB."""
connection_str = "%s/%[email protected]%s:%s/%s" % (db_user, db_password, db_host, db_port, db_name)
try:
db_connection = cx_Oracle.connect(connection_str)
except cx_Oracle.DatabaseError:
logger.error("Couldn't connect to DB %s" % db_name)
return None
logging.info("Succesfully connected to the DB: %s" % db_name)
return db_connection
Dies ist die get_new_rows_function
in einer anderen Datei definiert, wo die Abfrage tatsächlich ausgeführt wird:
#utilities.py
def get_new_rows(db_connection, table_info):
"""Return new rows inserted in a given table since last check."""
cursor = db_connection.cursor()
query_str = "SELECT * FROM {0}".format(table_info["table_name"])
cursor.execute(query_str)
new_rows = cursor.fetchall()
cursor.close()
return new_rows
Ich betreibe mein Code wie folgt aus: celery -A tasks worker -B
Ich habe versucht, meinen Code zu vereinfachen, um es leichter zu verstehen.
Ich fürchte, dass der Fehler, den ich bekomme, durch verschiedene Aufgaben verursacht wird, die gleichzeitig ausgeführt werden und die gleiche Datenbankverbindung teilen. Ihre gleichzeitige Ausführung wird "durcheinander" oder so ähnlich.
Wie kann eine Datenbankverbindung zwischen verschiedenen Sellerie-Aufgaben geteilt werden?
Weiß jemand, was ich falsch mache?