【发布时间】:2016-03-24 06:42:17
【问题描述】:
我正在尝试与只能接受单个 TCP 连接(内存限制)的设备进行交互,因此仅为每个工作线程启动一个连接不是一个选项,因为它在正常的客户端-服务器情况下,例如数据库连接。
我尝试使用线程间可全局访问的 Multiprocessing Manager dict,格式如下:
clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}
还有这样一个任务:
from celery import shared_task
from .celery import manager, clients
@shared_task
def send_command(controller, commandname, args):
"""Send a command to the controller."""
# Create client connection if one does not exist.
conn = None
addr, port = controller
if controller not in clients:
conn = Client(addr, port)
conn.connect()
lock = manager.RLock()
clients[controller] = (conn, lock,)
print("New controller connection to %s:%s" % (addr, port,))
else:
conn, lock = clients[controller]
try:
f = getattr(conn, commandname) # See if connection.commandname() exists.
except Exception:
raise Exception("command: %s not known." % (commandname))
with lock:
res = f(*args)
return res
但是任务会因序列化错误而失败,例如:
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
即使我没有使用不可序列化的值调用任务,并且任务也没有尝试返回不可序列化的值,Celery 似乎痴迷于尝试序列化这个全局对象?
我错过了什么?您将如何使 Celery 任务中使用的客户端设备连接成为线程安全且线程之间可访问的?示例代码?
【问题讨论】:
-
我不确定这是否适用于您的情况,但我只记得阅读过有关
multiprocessing.reduction的内容,它应该允许在进程之间共享套接字连接。 See this blog post for an example. -
客户端没有使用原始套接字,它是一个具有协议的 Twisted 连接对象。使用原始套接字或从 fd 重新构造 Twisted 连接对象并非易事。
-
我最终想出了如何将 Twisted 协议包装在现有套接字周围,但是在我的情况下它不起作用,因为 Celery 消费者作为工作主进程的单独的子进程进程无法访问所需的文件描述符(存储在 Redis 中),并且设置缠结的 unix 管道来共享 FD 实在是太骇人听闻了。我的情况的问题是设备受内存限制,根本不能有多个连接......所以我决定只拥有一个工人池,每个工人都有一个消费者和一个设备。不好!
标签: python multiprocessing celery