【问题标题】:NServiceBus Handle Messages as a BatchNServiceBus 批量处理消息
【发布时间】:2016-03-21 16:32:20
【问题描述】:

我发现后端消息处理中出现了常见模式:

ServiceA 生成大量消息。

ServiceB 一次处理一条消息。

ServiceC 向数据库或 Web 服务调用发出调用,从而通过批量调用获得显着的性能和可靠性提升。

在某些情况下,预先批处理来自 ServiceA 的消息或在 ServiceB 中批量处理消息是不可行的,因此首选是单独处理所有消息,直到 ServiceC 的最终调用。这需要在 ServiceC 调用之前进行批处理。

看似理想的情况是有一个 NServiceBus 处理程序签名,可以选择批量传递消息,例如:

public void Handle(FooMessage[] messageBatch)
{
}

在处理程序执行之前,messageBatch 中的任何消息都不会被提交。

这似乎在 NServiceBus 中不受本机支持。我可以一次处理队列中的消息并写入内存,直到批量刷新。但在这种情况下,消息是在刷新之前提交的,如果进程崩溃,我们不会为批处理中的所有消息保留传递保证。

所以问题是:出于某种我没有想到的原因,这是一个糟糕的模式吗?我知道知道何时刷新批处理存在一个固有的问题,但似乎至少有一些传输实现缓冲了批处理中的消息,并且一次只发送一个。在这个级别进行批处理或为定期刷新设置一个简单的超时似乎会起作用。

我是否缺少解决方法或首选模式?

【问题讨论】:

  • 您的解决方案能否与 Saga 的解决方案一起使用?看起来您的服务 C 会从批处理中获得一些东西,而对于系统的其余部分来说,这并不重要(从抽象的角度来看)。像 Saga 之类的东西,甚至是一个简单的存储来保存一些 id/其他信息来执行调用,然后在时间过去或批量达到正确大小时批量执行它们。这可以在 Saga 中轻松完成(当它启动时,例如 30 分钟超时,执行批处理,或在收到 50 个请求后,执行批处理并关闭 saga)。

标签: c# service message-queue nservicebus soa


【解决方案1】:

前期/免责声明:我为 NServiceBus 的制造商 Particular Software 工作。我还写了Learning NServiceBus

历史

在为 Particular 工作之前,我曾经发现自己与您的处境一模一样。我遇到了一种分析类型的情况,其中 12 台 Web 服务器通过 MSMQ 发送相同类型的命令以指示一篇文章已被查看。需要在数据库中跟踪这些计数,以便可以根据视图数量生成“最受欢迎”列表。但是每个页面视图的插入效果都不好,所以我引入了服务总线。

插入器可以从使用表值参数一次插入多达 50-100 条中获益,但 NServiceBus 在事务中一次只为您提供一条消息。

为什么不使用 Saga?

在 NServiceBus 中,任何对多条消息进行操作的东西通常都需要使用 Saga。 (Saga 基本上是一堆相关的消息处理程序,它们在处理每条消息之间保持一些存储状态。)

但是 Saga 必须将其数据存储在某个地方,这通常意味着数据库。那么我们来比较一下:

  • 现在使用 NServiceBus,50 条消息意味着 50 次数据库插入。
  • 假设批量接收,50 条消息将意味着 1 个数据库批量插入。
  • 对于 Sagas,50 条消息输入意味着 50 次 Saga 数据读取 + 50 次 Saga 数据更新,然后是单个数据库批量插入。

因此,Saga 会使“持久性负载”变得更糟。

当然,您可以选择对 Saga 使用内存中的持久性。这将为您提供批处理而无需额外的持久性开销,但如果 Saga 端点崩溃,您可能会丢失部分批处理。因此,如果您不愿意丢失数据,那不是一个选择。

批量接收会是什么样子?

所以即使在几年前,我也曾设想过这样的事情:

// Not a real NServiceBus thing! Only exists in my imagination!
public interface IHandleMessageBatches<TMessage>
{
    void Handle(TMessage[] messages);
    int MaxBatchSize { get; }
}

我们的想法是,如果消息传输可以提前查看并看到许多可用消息,它可以开始接收到 MaxBatchSize 并且您会立即获得所有消息。当然,如果队列中只有 1 条消息,您将得到一个包含 1 条消息的数组。

问题

几年前,我坐下来使用 NServiceBus 代码库,想尝试实现它。好吧,我失败了。当时,尽管 MSMQ 是唯一的传输(在 NServiceBus V3 中),但 API 的架构使得传输代码可以查看队列并一次取出一条消息,从而引发消息处理逻辑的内存事件开始吧。如果没有大规模的重大更改,就不可能改变这一点。

较新版本中的代码更加模块化,这在很大程度上是因为现在支持多种消息传输。但是,仍然假设一次处理一条消息。

进入 V6 的当前实现在 IPushMessages 接口中。在Initialize 方法中,核心将Func&lt;PushContext, Task&gt; pipe 推送到IPushMessages 的传输实现中。

或者用英语,“Hey Transport,当您有可用的消息时,执行此操作将其交给核心,我们将从那里取走它。”

简而言之,这是因为 NServiceBus 旨在一次可靠地处理一条消息。从更详细的角度来看,批处理接收变得困难的原因有很多:

  • 当事务在进行时,接收一个批次需要处理该事务中的所有消息。如果交易规模过大,这很容易失控。
  • 消息类型可能在队列中混合使用。消息类型毕竟只是一个标头。没有办法说“给我一批 T 型消息”。如果您收到一个批次并且它包含 其他 消息类型怎么办?
  • 多个处理程序可以在同一消息类型上运行。例如,如果消息SuperMessage 继承BaseMessage,则两种类型的处理程序都可以在同一消息上运行。在考虑一批消息时,多个处理程序和多态消息处理程序的这种可能性变得非常复杂。
  • 关于多态消息的更多信息,如果批处理是 Handle(BaseMessage[] batch) 但传入的消息是都继承自 BaseMessage 的不同超类型怎么办?
  • 我敢肯定,还有很多其他的事情,我什至没有想到。

总而言之,将 NServiceBus 更改为接受批处理需要针对批处理优化整个管道。单个消息(当前规范)将是数组大小为 1 的专用批处理。

因此,从本质上讲,对于它所提供的有限的商业价值来说,这种改变风险太大了。

建议

我发现每条消息插入一次并不像我想象的那么昂贵。糟糕的是,多个 Web 服务器上的多个线程尝试一次写入数据库并卡在该 RPC 操作中直到完成。

当这些操作被序列化到一个队列中,并且有限的固定数量的线程处理这些消息并以数据库可以处理的速率进行数据库插入时,大多数情况下事情往往运行得非常顺利。

另外,请仔细考虑您在数据库中所做的工作。对现有行的更新比插入便宜得多。就我而言,我真的只关心计数,不需要记录每个单独的页面视图。因此,根据内容 ID 和 5 分钟时间窗口更新记录并更新该记录的读取计数会更便宜,而不是每次读取都插入一条记录并强迫自己进行大量聚合查询。

如果这绝对行不通,您需要考虑在可靠性方面可以做出哪些权衡。您可以使用具有内存持久性的 Saga,但是您可能(并且很可能最终会)丢失整个批次。这很可能是可以接受的,具体取决于您的用例。

您还可以使用消息处理程序写入 Redis,这比数据库便宜,然后使用 Saga 更像调度程序,将数据批量迁移到数据库。您可能可以使用 Kafka 或其他一些技术来做类似的事情。在这些情况下,您需要自行决定需要什么样的可靠性保证,并设置可以实现这一目标的工具。

【讨论】:

  • 谢谢大卫,很棒的回答。起初,我们有点狭隘地考虑,如果/当它被支持时,是否能够从传输中批量提取消息。我们没有考虑类型和多态性问题。我认为我们现在正朝着您最后建议的方向前进,让 Saga 充当针对另一种媒介的调度程序。
猜你喜欢
  • 2016-03-30
  • 2011-09-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-02
  • 2021-10-09
  • 2019-05-26
  • 1970-01-01
相关资源
最近更新 更多