【问题标题】:Using Multiprocessing concurrency mechanisms in Celery tasks在 Celery 任务中使用多处理并发机制
【发布时间】:2016-03-24 06:42:17
【问题描述】:

我正在尝试与只能接受单个 TCP 连接(内存限制)的设备进行交互,因此仅为每个工作线程启动一个连接不是一个选项,因为它在正常的客户端-服务器情况下,例如数据库连接。

我尝试使用线程间可全局访问的 Multiprocessing Manager dict,格式如下:

clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}

还有这样一个任务:

from celery import shared_task
from .celery import manager, clients

@shared_task
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    # Create client connection if one does not exist.
    conn = None
    addr, port = controller
    if controller not in clients:
        conn = Client(addr, port)
        conn.connect()
        lock = manager.RLock()
        clients[controller] = (conn, lock,)
        print("New controller connection to %s:%s" % (addr, port,))
    else:
        conn, lock = clients[controller]

    try:
        f = getattr(conn, commandname) # See if connection.commandname() exists.
    except Exception:
        raise Exception("command: %s not known." % (commandname))

    with lock:
        res = f(*args)
        return res

但是任务会因序列化错误而失败,例如:

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

即使我没有使用不可序列化的值调用任务,并且任务也没有尝试返回不可序列化的值,Celery 似乎痴迷于尝试序列化这个全局对象?

我错过了什么?您将如何使 Celery 任务中使用的客户端设备连接成为线程安全且线程之间可访问的?示例代码?

【问题讨论】:

  • 我不确定这是否适用于您的情况,但我只记得阅读过有关multiprocessing.reduction 的内容,它应该允许在进程之间共享套接字连接。 See this blog post for an example.
  • 客户端没有使用原始套接字,它是一个具有协议的 Twisted 连接对象。使用原始套接字或从 fd 重新构造 Twisted 连接对象并非易事。
  • 我最终想出了如何将 Twisted 协议包装在现有套接字周围,但是在我的情况下它不起作用,因为 Celery 消费者作为工作主进程的单独的子进程进程无法访问所需的文件描述符(存储在 Redis 中),并且设置缠结的 unix 管道来共享 FD 实在是太骇人听闻了。我的情况的问题是设备受内存限制,根本不能有多个连接......所以我决定只拥有一个工人池,每个工人都有一个消费者和一个设备。不好!

标签: python multiprocessing celery


【解决方案1】:

如何使用 Redis 实现分布式锁管理器? Redis python 客户端具有内置的锁定功能。另外,请参阅 redis.io 上的 this doc。即使您使用的是 RabbitMQ 或其他代理,Redis 也非常轻量级。

例如,作为装饰者:

from functools import wraps

def device_lock(block=True):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return_value = None
            have_lock = False
            lock = redisconn.lock('locks.device', timeout=2, sleep=0.01)
            try:
                have_lock = lock.acquire(blocking=block)
                if have_lock:
                    return_value = func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return return_value
        return wrapper
    return decorator

@shared_task
@device_lock
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    ...

您还可以使用 Celery 任务食谱中的 this approach

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    logger.debug('Importing feed: %s', feed_url)
    if acquire_lock():
        try:
            feed = Feed.objects.import_feed(feed_url)
        finally:
            release_lock()
        return feed.url

    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)

【讨论】:

  • 我知道这些解决方案 - 但是我没有使用它的原因是因为它没有做我想要的,即只是在进程之间共享实际的连接对象并使用已经打开的连接.我试图避免每次运行任务时断开和重新连接。如果我使用单个线程运行工作程序并重用它,我可以将连接对象保持为全局对象。我正在考虑为这些客户使用一个单进程工人池。否则,如果我确实选择每次发送消息时都连接,那么我将使用 Redis 进行锁定。在其他解决方案中......
【解决方案2】:
 ...
self._send_bytes(ForkingPickler.dumps(obj))
 File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

浏览互联网后,我意识到我可能错过了追溯中的重要内容。在仔细查看回溯后,我意识到不是 Celery 试图腌制连接对象,而是 Multiprocessing.reduction。减少用于在一侧序列化并在另一侧重构。

我有一些替代方法来解决这个问题 - 但是它们都没有真正做到我最初想要的,即只是借用客户端库连接对象并使用它,而这对于 Multiprocessing 和 prefork 是不可能的。

【讨论】:

  • 啊,我想我的回答有点草率,因为你想传递同一个连接对象。 Mutliprocessing 和 prefork 通常不能很好地处理进程之间的连接和 i/o。您通常希望在分叉后建立连接。是否考虑过从 prefork 切换到 eventlet 或 gevent 进行并发,然后实现连接池?
【解决方案3】:

您是否尝试过使用 gevent 或 eventlet celery worker 来代替进程和线程?在这种情况下,您将能够使用 global var 或 threading.local() 来共享连接对象。

【讨论】:

  • 我正在使用 eventlet 获得锁定。我可以更努力地找出原因,但几乎没有动力,因为我正在尝试做的阻塞 IO 性质不适合 eventlet/gevent 的事件循环性质。
猜你喜欢
  • 2018-06-03
  • 2018-06-25
  • 1970-01-01
  • 2013-08-23
  • 2020-11-18
  • 2015-07-18
  • 2012-06-06
  • 2014-07-17
  • 1970-01-01
相关资源
最近更新 更多