【问题标题】:Receive Event Grid from a Service Bus Handler on Azure Services从 Azure 服务上的服务总线处理程序接收事件网格
【发布时间】:2019-12-05 21:49:41
【问题描述】:

我正在事件网格上发送事件,例如。我可以看到他们到达了 azure 仪表板

            NomeEmailChange yay = new NomeEmailChange
            {
                Nome = "cesco",
                Email = "cesco"
            };

            var primaryTopicKey = _config["EventGridConfig:AcessKey"];
            var primaryTopic = _config["EventGridConfig:Endpoint"];

            var primaryTopicHostname = new Uri(primaryTopic).Host;

            var topicCredentials = new TopicCredentials(primaryTopicKey);
            var client = new EventGridClient(topicCredentials);

            var id = Guid.NewGuid().ToString();
            var hey = new List<EventGridEvent>
            {
                new EventGridEvent()
                {
                    Id = id,
                    EventType = "cesco-cesco",
                    Data = (yay),
                    EventTime = DateTime.Now,
                    Subject = "MS_Clientes",
                    DataVersion = "1.0",
                }
            };
            ;
            client.PublishEventsAsync(primaryTopicHostname, hey);

然后我创建了一个事件网格订阅。我可以确认到达事件网格订阅的事件网格消息。

在另一个项目中,我订阅了如下所示的服务总线。它适用于消费直接发送到总线的消息。

        public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
            IHostingEnvironment env)
        {
            services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
            services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                var keyName = "RootManageSharedAccessKey";
                var busName = configuration["ServiceBus:Name"];
                var secret = configuration["ServiceBus:Secret"];
                var host = cfg.Host(
                    "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                    "SharedAccessKeyName=" + keyName + ";" +
                    "SharedAccessKey=" + secret,
                    z =>
                    {
                        TokenProvider
                            .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                    });
                cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
                cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                    e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
            }));
            services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
            services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
            services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
            services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
            services.AddSingleton<IHostedService, BusService>();
            return services;
        }

现在应该一切正常,但是在另一个项目上,当消息到达时,我收到以下错误

fail: MassTransit.Messages[0]
      R-FAULT sb://sbacompanharreldev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

【问题讨论】:

  • stackoverflow.com/a/59195982/11228967 - 反馈会很好,知道它是否适合您,因为投入了大量时间来探索这个问题。谢谢:)
  • @kgalic 你发给我的反序列化器在这一行没有编译 eturn new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv);
  • 它说它需要 4 个参数
  • 您是否尝试从我提供的存储库中下载示例并尝试运行它?
  • 另外,您使用的是 .net core 还是 .net framework?

标签: .net azure azureservicebus azure-eventgrid


【解决方案1】:

按照建议,您必须编写自定义反序列化程序。在我的实现中,我改变了一些关于如何处理 MassTransit 包的事情。

首先,您必须为不同的内容类型注册反序列化器:

  1. 例如,如果您的消息来自 MassTransit 发布者,它将是默认消息 ContentType(在 C# 中):JsonMessageSerializer.JsonContentType
  2. 如果您的消息来自 ServiceBus 队列/主题,通常是"application/json"

以下代码显示了如何注册反序列化器,以及如何设置接收端点。与您的方法相比,一个区别是我使用了连接字符串,因此不需要 SAS 令牌部分。

class Program
    {
        static string ContentTypeJson = "application/json";
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                var queueName = "Your SB Queue Name";
                var connectionString = "Connection String with RooTManage policy";
                var host = cfg.Host(connectionString, h =>
                {
                    h.OperationTimeout = TimeSpan.FromSeconds(60);
                });

                cfg.ReceiveEndpoint(queueName,
                    e =>
                    {
                        e.AddMessageDeserializer(contentType: new ContentType(ContentTypeJson), () =>
                        {
                            return new EventGridMessgeDeserializer(ContentTypeJson);
                        });
                        e.Consumer(() => new EventGridMessageConsumer());

                        // Uncomment if required deserializer for local messages - mass transit as publisher or direct messages from SB
                        //e.AddMessageDeserializer(contentType: JsonMessageSerializer.JsonContentType, () =>
                        //{
                        //    return new CustomMessageDeserializer(JsonMessageSerializer.JsonContentType.ToString());
                        //});
                        //e.Consumer(() => new MessageConsumer());
                    });

            });
            bus.Start();

            Console.WriteLine("Press any key to exit");
            // for testing purposes of local messages - mass transit as publisher
            // await bus.Publish<CustomMessage>(new {  Hello = "Hello, World." });
            await Task.Run(() => Console.ReadKey());

            await bus.StopAsync();

        }

我使用我的message simulator 将消息通过管道传送到 EventGrid,EventGrid 将消息转发到 SB 队列,您可以在下面看到运行此代码的结果:

事件网格消息的反序列化器以及完整代码,您可以在 Github 上找到:https://github.com/kgalic/MassTransitSample 以及我对您其他问题的回答。

【讨论】:

    【解决方案2】:

    MassTransit 正在使用用于反序列化内容的消息信封。 您将需要创建并注册一个自定义序列化程序以允许 MassTransit 摄取和处理您的消息。有关详细信息,请参阅 Interoperability 文档。创建自定义序列化程序后,可以使用configuration API 进行注册。

    【讨论】:

    • 任何关于如何做到这一点的例子?
    • 我在文档中找不到任何示例。也许这值得一个单独的问题?
    • 看起来你已经raised it了。
    • 我也在那里开始了赏金活动!
    • @CESCO 我提供了一个如何为 EventGrid 消息编写自定义反序列化程序的示例,以及完整的工作解决方案作为示例。请检查您的其他问题,以及此问题的答案,该问题解释了如何将解串器与 MassTransit 设置相结合。
    猜你喜欢
    • 1970-01-01
    • 2020-11-11
    • 2020-06-28
    • 1970-01-01
    • 2019-03-06
    • 1970-01-01
    • 2020-07-20
    • 2021-08-17
    • 1970-01-01
    相关资源
    最近更新 更多