【发布时间】:2016-02-10 12:47:05
【问题描述】:
我正在开发一个用 C#(.NET 4.5,VS2012)编写的 Windows 服务,它使用 RabbitMQ(通过订阅接收消息)。有一个派生自DefaultBasicConsumer 的类,在这个类中有两个实际的消费者(所以有两个通道)。因为有两个通道,所以两个线程处理传入消息(来自两个不同的队列/路由键)并且都调用相同的HandleBasicDeliver(...) 函数。
现在,当调用 Windows 服务 OnStop() 时(当有人停止服务时),我想让这两个线程完成处理他们的消息(如果他们当前正在处理消息),将 ack 发送到服务器,然后停止服务(中止线程等)。
我想过多种解决方案,但似乎没有一个是真正好的。这是我尝试过的:
- 使用一个互斥锁;每个线程在进入 HandleBasicDeliver 时尝试获取它,然后释放它。当
OnStop()被调用时,主线程会尝试获取相同的互斥体,从而有效地阻止 RabbitMQ 线程实际处理更多消息。缺点是,一次只能有一个消费者线程处理一条消息。 -
使用两个互斥锁:每个 RabbitMQ 线程都使用不同的互斥锁,因此它们不会在 HandleBasicDeliver() 中相互阻塞 - 我可以区分哪个 线程实际上是根据路由键处理当前消息。比如:
HandleBasicDeliver(...) { if(routingKey == firstConsumerRoutingKey) { // Try to grab the mutex of the first consumer } else { // Try to grab the mutex of the second consumer } }当
OnStop()被调用时,主线程会尝试同时抓取两个互斥体;一旦两个互斥锁都在主线程“手中”,它就可以继续停止服务。问题:如果要将另一个消费者添加到此类中,我需要更改很多代码。 - 使用计数器,或
CountdownEvent。计数器从 0 开始,每次输入HandleBasicDeliver()时,计数器都会使用 Interlocked 类安全地递增。处理完消息后,计数器递减。当OnStop()被调用时,主线程检查计数器是否为0。如果满足这个条件,它将继续。但是,在它检查 counter 是否为 0 之后,某些 RabbitMQ 线程可能会开始处理消息。 - 当
OnStop()被调用时,关闭到RabbitMQ的连接(确保没有新消息到达),然后等待几秒钟(如果有任何消息正在处理,则完成处理),然后关闭应用。问题是,在关闭应用程序之前我应该等待的确切秒数是未知的,所以这不是一个优雅或精确的解决方案。
我意识到设计不符合单一职责原则,这可能导致缺乏解决方案。但是,在不必重新设计项目的情况下,是否有一个很好的解决方案来解决这个问题?
【问题讨论】:
标签: c# multithreading windows-services rabbitmq