【问题标题】:Celery worker hangs on ZEO database access (race condition?)芹菜工人挂在 ZEO 数据库访问(竞争条件?)
【发布时间】:2013-06-20 19:36:54
【问题描述】:

问题

当使用访问 ZEO 服务器的包时,Celery 工作人员正在等待任务执行。但是,如果我直接在tasks.py 中访问服务器,则完全没有问题。

背景

我有一个程序可以读写ZODB 文件。因为我希望多个用户能够同时访问和修改这个数据库,所以我将它由ZEO server 管理,这应该使其跨多个进程和线程安全。我在程序的一个模块中定义数据库:

from ZEO import ClientStorage
from ZODB.DB import DB

addr = 'localhost', 8090
storage = ClientStorage.ClientStorage(addr, wait=False)
db = DB(storage)

SSCCE

我显然在尝试更复杂的操作,但假设我只想要根对象或其子对象的键。我可以在这种情况下产生问题。

我在模块databases.py 中使用上述代码创建dummy_package,以及一个用于执行数据库访问的基本模块:

# main.py

def get_keys(dict_like):
    return dict_like.keys()

如果我不尝试使用 dummy_package 进行任何数据库访问,我可以导入数据库并毫无问题地访问 root:

# tasks.py
from dummy_package import databases

@task()
def simple_task():

    connection = databases.db.open()
    keys = connection.root().keys()
    connection.close(); databases.db.close()
    return keys  # Works perfectly

但是,尝试传递root 的连接或子代会使任务无限期挂起。

@task()
def simple_task():
    connection = databases.db.open()
    root = connection.root()
    ret = main.get_keys(root)  # Hangs indefinitely
    ...

如果有什么不同,Django 会访问这些 Celery 任务。

问题

那么,首先,这里发生了什么?以这种方式访问​​ ZEO 服务器是否会导致某种竞争情况?

可以让所有数据库访问由 Celery 负责,但这会使代码变得丑陋。此外,它会破坏我的程序作为独立程序运行的能力。在 Celery worker 调用的例程中不能与 ZEO 交互吗?

【问题讨论】:

    标签: python database django celery zodb


    【解决方案1】:

    不要将打开的连接或其根对象保存为全局对象。

    每个线程都需要一个连接;仅仅因为 ZEO 使多个线程可以访问,听起来您正在使用非线程本地的东西(例如,databases.py 中的模块级全局)。

    将 db 保存为全局,但在每个任务期间调用 db.open()。见http://zodb.readthedocs.org/en/latest/api.html#connection-pool

    【讨论】:

    • 感谢您的建议。我阅读了文档,现在只在模块中定义数据库。但是,无论是在tasks.py 的上下文中打开连接,还是直接在我的程序代码中打开连接,我仍然遇到了悬而未决的问题。我更新了我的问题 - 您是否有机会分享任何见解?
    • 很难说;也许将日志记录添加到您的任务方法中以确定事情挂在哪里?
    • 我已经做到了。具体来说,在第一次尝试访问数据库时(即在我的程序中),事情就挂了。我可以一直记录消息,直到访问语句,然后它挂起。我想它可能是multiprocessing issue,但仍在试图弄清楚。
    【解决方案2】:

    我不完全理解发生了什么,但我认为死锁与 Celery 默认使用 multiprocessing 进行并发的事实有关。切换到使用 Eventlet 处理需要访问 ZEO 服务器的任务解决了我的问题。

    我的过程

    1. 启动一个使用uses Eventlet 的worker,以及一个使用标准multiproccesing 的worker。

      celery 是默认队列的名称 (for historical reasons),因此让 Eventlet 工作人员处理此队列:

      $ celery worker --concurrency=500 --pool=eventlet --loglevel=debug \ 
                       -Q celery                --hostname eventlet_worker
      $ celery worker  --loglevel=debug \
                       -Q multiprocessing_queue --hostname multiprocessing_worker
      
    2. Route tasks 需要标准的multiprocessing 到相应的队列。默认情况下,所有其他人将被路由到celery 队列(Eventlet 管理)。 (如果使用 Django,则进入 settings.py):

      CELERY_ROUTES = {'project.tasks.ex_task': {'queue': 'multiprocessing_queue'}}
      

    【讨论】:

      猜你喜欢
      • 2012-04-08
      • 1970-01-01
      • 1970-01-01
      • 2013-08-12
      • 1970-01-01
      • 2017-02-11
      • 1970-01-01
      • 1970-01-01
      • 2011-09-23
      相关资源
      最近更新 更多