【问题标题】:Serial processing of a certain message type in RebusRebus中某种消息类型的串行处理
【发布时间】:2013-05-26 02:48:35
【问题描述】:

我们有一个与第三方网络服务对话的 Rebus 消息处理程序。由于我们无法直接控制的原因,此 WCF 服务经常抛出异常,因为它在自己的数据库中遇到了数据库死锁。然后 Rebus 将尝试处理此消息五次,这在大多数情况下意味着这五次中的一次将是幸运的并且不会陷入僵局。但是经常发生消息在死锁之后确实会死锁并最终进入我们的错误队列。

除了解决死锁的根源(这将是一个长期目标)之外,我还可以想到两个选择:

  1. 继续尝试仅使用此特定消息类型,直到成功。最好我可以设置一个超时,所以“如果五个死锁然后在 5 分钟内再试一次”,而不是通过连续尝试来进一步阻塞进程。我已经做了一个 Thread.Sleep(random) 来传播消息,但它仍然会在尝试五次后放弃。

  2. 将此特定消息类型发送到一个不同的队列,该队列只有一个处理消息的工作人员,因此这会以串行方式而不是并行方式发生。我们当前的配置使用 8 个工作线程,但这只会使死锁情况变得更糟,因为现在 Web 服务被并发调用并且消息相互干扰。

选项 #2 有我的偏好,但我不确定这是否可行。我们在接收方的配置目前如下所示:

var adapter = new Rebus.Ninject.NinjectContainerAdapter(this.Kernel);

var bus = Rebus.Configuration.Configure.With(adapter)
    .Logging(x => x.Log4Net())
   .Transport(t => t.UseMsmqAndGetInputQueueNameFromAppConfig())
   .MessageOwnership(d => d.FromRebusConfigurationSection())
   .CreateBus().Start();

以及接收方的.config

<rebus inputQueue="app.msg.input" errorQueue="app.msg.error" workers="8">
  <endpoints>
  </endpoints>
</rebus>

从配置中可以看出,只能将一个输入队列设置为“监听”。我也无法通过流畅的映射 API 找到一种方法。这似乎也只需要一个输入和错误队列:

.Transport(t =>t.UseMsmq("input", "error"))

基本上,我正在寻找的内容大致如下:

<rebus workers="8">
  <input name="app.msg.input" error="app.msg.error" />
  <input name="another.input.queue" error="app.msg.error" />
</rebus>

关于如何处理我的要求的任何提示?

【问题讨论】:

    标签: c# .net message-bus rebus


    【解决方案1】:

    我建议您使用 saga 和 Rebus 的超时服务来实施适合您需求的重试策略。这样,在您启用 Rebus 的 Web 服务外观中,您可以执行以下操作:

    public void Handle(TryMakeWebServiceCall message)
    {
        try
        {
            var result = client.MakeWebServiceCall(whatever);
    
            bus.Reply(new ResponseWithTheResult{ ... });
        }
        catch(Exception e)
        {
            Data.FailedAttempts++;
    
            if (Data.FailedAttempts < 10)
            {
                bus.Defer(TimeSpan.FromSeconds(1), message);
                return;
            }
    
            // oh no! we failed 10 times... this is probably where we'd
            // go and do something like this:
            emailService.NotifyAdministrator("Something went wrong!");
        }
    }
    

    其中Data 是传奇数据,它神奇地提供给您并在调用之间保持不变。

    有关如何创建 saga 的灵感,请查看coordinating stuff that happens over time 上的 wiki 页面,您可以在其中看到有关服务如何将某些状态(即您的案例中的失败尝试次数)存储在本地的示例在处理消息之间可用。

    到了让bus.Defer 工作的时候,你有两个选择:1) 使用外部超时服务(我通常在每台服务器上安装一个),或者 2) 只使用“你自己”作为超时服务。

    在配置时,你去

    Configure.With(...)
        .(...)
        .Timeouts(t => // configure it here)
    

    您可以在其中StoreInMemoryStoreInSqlServerStoreInMongoDbStoreInRavenDbUseExternalTimeoutManager

    如果您选择 (1),您需要查看 Rebus 代码并自己构建 Rebus.Timeout - 它基本上只是一个可配置的、支持 Topshelf 的控制台应用程序,其中包含一个 Rebus 端点。

    如果您需要更多帮助来完成这项工作,请告诉我 - bus.Defer 是您的系统变得非常棒的地方,并且能够克服导致所有其他系统崩溃的所有小故障 :)

    【讨论】:

    • 好吧,酷!明天我一定会看看sagas。我没有立即看到如何配置 Saga 状态的存储位置(对我们来说可能是 SQL Server),但我确信我可以在某个地方找到它。谢谢!如果这对我有用,我会告诉你。编辑:已经在哪里找到:)
    • 我可以在这里看到一些增长潜力,因为此网络服务用于下订单,我们向客户承诺,如果他们在特定时间之前下订单,他们会在下一个交付日。使用这种方法,我们可以开始发送紧急电子邮件/文本,以防“截止日期”临近并且仍有消息需要处理。
    • 我已经尝试过完成这项工作,而且我已经走了很长一段路,但我遇到了一个问题。当bus.Defer 发生时,它会序列化我定义的ISagaData 类,但在反序列化时,Newtonsoft.Json 声称它无法加载定义该类型的程序集。消息内容(在 SQL Server 中):@ 987654335@。这个程序集是一个用Assembly.LoadFile 加载的插件。
    • 当我使用始终加载的“核心”程序集而不是动态加载的程序集时,它确实有效。问题是,我确定在 Rebus IBus 设置时插件程序集已经加载。所以我不知道为什么它没有被发现。我宁愿在插件程序集中定义 saga 类型,以便只有那些插件依赖于 Rebus。
    • 在 bus.Defer 调用期间,您创建一个新的消息实例,而不是从方法参数传递原始消息变量是否有特殊原因?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 2012-07-27
    • 1970-01-01
    相关资源
    最近更新 更多