【问题标题】:Mass Transit - Is it possible to consume a message from Azure Service bus?Mass Transit - 是否可以使用来自 Azure 服务总线的消息?
【发布时间】:2018-03-12 15:19:16
【问题描述】:

请不要问为什么,我正在做我被告知的事情。 我正在从 Azure 服务总线发送消息。我希望使用 Mass Transit 来使用此消息。我可以使用 Azure 服务总线使用来自 Mass Transit 的消息,但反之则不行。 我只能使用 Mass Transit 轻松发送和接收消息,但不能从 Azure Service Bus 到 Mass Transit。 有谁知道我怎样才能做到这一点?

这是我的实现:

Azure 服务总线发送消息:

  queueClient = new QueueClient("Endpoint=sb:", AzureQueueName);

    var murrr = new MainMessage()
                        {
                            ConsignmentReference = DateTime.Now.Ticks.ToString(),
                            CustomerId = "MOOOOOOOOOOOOOOOOOO",
                            OtherProperty = 123,
                            WantMeToFail = false,

                        };

                        var json = JsonConvert.SerializeObject(murrr);
                        var message2 = new Message(Encoding.UTF8.GetBytes(json));


                        // Send the message to the queue
                        await queueClient.SendAsync(message2);

在 Mass Transit 中消费消息:

  var bus = CreateAzureBus(ConfigurationManager.AppSettings["AzureQueueName"], ConfigurationManager.AppSettings["AzureNamespace"], ConfigurationManager.AppSettings["AzureKey"], ConfigurationManager.AppSettings["AzureKeyName"], configurator =>
            {
                configurator.Subscribe(s =>
                {
                    s.Consumer<Consumer>().Permanent();
                    s.Consumer<RetryConsumer>().Permanent();
                    s.Consumer<SBConsumer>().Permanent();  

                });

                configurator.SetCreateMissingQueues(true);
            });


    private static IServiceBus CreateAzureBus(string azureQueueName, string azureNamespace, string azureKey,
            string azureKeyName, Action<ServiceBusConfigurator> config = null)
        {
            var queueUrl = $"azure-sb://{azureNamespace}/{azureQueueName}";
            return ServiceBusFactory.New(x =>
            {
                x.UseAzureServiceBus(r => r.ConfigureNamespace(azureNamespace, h =>
                {
                    h.SetKey(azureKey);
                    h.SetKeyName(azureKeyName);
                }));

                x.ReceiveFrom(queueUrl);
                x.UseJsonSerializer();

                x.UseAzureServiceBusRouting();
                x.SetCreateMissingQueues(true);

                config?.Invoke(x);
            });
        }

这是我希望使用消息的消费者,我已经尝试过我正在使用的对象和 Azure 服务总线对象消息

 public class SBConsumer : Consumes<Message>.All
    {
        public void Consume(Message message)
        {
            AzureProcessor.DoSomethingWithOurMessage(message);
        }
    }

我也试过这个:

 public class Consumer : Consumes<MainMessage>.All
    {
        public void Consume(MainMessage message)
        {
            Processor.DoSomethingWithOurMessage(message, 0);
        }

    }

【问题讨论】:

    标签: c# azure servicebus masstransit


    【解决方案1】:

    MassTransit 具有特定的消息格式,这是消息反序列化程序所要求的。

    http://masstransit-project.com/MassTransit/advanced/interoperability.html

    如果您想使用任意消息,则需要编写自己的反序列化器。

    【讨论】:

    • 这不回答任何问题
    • 对不起,我对您的问题的理解是,MassTransit 无法使用该消息。原因是您的 Object 消息不是 MT 理解的格式,您需要将其格式化为 JSON 或 XML,并使用类似的消息信封,并将其作为流写入消息正文而不是使用 Object 消息。也可以看一下MT如何向队列发送消息的代码:github.com/MassTransit/MassTransit/blob/develop/src/…
    猜你喜欢
    • 1970-01-01
    • 2021-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多