【问题标题】:Publish events with MassTransit使用 MassTransit 发布活动
【发布时间】:2019-07-19 12:38:25
【问题描述】:

我正在尝试在一个 微服务 中发布消息并在另一个中获取它,但无法使用 MassTransit 5.5.3 和 RabbitMQ强>。

据我所知,我们不必创建 ReceiveEndpoint 即可发布事件,所以我只是在两个服务中创建相同的消息接口并发布消息,但作为我可以在 RabbitMQ 中看到它要么无处可去(如果未映射到队列)或进入“_skipped”队列。

出版商:

namespace Publisher
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host("host", "vhost", h =>
                {
                    h.Username("xxx");
                    h.Password("yyy");
                });
            });

            bus.Start();

            await bus.Publish<Message>(new { Text = "Hello World" });

            Console.ReadKey();

            bus.Stop();
        }
    }

    public interface Message
    {
        string Text { get; set; }
    }
}

消费者:

namespace Consumer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host("host", "vhost", h =>
                {
                    h.Username("xxx");
                    h.Password("yyy");
                });

                cfg.ReceiveEndpoint(host, e =>
                {
                    e.Consumer<MbConsumer>();
                });
            });

            bus.Start();

            bool finish = false;

            while(!finish)
            {
                await Task.Delay(1000);
            }

            bus.Stop();
        }
    }

    public interface Message
    {
        string Text { get; set; }
    }

    public class MbConsumer : IConsumer<Message>
    {
        public async Task Consume(ConsumeContext<Message> context)
        {
            await Console.Out.WriteLineAsync(context.Message.Text);
        }
    }
}

我希望消费者在消息发布后得到消息,但它没有得到消息。我认为这是因为完整的消息类型不同(“Publisher.Message”与“Consumer.Message”),所以消息合同不同。我应该如何修复此代码以在消费者中获取事件?看起来我缺少一些关于 RabbitMQMassTransit 的基本知识。

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    你的猜测是正确的。 MassTransit 使用完全限定的类名称作为消息合同名称。 MassTransit 还使用基于类型的路由,因此 FQCN 用于创建交换和绑定。

    因此,如果您将消息类移动到单独的命名空间,例如:

    namespace Messages
    {
        public interface Message
        {
            string Text { get; set; }
        }
    }
    

    然后您可以在发布消息时引用此类型

    await bus.Publish<Messages.Message>(new { Text = "Hello World" });
    

    并定义您的消费者

    public class MbConsumer : IConsumer<Messages.Message>
    {
        public async Task Consume(ConsumeContext<Message> context)
        {
            await Console.Out.WriteLineAsync(context.Message.Text);
        }
    }
    

    它会起作用的。

    您可能还想查看 RMQ 管理 UI 以了解 MassTransit 拓扑。使用您的代码,您将看到两个交换,一个 Publisher.Message 和另一个 Consumer.Message,其中您的消费者队列绑定到 Consumer.Message 交换,但是您将消息发布到 Publisher.Message 交换,它们就消失了。

    我还建议为您的接收端点指定一个有意义的端点名称:

    cfg.ReceiveEndpoint(host, "MyConsumer", e =>
    {
        e.Consumer<MbConsumer>();
    });
    

    【讨论】:

    • 我没有指定名称,因为希望所有消费微服务都能接收消息并进行适当处理,如果我们指出队列名称,则事件只会发送到适合竞争的服务之一消费者用例。
    • 对于竞争消费者,您需要使用相同的队列名称运行相同服务的多个实例。只要您使用不同的队列名称运行不同的服务,它们都会收到一条消息。您不应该使用随机队列名称,因为如果您的服务出现故障,消息将在服务队列中累积,如果服务将获得另一个队列名称,它将丢失所有待处理的消息并且队列变得陈旧。
    • 如果我发布了一条消息而消费者不在那里(可能已关闭)会发生什么。有没有一种设置可以让我将它保存在发布者队列中,直到消费者出现并从他们自己的队列中使用它?
    • 如果消费者之前在那里,消息将堆积在队列中。如果消费者不在那里,消息就会消失。 MassTransit 在消费者应用程序启动时为消费者创建交换、队列和绑定。之后,消息将被传递。这一切都在文档中进行了描述。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-19
    • 1970-01-01
    相关资源
    最近更新 更多