【问题标题】:Persist headers when redelivering a RabbitMq message using MassTransit使用 MassTransit 重新传递 RabbitMq 消息时保留标头
【发布时间】:2019-05-21 12:16:01
【问题描述】:

目的:我需要在重新发送邮件时跟踪标题。

配置

  • RabbitMQ 3.7.9
  • 二郎21.2
  • 大众运输 5.1.5
  • 用于 Quartz 数据库的 MySql 8.0

我的尝试没有成功:

第一次尝试:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
   if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
   {
      sendCtx.Headers.Set("SenderApp", sender);
   }
}).ConfigureAwait(false);

第二次尝试:

protected Task ScheduleSend(Uri rabbitUri, double delay)
{
  return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
    rabbitUri,
    TimeSpan.FromSeconds(delay),
    _Data,
    new HeaderPipe(_SenderApp, 0));
}

public class HeaderPipe : IPipe<SendContext>
{
  private readonly byte   _Priority;
  private readonly string _SenderApp;

  public HeaderPipe (byte priority)
  {
    _Priority  = priority;
    _SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
  }

  public HeaderPipe (string senderApp, byte priority)
  {
    _Priority  = priority;
    _SenderApp = senderApp;
  }

  public void Probe (ProbeContext context)
  { }

  public Task Send (SendContext context)
  {
    context.Headers.Set("SenderApp", _SenderApp);
    context.SetPriority(_Priority);
    return Task.CompletedTask;
  }
}

预期:FinQuest.Robot.DBProcess

结果:空

我登录我的 SenderApp 使用方法。第一次是这样的

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)

重新投递后的样子

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)

我做错了什么?我不想使用重试功能,因为它的重试次数上限(我不想被限制)。

提前致谢。

【问题讨论】:

    标签: c# rabbitmq quartz.net masstransit


    【解决方案1】:

    您可能想要使用重新传递过滤器使用的一种方法:

    https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90

    public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)
    

    在你的代码中,你会使用它:

    await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
        sendCtx.TransferConsumeContextHeaders(consumeCtx);
    });
    

    【讨论】:

      猜你喜欢
      • 2015-10-02
      • 1970-01-01
      • 1970-01-01
      • 2016-05-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多