前期/免责声明:我为 NServiceBus 的制造商 Particular Software 工作。我还写了Learning NServiceBus。
历史
在为 Particular 工作之前,我曾经发现自己与您的处境一模一样。我遇到了一种分析类型的情况,其中 12 台 Web 服务器通过 MSMQ 发送相同类型的命令以指示一篇文章已被查看。需要在数据库中跟踪这些计数,以便可以根据视图数量生成“最受欢迎”列表。但是每个页面视图的插入效果都不好,所以我引入了服务总线。
插入器可以从使用表值参数一次插入多达 50-100 条中获益,但 NServiceBus 在事务中一次只为您提供一条消息。
为什么不使用 Saga?
在 NServiceBus 中,任何对多条消息进行操作的东西通常都需要使用 Saga。 (Saga 基本上是一堆相关的消息处理程序,它们在处理每条消息之间保持一些存储状态。)
但是 Saga 必须将其数据存储在某个地方,这通常意味着数据库。那么我们来比较一下:
- 现在使用 NServiceBus,50 条消息意味着 50 次数据库插入。
- 假设批量接收,50 条消息将意味着 1 个数据库批量插入。
- 对于 Sagas,50 条消息输入意味着 50 次 Saga 数据读取 + 50 次 Saga 数据更新,然后是单个数据库批量插入。
因此,Saga 会使“持久性负载”变得更糟。
当然,您可以选择对 Saga 使用内存中的持久性。这将为您提供批处理而无需额外的持久性开销,但如果 Saga 端点崩溃,您可能会丢失部分批处理。因此,如果您不愿意丢失数据,那不是一个选择。
批量接收会是什么样子?
所以即使在几年前,我也曾设想过这样的事情:
// Not a real NServiceBus thing! Only exists in my imagination!
public interface IHandleMessageBatches<TMessage>
{
void Handle(TMessage[] messages);
int MaxBatchSize { get; }
}
我们的想法是,如果消息传输可以提前查看并看到许多可用消息,它可以开始接收到 MaxBatchSize 并且您会立即获得所有消息。当然,如果队列中只有 1 条消息,您将得到一个包含 1 条消息的数组。
问题
几年前,我坐下来使用 NServiceBus 代码库,想尝试实现它。好吧,我失败了。当时,尽管 MSMQ 是唯一的传输(在 NServiceBus V3 中),但 API 的架构使得传输代码可以查看队列并一次取出一条消息,从而引发消息处理逻辑的内存事件开始吧。如果没有大规模的重大更改,就不可能改变这一点。
较新版本中的代码更加模块化,这在很大程度上是因为现在支持多种消息传输。但是,仍然假设一次处理一条消息。
进入 V6 的当前实现在 IPushMessages 接口中。在Initialize 方法中,核心将Func<PushContext, Task> pipe 推送到IPushMessages 的传输实现中。
或者用英语,“Hey Transport,当您有可用的消息时,执行此操作将其交给核心,我们将从那里取走它。”
简而言之,这是因为 NServiceBus 旨在一次可靠地处理一条消息。从更详细的角度来看,批处理接收变得困难的原因有很多:
- 当事务在进行时,接收一个批次需要处理该事务中的所有消息。如果交易规模过大,这很容易失控。
- 消息类型可能在队列中混合使用。消息类型毕竟只是一个标头。没有办法说“给我一批 T 型消息”。如果您收到一个批次并且它包含 其他 消息类型怎么办?
- 多个处理程序可以在同一消息类型上运行。例如,如果消息
SuperMessage 继承BaseMessage,则两种类型的处理程序都可以在同一消息上运行。在考虑一批消息时,多个处理程序和多态消息处理程序的这种可能性变得非常复杂。
- 关于多态消息的更多信息,如果批处理是
Handle(BaseMessage[] batch) 但传入的消息是都继承自 BaseMessage 的不同超类型怎么办?
- 我敢肯定,还有很多其他的事情,我什至没有想到。
总而言之,将 NServiceBus 更改为接受批处理需要针对批处理优化整个管道。单个消息(当前规范)将是数组大小为 1 的专用批处理。
因此,从本质上讲,对于它所提供的有限的商业价值来说,这种改变风险太大了。
建议
我发现每条消息插入一次并不像我想象的那么昂贵。糟糕的是,多个 Web 服务器上的多个线程尝试一次写入数据库并卡在该 RPC 操作中直到完成。
当这些操作被序列化到一个队列中,并且有限的固定数量的线程处理这些消息并以数据库可以处理的速率进行数据库插入时,大多数情况下事情往往运行得非常顺利。
另外,请仔细考虑您在数据库中所做的工作。对现有行的更新比插入便宜得多。就我而言,我真的只关心计数,不需要记录每个单独的页面视图。因此,根据内容 ID 和 5 分钟时间窗口更新记录并更新该记录的读取计数会更便宜,而不是每次读取都插入一条记录并强迫自己进行大量聚合查询。
如果这绝对行不通,您需要考虑在可靠性方面可以做出哪些权衡。您可以使用具有内存持久性的 Saga,但是您可能(并且很可能最终会)丢失整个批次。这很可能是可以接受的,具体取决于您的用例。
您还可以使用消息处理程序写入 Redis,这比数据库便宜,然后使用 Saga 更像调度程序,将数据批量迁移到数据库。您可能可以使用 Kafka 或其他一些技术来做类似的事情。在这些情况下,您需要自行决定需要什么样的可靠性保证,并设置可以实现这一目标的工具。