【问题标题】:Redis - Better way of cleaning the processing queue(reliable) while using BRPOPLPUSHRedis - 使用 BRPOPLPUSH 时清理处理队列的更好方法(可靠)
【发布时间】:2015-03-15 05:11:58
【问题描述】:

我们目前的设计

环境 Redis 2.8.17

我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式,RPOPLPUSH

但是,考虑到它的阻塞性质,我们使用 BRPOPLPUSH,并使用 LPUSH 来确保 FIFO 顺序。

生产者:多个线程(来自多个服务器)使用 LPUSH 来推送项目。

消费者:多个线程(来自多个服务器)使用 BRPOPLPUSH 来处理项目。

BRPOPLPUSH q processing-q

如文档所述,redis 从队列 'q' 中弹出项目,同时将它们添加到 'processing-q' 中。

问题

由于我们应用程序的多线程(异步)特性,我们无法控制消费者何时完成他们的处理。

因此,如果我们使用 LREM(根据文档)从 processing-q 中删除已处理的元素>,这只会删除 processing-q 的顶部元素。由于无法保证,它是否已删除由相应 consumer 处理的实际元素。

因此,如果我们不做任何事情,processing-q 会继续增长(消耗内存),恕我直言,这是非常糟糕的。

有什么建议或想法吗?

【问题讨论】:

标签: redis queue reliability


【解决方案1】:

我会采取的方法是使用每个消费者的 processing-q(例如 processing-q:consumer-id)。这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者。为此,我建议您还保留每个消费者最后一次弹出任务并定期检查超时。如果消费者已达到超时,则将其任务移回主队列并删除其队列。

【讨论】:

    【解决方案2】:

    在一个类似的项目中,我将主机名和工作人员的进程 ID 用于备份队列。每个worker都有自己的备份队列,如果worker死了,item也不会丢失。

    查看READMEimplementation 了解更多详情。

    【讨论】:

    • @soveran,那么当工作人员在处理过程中死亡并以不同的进程 ID 重新启动时会发生什么?新的工作流程如何查找其前任的备份队列?
    【解决方案3】:

    除了建议的解决方案之外,您还可以ltrim 处理队列,使其达到对您的服务有意义的数量。这将确保处理队列永远不会不成比例地增长。

    但是,如果达到修剪限制,您将开始丢失物品。对于您的用例,这可能会也可能不会接受。

    http://redis.io/commands/ltrim

    【讨论】:

    • 你是对的,在我的用例中,丢失一些物品是不可接受的。
    【解决方案4】:

    您只需在对 LREM 的调用中包含要删除的作业。

    LREM 采用以下形式:

    LREM queue count "object"
    

    它将从 queue 中删除 count 个等于 "object" 的项目。因此,要删除您的消费者线程正在处理的特定工作,您需要执行类似的操作。

    LREM processing-q 1 "job_identifier"
    

    有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem

    然后,为了处理崩溃的消费者和放弃的作业,您可以使用 SETEX 创建具有过期时间的锁,并定期检查没有锁的作业。

    所以整个过程是这样的:

    制片人

    1. RPUSH q "job_identifier"

    消费者

    1. SETEX lock:processing-q:job_identifier 60(先设置锁以避免竞争条件)
    2. BRPOPLPUSH q processing-queue
    3. 处理作业
    4. LREM processing-queue "job_identifier"

    过期作业监视器

    1. 工作 = LRANGE processing-queue 0 -1
    2. foreach 作业中的作业:lock = GET lock:processing-q:job_identifier
    3. 如果锁定为空,则此作业超时,因此从处理中删除-q LREM processing-queue "job_identifier"
    4. 并使用RPUSH q "job_identifier" 重试

    @NotAUser 已经发布了一个开源的 java 实现,这里:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

    【讨论】:

    猜你喜欢
    • 2015-06-25
    • 2021-09-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-09
    • 2021-12-25
    • 2017-12-28
    • 2019-11-20
    相关资源
    最近更新 更多