【发布时间】:2018-08-29 18:56:44
【问题描述】:
我想在消费者服务中处理 RabbitMQ 队列。当我尝试按照教程进行操作时,我可以看到它逐条处理消息。但是如果处理某些消息需要更长的时间(例如更长的数据库响应)怎么办?然后它不会处理其他任何东西。
我想让它异步。因此它可以在等待期间处理另一条消息。我尝试了这段代码,它可以工作,但在我看来它并不正确(不是等待的任务,然后是 ContinueWith):
private async Task ExecuteAsync(CancellationToken cancelationToken)
{
Random random = new Random();
var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 30, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// Is it possible to write following part somehow,
// 1) so that following task can be awaited ?
// 2) so I doesn't have to use .ContinueWith ?
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
await Task.Delay(random.Next(100, 5000), cancelationToken);
Console.WriteLine(" [x] Received {0}", message);
}).ContinueWith((prevTask) =>
{
if (!prevTask.IsFaulted)
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
while (!cancelationToken.IsCancellationRequested)
{
await Task.Delay(100, cancelationToken);
}
}
}
如果我等待该 Task.Run,那么在此之前它不会处理任何其他消息
consumer.Received += async (model, ea) =>
{
...
};
结束了。
【问题讨论】:
-
查看基本异步消费者,而不是事件消费者。
-
我看了那个“基本的异步消费者”,但我不知道如何使用它。如何让它调用我的方法。
标签: c# rabbitmq .net-core microservices