【问题标题】:Mass Transit - only single Consumer "handler" is processing messages公共交通 - 只有单个消费者“处理程序”正在处理消息
【发布时间】:2019-07-30 23:10:39
【问题描述】:

我正在将我们的项目从内部接收器/调度器实施转换为公共交通。

事情最初进展顺利,但我遇到了一个我正在努力确定原因的问题。

注意:当我们从旧实现迁移到新实现时,我们的一些内部组件的命名可能看起来很奇怪。

消费者

我们在类中实现我们的消费者,按目的分组。

我有这个测试分组:

public class TestCommandHandler : 
    IConsumer<PingCommand>,
    IConsumer<TestCommand>
{
    private readonly ILoggingProvider _logger;

    public TestCommandHandler(
        ILoggingProvider loggingProvider
    {
        _logger = loggingProvider;
    }

    public Task Consume(ConsumeContext<TestCommand> cmd)
    {
        _logger.Info(cmd.Message.Message);
        if (cmd.SentTime.HasValue)
        {
            _logger.Info($"Sent to {cmd.SessionId()} at {cmd.SentTime} - duration to now {DateTime.UtcNow.Subtract(cmd.SentTime.Value)}");
        }
        return Task.CompletedTask;
    }

    public async Task Consume(ConsumeContext<PingCommand> cmd)
    {
        _logger.Info($"Received Ping Command for {cmd.Message.OperationId}");
        _logger.Info($"Checking compression...");

        // do some checks and throw if an issue
    }
}

我也有这个“投票”结果分组:

public class PollCommandHandler :
    IConsumer<PollResult>
{
    private readonly IAggregateRepository<PollResult> _aggregateRepository;

    public PollCommandHandler(IAggregateRepository<PollResult> aggregateRepository)
    {
        _aggregateRepository = aggregateRepository;
    }

    public async Task Consume(ConsumeContext<Model.Commands.PollResult> cmd)
    {
        var poll = _aggregateRepository.Find(cmd.Message.Id);

        poll.AddPollResult(cmd.Message.RetailerId.ToString(), cmd.Message.PollDate, cmd.Message.PollResponseStatus, cmd.Message.BlobUri);

        await _aggregateRepository.SaveAsync(poll, cmd.Message.Id.ToString());
    }

总线设置

在我们的主机服务启动期间,我们使用Autofac 来注册我们所有的组件(consumerAssembly 是具有上述消费者/处理程序的Assembly):

builder.AddMassTransit(mt =>
{
    mt.AddConsumers(consumerAssembly);
    mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
    {
        x.RequiresSession = true;
        x.MaxConcurrentCalls = 500;
        x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
        x.UseRenewLock(TimeSpan.FromMinutes(4));

        var host = x.Host(serviceUri, h =>
        {
            h.SharedAccessSignature(s =>
            {
                s.KeyName = _config.Infrastructure.AzureServiceBusSharedSecretIssuer;
                s.SharedAccessKey = _config.Infrastructure.AzureServiceBusSharedSecretValue;
                s.TokenTimeToLive = TimeSpan.FromDays(1);
                s.TokenScope = TokenScope.Namespace;
            });
        });

        x.ReceiveEndpoint(host, $"mt.{QueueName.Command.ToString()}", ep =>
        {
            ep.RequiresSession = true;
            ep.MaxConcurrentCalls = 500;
            ep.RemoveSubscriptions = true;

            ep.ConfigureConsumers(context);        
        });
    }));
});

我看到的问题是TestCommandHandler 处理的所有命令都已接收和处理。这很好。

但是,当我发送由PollCommandHandler 处理的PollResult 命令时,消息 被消耗 - 相反,它们被移动到skipped 队列,我'不知道为什么。这很糟糕。

据我了解,mt.AddConsumers(consumerAssembly); 应该会自动扫描并注册我们定义的消费者,ep.ConfigureConsumers(context); 会自动将它们连接起来,以便ReceiveEndpoint 知道将消息发送到哪里。

我在 bus post 配置上运行了GetProbeResult(),据我所知,所有消费者都配置正确。

寻找指向我在设置时做错的事情的指针,或者寻找有关如何诊断问题的指导。

【问题讨论】:

    标签: c# autofac masstransit


    【解决方案1】:

    嗯,这很尴尬。

    看起来我们有一个PollCommandHandler 和一个FtpPollCommandHandler,以及相应的PollResultFtpPollResult 命令。

    我已将PollCommandHandler 迁移为消费者,但正在使用FtpPollResult 命令进行测试。

    因此,Mass Transit 将消息移动到跳过的队列,因为确实没有与之关联的消费者。

    【讨论】:

    • 如果我每次都有假网点是这个原因,我会有很多假网点!
    猜你喜欢
    • 2020-12-07
    • 2016-05-04
    • 2018-07-13
    • 2013-06-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-11
    • 1970-01-01
    相关资源
    最近更新 更多