【问题标题】:How to generate Queues per user inside celery?如何在 celery 中为每个用户生成队列?
【发布时间】:2019-09-06 03:28:48
【问题描述】:

所以我正在尝试将网络请求中的阻塞内容作为后台任务并利用队列。我也是消息传递和发布/订阅的新手。用户在那里推送数据并对其进行处理,然后通知用户。我为此做了一个 celery 设置,发现它不能满足我的用例,因为每个用户都有自己的任务专用队列。

我尝试指定缺失队列的创建和工作人员生成期间(发送以逗号分隔的队列名称),并将它们列在队列设置中,如先前在互联网上的答案中所述,“使用 celery 创建动态队列”。它创建队列,但当我指定与设置和命令行中指定名称不同的队列名称时,它不会。解决方案是使用不满足用例的队列名称生成更多工作人员,因为将有数百万个数据处理请求。

我发现 python-rq 具有 Queue 对象初始化及其名称,我认为它会创建新队列。如果是这样,转向 RQ 是否正确?

redis_conn = Redis()
q = Queue('some_queue', connection=redis_conn)

我想要的是每个用户在后台为他们自己的任务排队。我在 celery 中没有看到任何在线创建动态队列的解决方案(没有在命令行中指定队列名称或设置)。 python-rq 似乎有这个解决方案。我的权衡是从 RabbitMQ 和 celery 迁移到 redis。

有没有办法在 celery 中真正实现每个用户的队列?如果是,请列出步骤。还是这种设计模式不正确? pubsub 会满足用例吗?

【问题讨论】:

    标签: django redis rabbitmq celery


    【解决方案1】:

    Celery worker 仅消耗由task_queues 设置定义的队列或在命令行上使用-Q 选项给出的队列。但是,这可以从命令行或代码动态更改。只需确保在设置中启用task_create_missing_queues(这是默认设置),以便在您开始消费时自动创建新队列。

    因此,在为给定用户发送任务之前,您必须指示工作人员开始从用户队列中消费。这可以通过使用add_consumer 控制命令从命令行或使用app.control.add_consumer() 方法从代码中实现。这些操作是幂等的,因此如果工作人员已经从队列中消费,则不会发生任何事情。如果队列尚不存在,则会自动创建。

    【讨论】:

    • 这解决了我的问题@Tomáš Linhart。我现在可以在获取工作控制后添加队列。现在会是这样吗……工人将使用并发来处理队列中的作业?那么我需要增加工人数量或并发设置吗?我将收到来自用户的数百万条数据处理请求。
    • 是的,并发设置在这里正常应用。您将不得不尝试设置以获得最佳性能。另外,请查看autoscaling 选项。
    • 尝试那个东西会很有趣。好吧,感谢 @Tomáš Linhart 提供的帮助。
    猜你喜欢
    • 1970-01-01
    • 2021-06-06
    • 1970-01-01
    • 2023-01-09
    • 2016-05-20
    • 2022-06-10
    • 1970-01-01
    • 1970-01-01
    • 2012-05-24
    相关资源
    最近更新 更多