【问题标题】:MassTransit and event versus command publishingMassTransit 和事件与命令发布
【发布时间】:2012-07-27 15:23:02
【问题描述】:

我是 MassTransit 的新手,我想念一些我理解的东西。

假设我有一个服务器场,所有节点都可以做同样的工作。应用程序框架是 CQRS 的风格。这意味着我要发布两种基本类型的消息:

  • 命令:必须由其中一个服务器处理,其中任何一个(第一个具有空闲作业槽)
  • 事件:必须由所有服务器处理

我已经构建了一个非常简单的 MassTransit 原型(一个每隔 X 秒发送一次问候的控制台应用程序)。

在 API 中,我可以看到有一个“发布”方法。如何指定它是什么类型的消息(一个与所有服务器)?

如果我查看“处理程序”配置,我可以指定队列 uri。如果我为所有主机指定相同的队列,所有主机都会收到消息,但我不能将执行限制在一台服务器上。

如果我从主机专用队列监听,只有一个服务器会处理消息,但我不知道如何广播其他类型的消息。

请帮助我了解我缺少什么。

PS:如果有关系,我的消息系统是rabbitmq。

为了测试,我用这些类创建了一个通用类库:

public static class ActualProgram
{
    private static readonly CancellationTokenSource g_Shutdown = new CancellationTokenSource();

    private static readonly Random g_Random = new Random();

    public static void ActualMain(int delay, int instanceName)
    {
        Thread.Sleep(delay);
        SetupBus(instanceName);

        Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);

        Console.WriteLine("Press enter at any time to exit");
        Console.ReadLine();
        g_Shutdown.Cancel();

        Bus.Shutdown();
    }

    private static void PublishRandomMessage()
    {
        Bus.Instance.Publish(new Message
        {
            Id = g_Random.Next(),
            Body = "Some message",
            Sender = Assembly.GetEntryAssembly().GetName().Name
        });

        if (!g_Shutdown.IsCancellationRequested)
        {
            Thread.Sleep(g_Random.Next(500, 10000));
            Task.Factory.StartNew(PublishRandomMessage, g_Shutdown.Token);
        }
    }

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/simple" + instanceName);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }

    private static void MessageHandled(Message msg)
    {
        ConsoleColor color = ConsoleColor.Red;
        switch (msg.Sender)
        {
            case "test_app1":
                color = ConsoleColor.Green;
                break;

            case "test_app2":
                color = ConsoleColor.Blue;
                break;

            case "test_app3":
                color = ConsoleColor.Yellow;
                break;
        }
        Console.ForegroundColor = color;
        Console.WriteLine(msg.ToString());
        Console.ResetColor();
    }

    private static void MessageConsumed(Message msg)
    {
        Console.WriteLine(msg.ToString());
    }
}

public class Message
{
    public long Id { get; set; }

    public string Sender { get; set; }

    public string Body { get; set; }

    public override string ToString()
    {
        return string.Format("[{0}] {1} : {2}" + Environment.NewLine, Id, Sender, Body);
    }
}

我还有 3 个只运行 ActualMain 方法的控制台应用程序:

internal class Program
{
    private static void Main(string[] args)
    {
        ActualProgram.ActualMain(0, 1);
    }
}

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    您想要的是竞争消费者(搜索 SO,您会找到更多信息) 使用 RabbitMQ 让生活变得简单,您只需为您启动的每个消费者指定相同的队列名称,消息将仅由其中一个处理。 而不是每次都生成一个唯一的队列。

    private static void SetupBus(int instanceName)
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseRabbitMqRouting();
            sbc.ReceiveFrom("rabbitmq://localhost/Commands);
            sbc.Subscribe(subs =>
            {
                subs.Handler<Message>(MessageHandled);
            });
        });
    }
    

    AFAIK,您需要为命令处理程序而不是事件处理程序设置一个单独的进程。所有命令处理程序将ReceiveFrom 同一个队列,所有事件处理程序将ReceiveFrom 自己的唯一队列。

    另一个难题是如何将消息发送到总线。您仍然可以对命令使用发布,但是如果您错误地配置了消费者,您可能会执行多次执行,因为消息将发送给所有消费者,如果您想保证消息最终在单个队列中,您可以使用 Send 而不是Publish.

         Bus.Instance
             .GetEndpoint(new Uri("rabbitmq://localhost/Commands"))
            .Send(new Message
            {
                Id = g_Random.Next(),
                Body = "Some message",
                Sender = Assembly.GetEntryAssembly().GetName().Name
            });
    

    【讨论】:

    • 谢谢。这对我理解差异有很大帮助。唯一让我害怕的是“你需要为命令处理程序而不是事件处理程序设置一个单独的进程”。这将对全球架构产生影响,但如果没有其他选择,我会接受。
    • 对于竞争消费者,您还需要为每个消费者提供单独的总线实例。命令通常在单独的总线实例上的原因通常是它们是同步处理的,而消费者是在多个线程上处理的。 MT 不允许您基于每个消息类型指定并发。我认为可以在一个进程中托管多条总线,但我还没有尝试过。我使用 topshelf(来自同一个人)来托管我的每个“服务”。它允许您在部署时非常轻松地选择进程中(appdomains)或单独的进程/单独的机器
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多