【问题标题】:exceptions in my service stack service not moving messages to dead letter queue我的服务堆栈服务中的异常没有将消息移动到死信队列
【发布时间】:2018-08-11 10:06:55
【问题描述】:

我有一个具有标准服务堆栈 RabbitMQ 抽象的服务堆栈服务。 消息队列是为我的 MyRequest 类型自动创建的,我有一个服务方法,我已经设置它来处理来自 MyRequest.In Queue 的请求

我期待如果我在此方法中抛出异常,消息将被放置在死信队列中。但是它们只是从 In 队列中删除,并且不会进入死信队列

public class MyOtherService : AsmServiceBase
{
    public void Any(MyRequest request)
    {
        try
        {
            throw new InvalidOperationException("this is an invalid operation");
        }
        catch (Exception ex)
        {
            Console.Write("exceptions");
            throw;
        }
    }
}

[Route("/api/myrequest", "POST")]
public class MyRequest : HeliosRequestBase<MyResponse>
{
    public string Content { get; set; }
}

public class MyResponse : HeliosResponseBase
{

}

这是 AppHost 中将 MyRequest 消息路由到我的服务方法的代码:

RabbitMqServer mqServer = RabbitMqServerFactory
                         .GetRabbitMqServer(m_ServiceDiscovery).Result;
mqServer.RegisterHandler<MyRequest>(ExecuteMessage);
mqServer.Start();

谁能解释一下我做错了什么?

更新 之前排队的图片

及之后(注意服务方法被调用,每个方法都抛出异常)

如果我将 RegisterHandler 行更改为这样

 mqServer.RegisterHandler<MyRequest>(
            x =>
            {
                try
                {
                    object resp =  ExecuteMessage(x);
                    //if I throw an exception here it will go to the DLQ
                    return resp;
                }
                catch (Exception ex)
                {
                    throw;
                }
            }
            );

我可以看到异常包含在从 ExecuteMessage 返回的对象中。我想也许我需要重新抛出它?还是我在这里做错了什么?

更新 2 - 消息的发布方式是问题 在 myz 示例的帮助下,我已经能够确定问题是由消息发布到队列的方式引起的。

我们这样调用发布方法:

 public void PublishOneWay<TRequest>(TRequest request, string queueName, int timeoutInMilliseconds)
    {
        if (m_Disposed)
            throw new ObjectDisposedException("The service client has been disposed and cannot be used.");
        Message message = MakeMessage(request);


        //Updated to just send one-way messages - why bother enquing if we are never going to capture the callback/block on this thread?
        //MessagesToProcess.Add(new Tuple<string, Message, MessageDirection>(queueName, message, MessageDirection.OneWay));
        MqClient.SendOneWay(queueName, message); //will work if I send request instead of message

    }

问题似乎出在这段代码中,我们在其中构造了一个要发送的新消息对象。

 private Message MakeMessage<TRequest>(TRequest request)
    {
        Guid messageId = Guid.NewGuid();
        Message newMessage = new Message<TRequest>(request) { ReplyTo = ResponseQueueName, Id = messageId };

        newMessage.Meta = new Dictionary<string, string>();

        newMessage.Meta.Add(ServiceContextConstants.TrackingIdentifier, AsmServiceContext.TrackingIdentifierData?.Value);
        newMessage.Meta.Add(ServiceContextConstants.SessionContext, AsmServiceContext.SessionContextData?.Value);

        return newMessage;
    }

如果我发送原始请求对象,那么异常会按照您的预期发送到 DLQ。如果我不设置消息对象的 ReplyTo 属性,它也可以工作。

我想知道,我需要在 Message 对象上设置任何属性来创建这种行为吗?我可能无法设置 ReplyTo 属性,但不确定我们的代码会因此而改变多少。

【问题讨论】:

  • 我看不到您的队列是如何配置的,但您应该检查您的消息是否被自动确认。查看rabbitmq.com/dlx.html 并检查是否不适用任何条件
  • 源代码的目标应该是提供Minimal, Complete, Verifiable Example,以便其他人有机会从提供的源中重现任何问题。因为尚不清楚AsmServiceBaseHeliosRequestBase&lt;T&gt;HeliosResponseBaseRabbitMqServerFactory 等包含什么。您能否还包括此示例创建的所有队列的屏幕截图以及您用于发布MyRequest 消息的代码。
  • 过失神话。代码有点复杂,很容易缩小和重现,但我会尝试

标签: c# rabbitmq servicestack


【解决方案1】:

编辑

如果您使用的是显式ReplyTo 地址,则任何错误都将发送到该ReplyTo 地址而不是DLQ。

如果您的响应 DTO 具有 ResponseStatus 属性,则异常将填充到响应 DTO 的 ResponseStatus 中,否则您可以使用通用 ErrorResponse DTO 读取异常信息,例如:

var requestMsg = new Message<ThrowVoid>(request)
{
    ReplyTo = $"mq:{request.GetType().Name}.replyto"
};
mqProducer.Publish(requestMsg);

var msg = mqClient.Get<ErrorResponse>(requestMsg.ReplyTo, null);
mqClient.Ack(msg);

msg.GetBody().ResponseStatus.ErrorCode //= InvalidOperationException

我无法仅使用普通的 ServiceStack 类来重现此问题,正如 this commit 所见,它在所有 MQ 服务器中都按预期工作。

我在下面提取了使用ServiceStack's RabbitMQ Server的代码:

public class ThrowVoid
{
    public string Content { get; set; }
}

public class TestMqService : Service
{
    public void Any(ThrowVoid request)
    {
        throw new InvalidOperationException("this is an invalid operation");
    }
}

public class AppHost : AppSelfHostBase
{
     public AppHost(Func<IMessageService> createMqServerFn)
        : base(nameof(TestMqService), typeof(TestMqService).Assembly) {}

    public override void Configure(Container container)
    {
        var mqServer = new RabbitMqServer { RetryCount = 1 };
        container.Register<IMessageService>(c => mqServer);
        mqServer.RegisterHandler<ThrowVoid>(ExecuteMessage);

        AfterInitCallbacks.Add(appHost => mqServer.Start());
    }
}

消息的发送地点:

using (var mqFactory = appHost.TryResolve<IMessageFactory>())
{
    var request = new ThrowVoid { Content = "Test" };

    using (var mqProducer = mqFactory.CreateMessageProducer())
    using (var mqClient = mqFactory.CreateMessageQueueClient())
    {
        mqProducer.Publish(request);

        var msg = mqClient.Get<ThrowVoid>(QueueNames<ThrowVoid>.Dlq, null);
        mqClient.Ack(msg);

        Assert.That(msg.Error.ErrorCode, Is.EqualTo("InvalidOperationException"));
    }
}

不可能说出你的问题是什么,很多实现都隐藏在你自己的自定义类后面,但我建议从一个像上面这样的小型独立示例开始,它确实有效,然后慢慢添加添加你的自定义代码找出导致它的原因。

虽然我会recommend against using inheritance to hide properties in your DTOs,但 DTO 是用于定义服务契约的声明性模型,将它们的属性隐藏在继承之后会使任何阅读它的人更难准确了解每个服务接受和返回的内容。继承对于定义可重用的功能很有用,但是 DTO 是声明性的,不应该有实现,所以你基本上是在添加不必要的耦合来隐藏显式的服务契约,这使得通过查看服务契约来推断每个服务的作用变得更加困难.

【讨论】:

  • 嗨,mythz,谢谢你,这很有帮助,你能看看我的最新更新吗?
  • @jazza1000 如果您使用明确的ReplyTo,则任何错误都将发送到回复地址而不是 DLQ,请参阅我的更新答案以获取更多信息。
猜你喜欢
  • 2020-07-26
  • 1970-01-01
  • 2020-01-11
  • 2018-01-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-13
  • 2011-06-02
相关资源
最近更新 更多