【问题标题】:Rq Worker with multiple connections具有多个连接的 Rq Worker
【发布时间】:2020-10-21 12:32:22
【问题描述】:

我在同一个网络中有 3 台服务器。在这些服务器中的每一个上,一个 redis 服务和某种生产者都在运行。生产者将作业排入名为tasks 的本地rq 队列。 所以每个服务器都有自己的tasks 队列。

另外,还有一台服务器正在运行 rq worker。是否可以让该工作人员检查 3 台服务器中的每台服务器上的 tasks 队列?

我已尝试创建连接列表

import redis
from rq import Queue, Worker
from rq import push_connection
# urls = [url1, url2, url3]
connections = list(map(redis.from_url, urls))

然后我用它来创建队列列表。

queues = list(map(lambda c: Queue('tasks', connection=c), connections))

然后我推送所有连接

for connection in connections:
    push_connection(connection)

并将队列传递给Worker

Worker(queues=queues).work()

这导致工作人员仅在最后推送的任何连接上侦听 tasks

我一直在研究rq 上的代码,我想我可以编写一个自定义工作类来执行此操作,但在此之前我想问一下是否有其他方法。甚至可能完全是另一个队列框架?

【问题讨论】:

    标签: python redis distributed-computing


    【解决方案1】:

    好的,我解决了这个问题。我仍然不确定我是否有权在此处发布实际源代码,因此我将概述我的解决方案。

    我必须覆盖 register_birth(self)register_death(self)dequeue_job_and_maintain_ttl(self, timeout)。这些函数的原始实现可以在here找到。

    register_birth

    基本上,你必须遍历所有连接,push_connection(connection),完成注册过程,然后pop_connection()

    注意仅在mapping 变量中列出与该连接对应的队列。原始实现使用queue_names(self) 来获取队列名称列表。您必须执行与 queue_names(self) 相同的操作,但仅限于相关队列。

    register_death

    register_birth 基本相同。遍历所有连接,push_connection(connection),完成与原始实现相同的步骤,pop_connection()

    dequeue_job_and_maintain_ttl

    我们来看看original implementation of this function。在到达try 块之前,我们希望保持一切不变。在这里,我们要无休止地遍历所有连接。您可以使用itertools.cycle 来做到这一点。

    在循环push_connection(connection) 内,并将self.connection 设置为当前连接。如果self.connection = connection 缺失,则作业结果可能无法正确返回。

    现在我们将继续调用self.queue_class.dequeue_any,类似于原始实现。但是我们会将超时设置为1,这样如果当前连接没有任何工作可供工作人员使用,我们就可以继续检查另一个连接。

    确保调用self.queue_class.dequeue_any 时使用与当前连接对应的队列列表。在这种情况下,queues 仅包含相关队列。

    result = self.queue_class.dequeue_any(
        queues, 1, connection=connection, job_class=self.job_class)
    

    之后pop_connection(),并对result 进行与原始实现相同的检查。如果result 不是None,我们已经找到工作要做,需要break 退出循环。

    保留原始实现中的所有其他内容。不要忘记try 块末尾的break。它打破了while True 循环。

    另一件事

    队列包含对其连接的引用。您可以使用它来创建(connection, queues) 的列表,其中queues 包含具有连接connection 的所有队列。

    如果您将结果列表传递给itertools.cycle,您将获得覆盖dequeue_job_and_maintain_ttl 所需的无限迭代器。

    【讨论】:

      猜你喜欢
      • 2018-11-07
      • 1970-01-01
      • 2015-01-20
      • 2023-03-21
      • 1970-01-01
      • 2019-10-16
      • 2017-07-11
      • 2023-03-31
      • 2012-12-08
      相关资源
      最近更新 更多