【发布时间】:2021-05-14 16:22:34
【问题描述】:
我有一个消费者服务使用 EasyNetQ 订阅者从 RabbitMQ 队列中检索消息。每条消息需要数十秒的时间来处理,我需要并行运行它们以确保我能够跟上生产者的速度。但是,每条消息都有一个属性,称为 groupingId。重要的是不要同时执行具有相同 groupingId 的任务,因为这会导致资源冲突。
可能有数百个 groupingIds,并且在通常的实践中,任何时候都不会有太多消息具有相同的 Id。然而,数据可能是突发性的,导致同时出现数百个相同 Id 的集群。
我认为 TPL Dataflow 可能是一个不错的选择,但我对它不是很熟悉,也不知道如何用它来实现我所需要的。任何指导将不胜感激。
【问题讨论】:
-
一个想法可能是使用带键的异步锁,基本上是
SemaphoreSlims 的字典。你可以在这里找到一些实现:Asynchronous locking based on a key -
这里是 Polly/TPL 数据流方法:Send parallel requests but only one per host。
标签: c# task-parallel-library easynetq