【发布时间】:2019-04-03 22:27:44
【问题描述】:
我有一个由 3 个服务组成的解决方案,有 2 个队列将消息从一个服务传递到另一个服务,它使用 EasyNetQ C# 库连接到 RabbitMQ。
这是架构的轮廓:
- 从网络 TCP 接收数据(做一些处理)推送到 队列 A
- 从队列A读取(做一些处理)推送到队列B
- 从 队列 B 读取,将处理后的数据发送到第 3 方 API
服务 1 和 2 是旧服务,而服务 3 是最近推出的。
问题是 1 和 2 一次处理一个日志,因此发布/订阅模式是有意义的,即服务 1 接收一条消息并将这条消息推送到 队列 A 以进行进一步处理,但服务 3 应向第 3 方 API 发送尽可能多的消息,因为该 API 在单个 HTTP POST 请求中最多可以接受 1000 条消息。
我想过引入一个计时器,例如每 X 分钟,计时器将触发一个事件,从队列中读取多达 1000 条消息并通过 HTTP 将它们发送到第 3 方 API,但我似乎无法找到一种方法手动从队列中读取 N 条消息。
目前我将消息从服务 2 发布到队列 B,如下所示:
_bus.Publish(myObject);
然后在第三个服务中我有一个订阅
_bus.SubscribeAsync<XQueueDTO>("x_queue_processor", ProcessMessageAsync);
但此订阅将始终从队列中一次读取一条消息。
那么,处理这个问题的最佳方法是什么?
【问题讨论】:
-
您能否添加您的消费者样本?