【问题标题】:MassTransit Consumer Test Passing But Confusing Error ThrownMassTransit 消费者测试通过但引发令人困惑的错误
【发布时间】:2018-07-18 16:41:54
【问题描述】:

我正在尝试使用 MassTransit.Testing 框架和 InMemoryTestHarness 对 MassTransit 消费者进行单元测试。

到目前为止,我能够成功测试一条消息是发送给两个不同的消费者的。

其中一个消费者也成功消费了,但我收到如下错误消息:

R-FAULT loopback://localhost/vhost/input_queue 49820000-5689-0050-3b5c-08d5ecc4708c Acme.Company.Messages.Commands.ISomeCommand Acme.Company.SomeService.Consumers.SomeCommandConsumer(00:00:00.2328493) 失败: 未找到有效负载:MassTransit.RabbitMqTransport.ModelContext, StackTrace: at GreenPipes.PipeExtensions.GetPayload[TPayload](PipeContext context) at MassTransit.DeferExtensions.Defer[T](ConsumeContext1 context, TimeSpan delay, Action2 回调)

此时的代码正在尝试将消息延迟一分钟,所以我想知道这是否是缺少有效负载的原因???

代码如下:

[TestFixture]
public class SomeCommandConsumerTests
{
    private InMemoryTestHarness _harness;

    private Mock<ISomeRepository> _SomeRepositoryMock;
    private Mock<IAnotherRepository> _AnotherRepositoryMock;

    [OneTimeSetUp]
    public async Task OneTimeInit()
    {
        _harness = new InMemoryTestHarness("vhost");
        _harness.Consumer(() => new SomeCommandConsumer(_SomeRepositoryMock.Object, _AnotherRepositoryMock.Object));
        await _harness.Start();
    }

    [SetUp]
    public void Init()
    {
        _SomeRepositoryMock = new Mock<ISomeRepository>();
        _AnotherRepositoryMock = new Mock<IAnotherRepository>();
        _SomeRepositoryMock.Setup(x => x.UpdateSomeId(It.IsAny<SomeEnum>(), It.IsAny<int>()))
            .Returns(Task.Factory.StartNew(() => { }));
        _SomeRepositoryMock.Setup(x => x.UpdateProcMessage(It.IsAny<string>(), It.IsAny<int>()))
            .Returns(Task.Factory.StartNew(() => { }));
        _SomeRepositoryMock.Setup(
                x => x.UpdateSomeProcStartTime(It.IsAny<int>()))
            .Returns(Task.Factory.StartNew(() => { }));
        _SomeRepositoryMock.Setup(
                x => x.UpdateSomeProcEndTime(It.IsAny<int>()))
            .Returns(Task.Factory.StartNew(() => { }));
    }

    [Test]
    public async Task ProcessMessage_MethodCalledWithSomeCondition_MessageSent()
    {
        //Arrange
        _SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Entity
        {
            Property1 = true,
            SomeID = 12345
        });

        await _harness.InputQueueSendEndpoint.Send(new SomeCommand
        {
            MessageType = MessageTypeEnum.SomeMessgae,
            SomeID = 12345
        });

        //Assert
        _harness.Sent.Select<ISomeCommand>().Any().Should().BeTrue();
    }

    [Test]
    public async Task ProcessMessage_MethodCalledWithSomeCondition_CorrectNextStepReturned()
    {
        //Arrange
        _SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Control()
        {
            Property1 = true,
            SomeID = 12345
        });

        await _harness.InputQueueSendEndpoint.Send(new SomeCommand
        {
            MessageType = MessageTypeEnum.SomeMessgae,
            SomeID = 12345
        });

        //Assert
        _harness.Consumed.Select<ISomeCommand>().Any().Should().BeTrue();
        _harness.Consumed
            .Select<ISomeCommand>()
            .First()
            .Context
            .Message
            .SomeID
            .Should()
            .Be(12345);
        _harness.Consumed
            .Select<ISomeCommand>()
            .First()
            .Context
            .Message
            .MessageProcessingResult
            .Should()
            .Be(MessageProcessingResult.DeferProcessing);
    }

    [OneTimeTearDown] 
    public async Task Teardown()
    { 
        await _harness.Stop(); 
    }
}

在消费者中被击中的代码是:

await context.Defer(TimeSpan.FromMinutes(1));

基本上,我错过了什么,这甚至是一个问题吗?

【问题讨论】:

  • 真的吗?比利雷瓦伦丁?你是摩羯座的偶然吗?哈哈
  • 哈哈。这是我最喜欢的电影之一,我的姓是瓦伦丁。谢谢,我认为这可能与内存测试工具有关。

标签: c# unit-testing rabbitmq masstransit


【解决方案1】:

发生这种情况是因为您正在使用具有 RabbitMQ 支持的功能 (Defer) 的内存测试工具。 Defer 尝试使用来自消费者的 RabbitMQ 模型来延迟消息,但它不存在,因为内存中对此一无所知。

如果您想使用更通用的解决方案,请改用Redeliver。您需要将 QuartzIntegration 库与内存测试工具一起使用,但它使用该调度程序执行内存消息重新传递。

您还需要更新您的 RabbitMQ 总线配置以包含 cfg.UseDelayedExchangeMessageScheduler();,以便 RabbitMQ 用于消息调度。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多