Question title: Sharing an Oracle database connection between simultaneous Celery tasks
Posted: 2023-03-04 19:09:02
Question:

I'm using Python 2.7, Celery and cx_Oracle to access an Oracle database.

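I create a lot of tasks. Each task runs one query through cx_Oracle, and many of these tasks run at the same time. All tasks should share the same database connection.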

If I launch only one task, the query runs correctly. However, if I launch several queries, I start getting the following error:

[2016-04-04 17:12:43,846: ERROR/MainProcess] Task tasks.run_query[574a6e7f-f58e-4b74-bc84-af4555af97d6] raised unexpected: DatabaseError('<cx_Oracle._Error object at 0x7f9976635580>',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/ric/workspace/dbw_celery/tasks.py", line 39, in run_query
    column_names = get_column_names(oracle_conn, table_info["table_name"])
  File "/home/ric/workspace/dbw_celery/utilities.py", line 87, in get_column_names
    cursor.execute(query_str)
DatabaseError: <cx_Oracle._Error object at 0x7f9976635580>

Now let's look at my code.

This is my tasks.py file, where I create the Oracle database connection and the Celery instance, and define the task that uses that connection:

# tasks.py
from datetime import timedelta

import celeryconfig
from celery import Celery
from utilities import connect_to_db, get_column_names, get_new_rows, write_output_rows

# Define a Celery instance
dbwapp = Celery('tasks')
dbwapp.config_from_object(celeryconfig)
dbwapp.conf["CELERYBEAT_SCHEDULE"] = {}

# Define an Oracle connection as a global variable to be used by all tasks.
# db_user, db_pass, db_host, db_port and db_name are defined elsewhere (omitted here).
oracle_conn = connect_to_db(db_user, db_pass, db_host, db_port, db_name)

# Define the task function that each Celery worker will run
@dbwapp.task()
def run_query(table_info, output_description):
    """Run a query on a given table. Writes found rows to output file."""
    # Reading the module-level oracle_conn needs no `global` statement.
    column_names = get_column_names(oracle_conn, table_info["table_name"])

    # get_new_rows() returns a single list of rows.
    new_rows = get_new_rows(oracle_conn, table_info)

    write_output_rows(output_description, new_rows)


def load_celerybeat_schedule(table_config, output_description):
    """Loads the CELERYBEAT_SCHEDULE dictionary with the tasks to run."""

    new_task_dict = {
        "task": "tasks.run_query",
        "schedule": timedelta(seconds=table_config["check_interval"]),
        "args": (table_config, output_description)
    }
    new_task_name = "task-" + table_config["table_name"]
    dbwapp.conf["CELERYBEAT_SCHEDULE"][new_task_name] = new_task_dict

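Since load_celerybeat_schedule adds one entry per table, filling the schedule for several tables might look like the sketch below. It assumes each table config is a dict with the "table_name" and "check_interval" keys that the code above reads; build_beat_schedule and the sample tables are hypothetical, for illustration only.

```python
from datetime import timedelta


def build_beat_schedule(table_configs, output_description):
    """Return a CELERYBEAT_SCHEDULE-style dict with one entry per table.

    Hypothetical helper: mirrors load_celerybeat_schedule, but takes the
    table configs explicitly instead of relying on outside variables.
    """
    schedule = {}
    for table_config in table_configs:
        schedule["task-" + table_config["table_name"]] = {
            "task": "tasks.run_query",
            "schedule": timedelta(seconds=table_config["check_interval"]),
            "args": (table_config, output_description),
        }
    return schedule


# Illustrative table configs (made-up names and intervals).
tables = [
    {"table_name": "ORDERS", "check_interval": 30},
    {"table_name": "CUSTOMERS", "check_interval": 60},
]
schedule = build_beat_schedule(tables, "output.csv")
```

The result would then be assigned to dbwapp.conf["CELERYBEAT_SCHEDULE"]. Several entries can come due at the same moment, which is exactly when multiple run_query tasks end up using the shared connection concurrently.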
This is how I connect to the database in my utilities.py file:

# utilities.py
import logging

import cx_Oracle

logger = logging.getLogger(__name__)


def connect_to_db(db_user, db_password, db_host, db_port, db_name):
    """Connect to DB."""
    connection_str = "%s/%s@%s:%s/%s" % (db_user, db_password, db_host, db_port, db_name)

    try:
        db_connection = cx_Oracle.connect(connection_str)
    except cx_Oracle.DatabaseError:
        logger.error("Couldn't connect to DB %s", db_name)
        return None
    logger.info("Successfully connected to the DB: %s", db_name)

    return db_connection

And this is the get_new_rows function, defined in another file, where the query is actually run:

# utilities.py
def get_new_rows(db_connection, table_info):
    """Return new rows inserted in a given table since last check."""
    cursor = db_connection.cursor()
    try:
        query_str = "SELECT * FROM {0}".format(table_info["table_name"])
        cursor.execute(query_str)
        new_rows = cursor.fetchall()
    finally:
        # Close the cursor even if execute() or fetchall() raises.
        cursor.close()
    return new_rows

I run my code like this: celery -A tasks worker -B

I have simplified my code to make it easier to follow.

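I suspect the error is caused by different tasks running at the same time and sharing the same database connection, so that their concurrent executions get "mixed up" somehow.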
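That suspicion is plausible: one connection is one database session, and if several threads issue calls on it at the same time, those calls have to be serialized. A minimal sketch of doing that explicitly with a lock, assuming the tasks run as threads inside a single process (with a prefork pool they are separate processes and a lock like this would not help). sqlite3 stands in for cx_Oracle so the example runs anywhere; run_locked_query is a hypothetical helper.

```python
import sqlite3
import threading

# One connection shared by all threads; check_same_thread=False lets
# sqlite3 accept calls from threads other than the one that created it.
shared_conn = sqlite3.connect(":memory:", check_same_thread=False)
shared_conn.execute("CREATE TABLE t (n INTEGER)")
shared_conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(10)])
shared_conn.commit()

conn_lock = threading.Lock()


def run_locked_query(sql):
    """Run sql on the shared connection while holding the lock."""
    with conn_lock:  # only one thread uses the connection at a time
        cursor = shared_conn.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()


results = []
results_lock = threading.Lock()


def worker():
    rows = run_locked_query("SELECT COUNT(*) FROM t")
    with results_lock:
        results.append(rows[0][0])


threads = [threading.Thread(target=worker) for _ in range(8)]
for th in threads:
    th.start()
for th in threads:
    th.join()
```

The lock makes the queries take turns, so a shared connection buys simplicity at the cost of concurrency.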

What is the correct way to share a database connection between different Celery tasks?

Does anyone know what I'm doing wrong?

Question comments:

    Tags: python oracle celery cx-oracle celerybeat


    Answer 1:

    If you want multiple threads to share the same connection, you need to enable threaded mode, like this:

    conn = cx_Oracle.connect(connection_str, threaded=True)
    

    If you don't, you may run into some strange problems!
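An alternative to sharing one connection across threads is to give each thread its own, which avoids contention at the cost of more database sessions. (threaded=True concerns threads within one process; tasks running in separate worker processes each need their own connection regardless.) A sketch with threading.local, where sqlite3 stands in for cx_Oracle.connect(...); thread_connection is a hypothetical helper.

```python
import sqlite3
import threading

_local = threading.local()


def thread_connection():
    """Return this thread's own connection, opening it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        # sqlite3 stands in for cx_Oracle.connect(...) here.
        conn = sqlite3.connect(":memory:")
        _local.conn = conn
    return conn


seen = []
seen_lock = threading.Lock()


def worker():
    first = thread_connection()
    second = thread_connection()
    # Keep the connection object itself so it stays alive for the checks.
    with seen_lock:
        seen.append((first, first is second))


threads = [threading.Thread(target=worker) for _ in range(4)]
for th in threads:
    th.start()
for th in threads:
    th.join()
```

Each thread reuses its own connection on repeated calls, and no two threads ever touch the same one.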
