【问题标题】:Wait for RabbitMQ Threads to finish in Windows Service OnStop()在 Windows Service OnStop() 中等待 RabbitMQ 线程完成
【发布时间】:2016-02-10 12:47:05
【问题描述】:

我正在开发一个用 C#(.NET 4.5,VS2012)编写的 Windows 服务,它使用 RabbitMQ(通过订阅接收消息)。有一个派生自DefaultBasicConsumer 的类,在这个类中有两个实际的消费者(所以有两个通道)。因为有两个通道,所以两个线程处理传入消息(来自两个不同的队列/路由键)并且都调用相同的HandleBasicDeliver(...) 函数。

现在,当调用 Windows 服务 OnStop() 时(当有人停止服务时),我想让这两个线程完成处理他们的消息(如果他们当前正在处理消息),将 ack 发送到服务器,然后停止服务(中止线程等)。

我想过多种解决方案,但似乎没有一个是真正好的。这是我尝试过的:

  • 使用一个互斥锁;每个线程在进入 HandleBasicDeliver 时尝试获取它,然后释放它。当OnStop() 被调用时,主线程会尝试获取相同的互斥体,从而有效地阻止 RabbitMQ 线程实际处理更多消息。缺点是,一次只能有一个消费者线程处理一条消息。
  • 使用两个互斥锁:每个 RabbitMQ 线程都使用不同的互斥锁,因此它们不会在 HandleBasicDeliver() 中相互阻塞 - 我可以区分哪个 线程实际上是根据路由键处理当前消息。比如:

    HandleBasicDeliver(...)
    {
        if(routingKey == firstConsumerRoutingKey)
        {
            // Try to grab the mutex of the first consumer
        }
        else
        {
            // Try to grab the mutex of the second consumer
        }
    }
    

    OnStop()被调用时,主线程会尝试同时抓取两个互斥体;一旦两个互斥锁都在主线程“手中”,它就可以继续停止服务。问题:如果要将另一个消费者添加到此类中,我需要更改很多代码。

  • 使用计数器,或CountdownEvent。计数器从 0 开始,每次输入 HandleBasicDeliver() 时,计数器都会使用 Interlocked 类安全地递增。处理完消息后,计数器递减。当OnStop()被调用时,主线程检查计数器是否为0。如果满足这个条件,它将继续。但是,在它检查 counter 是否为 0 之后,某些 RabbitMQ 线程可能会开始处理消息。
  • OnStop()被调用时,关闭到RabbitMQ的连接(确保没有新消息到达),然后等待几秒钟(如果有任何消息正在处理,则完成处理),然后关闭应用。问题是,在关闭应用程序之前我应该​​等待的确切秒数是未知的,所以这不是一个优雅或精确的解决方案。

我意识到设计不符合单一职责原则,这可能导致缺乏解决方案。但是,在不必重新设计项目的情况下,是否有一个很好的解决方案来解决这个问题?

【问题讨论】:

    标签: c# multithreading windows-services rabbitmq


    【解决方案1】:

    我们在我们的应用程序中这样做,主要思想是使用 CancellationTokenSource

    在您的 Windows 服务上添加:

    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
    

    然后在您的兔子消费者中执行以下操作: 1.从使用Dequeue改为DequeueNoWait 2.让你的兔子消费者检查取消令牌

    这是我们的代码:

            public async Task StartConsuming(IMessageBusConsumer consumer, MessageBusConsumerName fullConsumerName, CancellationToken cancellationToken)
            {
                var queueName = GetQueueName(consumer.MessageBusConsumerEnum);
    
                using (var model = _rabbitConnection.CreateModel())
                {
                    // Configure the Quality of service for the model. Below is how what each setting means.
                    // BasicQos(0="Don't send me a new message until I’ve finished",  _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
                    model.BasicQos(0, consumer.FetchCount.Value, false);
                    var queueingConsumer = new QueueingBasicConsumer(model);
    
                    model.BasicConsume(queueName, false, fullConsumerName, queueingConsumer);
    
    
                    var queueEmpty = new BasicDeliverEventArgs(); //This is what gets returned if nothing in the queue is found.
    
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var deliverEventArgs = queueingConsumer.Queue.DequeueNoWait(queueEmpty);
                        if (deliverEventArgs == queueEmpty)
                        {
                            // This 100ms wait allows the processor to go do other work.
                            // No sense in going back to an empty queue immediately. 
                            // CancellationToken intentionally not used!
                            // ReSharper disable once MethodSupportsCancellation
                            await Task.Delay(100);  
                            continue;
                        }
                        //DO YOUR WORK HERE!
                      }
    }
    

    【讨论】:

    • 感谢您的回答。但是,我有一些后续问题。由于我使用的是 DefaultBasicConsumer,因此每当消息到达时都会调用一个回调函数 (HandleBasicDeliver)。我可以在 CancellationToken 进入该函数后立即检查(在它开始处理消息之前)。但是,如果在 RabbitMQ 线程开始处理消息并传递了if(IsCancellationRequested) 之后立即调用Service.OnStop()(并且设置了CancellationTokenSource.Cancel)会发生什么?处理消息时服务将关闭。
    • 在您的 OnStop 方法中,您需要等待当前消息的处理。所以你会有这样的东西:Task.WaitAll(tasks, TimeSpan.FromMinutes(15));
    【解决方案2】:

    通常,我们如何确保 Windows 服务在处理完成之前不会停止是使用如下代码。希望有所帮助。

        protected override void OnStart(string[] args)
        {
            // start the worker thread
            _workerThread = new Thread(WorkMethod)
            {
                // !!!set to foreground to block windows service be stopped
                // until thread is exited when all pending tasks complete
                IsBackground = false
            };
            _workerThread.Start();
        }
    
        protected override void OnStop()
        {
            // notify the worker thread to stop accepting new migration requests
            // and exit when all tasks are completed
            // some code to notify worker thread to stop accepting new tasks internally
    
            // wait for worker thread to stop
            _workerThread.Join();
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-29
      • 2011-06-09
      • 2010-10-16
      相关资源
      最近更新 更多