【问题标题】:Grouping tasks within parallel execution to prevent messages with same groupingID being executed concurrently在并行执行中对任务进行分组,以防止具有相同 groupingID 的消息同时执行
【发布时间】:2021-05-14 16:22:34
【问题描述】:

我有一个消费者服务使用 EasyNetQ 订阅者从 RabbitMQ 队列中检索消息。每条消息需要数十秒的时间来处理,我需要并行运行它们以确保我能够跟上生产者的速度。但是,每条消息都有一个属性,称为 groupingId。重要的是不要同时执行具有相同 groupingId 的任务,因为这会导致资源冲突。

可能有数百个 groupingIds,并且在通常的实践中,任何时候都不会有太多消息具有相同的 Id。然而,数据可能是突发性的,导致同时出现数百个相同 Id 的集群。

我认为 TPL Dataflow 可能是一个不错的选择,但我对它不是很熟悉,也不知道如何用它来实现我所需要的。任何指导将不胜感激。

【问题讨论】:

标签: c# task-parallel-library easynetq


【解决方案1】:

创建分组 ID 字典并锁定它们。

首先,在某处创建字典,可能作为成员变量。

ConcurrentDictionary<int,object> _locks = new ConcurrentDictionary<int, object>();

当你需要处理消息时,使用这个逻辑。

if (!_locks.ContainsKey(message.GroupingID))
{
    _locks.TryAdd(message.GroupingID, new object());
}
lock (_locks[message.GroupingID])
{
    ProcessMessage(message);
}

【讨论】:

  • 这确实会阻止处理相同的 groupingId 消息。然而,仅仅丢弃消息并不是一个真正的选择。我的第一种方法是 BlockingCollection 对象的字典。但它看起来非常笨拙。
  • 不知道为什么“丢弃”出现在您的评论中——我的代码中没有任何东西丢弃任何东西。
  • 我很抱歉。你是对的,我没有正确阅读你的代码。我认为该解决方案实际上可以为我工作,而且它比其他任何东西都简单得多。我唯一担心的是锁的数量和我最终可能会遇到的阻塞线程的数量,但弄清楚这一点的唯一方法是测试它。
  • 是的,这是一种蛮力解决方案,会导致大量的锁定和阻塞。另一种方法是建立 10 个线程和 10 个阻塞集合,并根据组 ID 中的最后一位数字为每个集合分配消息——诸如此类。这可能会表现得更好,但取决于您的消息传递系统的实现细节。
猜你喜欢
  • 2016-08-29
  • 2014-08-22
  • 2018-03-05
  • 2021-06-26
  • 1970-01-01
  • 1970-01-01
  • 2017-08-28
  • 1970-01-01
  • 2014-09-14
相关资源
最近更新 更多