【问题标题】:RabbitMQ asynchronouslyRabbitMQ 异步
【发布时间】:2018-08-29 18:56:44
【问题描述】:

我想在消费者服务中处理 RabbitMQ 队列。当我尝试按照教程进行操作时,我可以看到它逐条处理消息。但是如果处理某些消息需要更长的时间(例如更长的数据库响应)怎么办?然后它不会处理其他任何东西。

我想让它异步。因此它可以在等待期间处理另一条消息。我尝试了这段代码,它可以工作,但在我看来它并不正确(不是等待的任务,然后是 ContinueWith):

private async Task ExecuteAsync(CancellationToken cancelationToken)
{
    Random random = new Random();
    var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "task_queue",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 30, global: false);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);

            // Is it possible to write following part somehow,
            // 1) so that following task can be awaited ?
            // 2) so I doesn't have to use .ContinueWith ?
            #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
            Task.Run(async () =>
            {
                await Task.Delay(random.Next(100, 5000), cancelationToken);

                Console.WriteLine(" [x] Received {0}", message);
            }).ContinueWith((prevTask) =>
            {
                if (!prevTask.IsFaulted)
                {
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }

            });
            #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed


        };

        channel.BasicConsume(queue: "task_queue",
                                autoAck: false,
                                consumer: consumer);

        while (!cancelationToken.IsCancellationRequested)
        {
            await Task.Delay(100, cancelationToken);
        }
    }

}

如果我等待该 Task.Run,​​那么在此之前它不会处理任何其他消息

consumer.Received += async (model, ea) =>
{
...
};

结束了。

【问题讨论】:

标签: c# rabbitmq .net-core microservices


【解决方案1】:

经过一番搜索,我发现EasyNetQ (SubscribeAsync) 完全符合我的需要。这个库使 Rabbit MQ 的使用变得更加容易。

【讨论】:

    猜你喜欢
    • 2015-11-04
    • 2018-11-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多