【问题标题】:Thread Safety in Concurrent Queue C#并发队列 C# 中的线程安全
【发布时间】:2017-03-30 12:10:26
【问题描述】:

我有一个MessagesManager 线程,不同的线程可以向它发送消息,然后这个MessagesManager 线程负责在SendMessageToTcpIP() 中发布这些消息(MessagesManager 线程的起点)。

class MessagesManager : IMessageNotifier
{
    //private
    private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
    private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>(); 

    public void PublishMessage(string Message)
    {
        MessagesQueue.Enqueue(Message);
        _waitTillMessageQueueEmptyARE.Set();
    }

    public void SendMessageToTcpIP()
    {
        //keep waiting till a new message comes
        while (MessagesQueue.Count() == 0)
        {
            _waitTillMessageQueueEmptyARE.WaitOne();
        }

        //Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
        Queue<string> localMessagesQueue = new Queue<string>();

        while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //Use the Local Queue for further processing
        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }
    }
}

不同的线程 (3-4) 通过调用 PublishMessage(string Message) 发送它们的消息(使用相同的对象到 MessageManager)。消息到达后,我将该消息推送到并发队列,并通过设置_waitTillMessageQueueEmptyARE.Set(); 通知SendMessageToTcpIP()SendMessageToTcpIP()里面,我是从本地队列里面的并发队列中复制消息,然后一一发布。

问题:以这种方式进行入队和出队是否线程安全?会不会有什么奇怪的效果?

【问题讨论】:

  • 为什么要检查计数或使用 AutoResetEvent?为什么是本地队列? ConcurrentQueue 是线程安全的,不需要任何代码。如果您想遍历现有消息,可以使用GetConsumingEnumerable()。该代码只能引入线程安全问题
  • 因为您的消息(string 类型)是不可变的,这应该是安全的。但是,如果您的消息是可变的,那么即使队列本身是线程安全的,您的使用也不一定是线程安全的。例如,如果某个其他线程在另一个线程使用它时改变了一条消息。
  • 我正在使用AutoResetEvent,因为MessagesManager 线程正在调用SendMessageToTcpIP(),它应该保持等待状态,直到有新消息到来。
  • 你如何重新启动SendMessageToTcpIP?似乎它只会运行一次(没有全局 while 循环)。
  • @skm 它会,即使你不使用它。使消息出队是一种阻塞操作。一个简单的public void SendMessageToTcpIP(){ foreach(var message in MessageQueue.GetConsumingEnumerable(){TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' ');}} 就足够了

标签: c# multithreading


【解决方案1】:

虽然这可能是线程安全的,但 .NET 中有内置类可帮助实现“多个发布者一个消费者”模式,例如 BlockingCollection。你可以像这样重写你的类:

class MessagesManager : IDisposable {
    // note that your ConcurrentQueue is still in play, passed to constructor
    private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());

    public MessagesManager() {
        // start consumer thread here
        new Thread(SendLoop) {
            IsBackground = true
        }.Start();
    }

    public void PublishMessage(string Message) {
        // no need to notify here, will be done for you
        MessagesQueue.Add(Message);
    }

    private void SendLoop() {
        // this blocks until new items are available
        foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
            // ensure that you handle exceptions here, or whole thing will break on exception
            TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
            Thread.Sleep(2000); // only if you are sure this is required 
        }
    }

    public void Dispose() {            
        // this will "complete" GetConsumingEnumerable, so your thread will complete
        MessagesQueue.CompleteAdding();
        MessagesQueue.Dispose();
    }
}

【讨论】:

  • SendLoop() 正在执行Thread.Sleep(2000); 时,发布者能否将消息推送到MessageQueue
  • 是的,发布商可以随时推送到您的队列中。
【解决方案2】:

.NET 已经提供了ActionBlock< T>,它允许将消息发布到缓冲区并异步处理它们。默认情况下,一次只处理一条消息。

你的代码可以改写为:

//In an initialization function
ActionBlock<string> _hmiAgent=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage(msg.PadRight(80, ' '));
        await Task.Delay(2000);
);

//In some other thread ...
foreach ( ....)
{
    _hmiAgent.Post(someMessage);
}

// When the application closes

_hmiAgent.Complete();
await _hmiAgent.Completion;

ActionBlock 提供了许多好处 - 您可以指定它可以在缓冲区中接受的项目数量的限制,并指定可以并行处理多条消息。您还可以在处理管道中组合多个块。在桌面应用程序中,可以将消息发布到管道以响应事件,由单独的块处理,并将结果发布到更新 UI 的最终块。

例如,填充可以由中介TransformBlock< TIn,TOut> 执行。这种转换是微不足道的,使用块的成本比方法大,但这只是一个说明:

//In an initialization function
TransformBlock<string> _hmiAgent=new TransformBlock<string,string>(
    msg=>msg.PadRight(80, ' '));

ActionBlock<string> _tcpBlock=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage());
        await Task.Delay(2000);
);

var linkOptions=new DataflowLinkOptions{PropagateCompletion = true};
_hmiAgent.LinkTo(_tcpBlock);

发布代码根本没有改变

    _hmiAgent.Post(someMessage);

当应用程序终止时,我们需要等待_tcpBlock完成:

    _hmiAgent.Complete();
    await _tcpBlock.Completion;

Visual Studio 2015+ 本身使用 TPL Dataflow 处理此类场景

Bar Arnon 在TPL Dataflow Is The Best Library You're Not Using 中提供了一个更好的示例,它展示了如何在块中同时使用同步和异步方法。

【讨论】:

  • 发送到Post(someMessage)的消息是如何被ActionBlock接收的?
  • PostActionBlock 和所有其他块的方法
  • @skm TPL Dataflow为链接块提供了两步消息发送协议,以确保传递。
【解决方案3】:

代码线程安全的,因为ConcurrentQueueAutoResetEvent 都是线程安全的。无论如何,您的字符串都被读取并且永远不会被写入,因此此代码是线程安全的。

但是,您必须确保在某种循环中调用 SendMessageToTcpIP
否则,您将面临危险的竞争条件 - 一些消息可能会丢失:

while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //<<--- what happens if another thread enqueues a message here?

        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }

除此之外,AutoResetEvent 是非常重的物体。它使用内核对象来同步线程。每个调用都是一个系统调用,可能代价高昂。考虑使用用户模式同步对象(.net 不提供某种条件变量吗?)

【讨论】:

  • GetConsumingEnumerable() 已经提供了该循环。不需要检查是否为空或检查计数
  • @David:SendMessageToTcpIP() 在来自Main() 的while 循环下。如果有新消息出现,它将在SendMessageToTcpIP() 的下一次迭代(或下一次调用)期间进行处理。
【解决方案4】:

这是我将如何实现此功能的重构代码 sn-p:

class MessagesManager {
    private readonly AutoResetEvent messagesAvailableSignal = new AutoResetEvent(false);
    private readonly ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

    public void PublishMessage(string Message) {
        messageQueue.Enqueue(Message);
        messagesAvailableSignal.Set();
    }

    public void SendMessageToTcpIP() {
        while (true) {
            messagesAvailableSignal.WaitOne();
            while (!messageQueue.IsEmpty) {
                string message;
                if (messageQueue.TryDequeue(out message)) {
                    TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' '));
                }
            }
        }
    }
}

这里需要注意的地方:

  1. 这会完全排空队列:如果至少有一条消息,它将处理所有消息
  2. 2000ms 线程休眠被移除

【讨论】:

  • 您根本不需要 AutoResetEvent。您可以将这两种方法分别转换为一行
  • 我认为信号是必要的:您希望消费者线程等待消息到达,而不是无休止地循环浪费 CPU 周期。
  • 它循环是因为你使用了TryDequeue。如果您不这样做,它将阻止等待响应。事实上,GetConsumingEnumerable() 已经处理了整个循环行为
  • TryDequeue 不会阻塞 => 如果队列中没有元素,它将返回 false
  • @PanagiotisKanavos,阻塞收集与并发收集不同。在这种情况下,信令似乎是避免过度旋转的合乎逻辑的方式。切换到阻止收集可能是一种选择,也可能不是一种选择,或者您是否坚持它在所有情况中都是正确的?例如。如果PublishMessage 被过多的编写者调用,而阻塞(这就是BlockingCollection 会做的,对吗?)是不受欢迎的?
猜你喜欢
  • 1970-01-01
  • 2014-03-13
  • 2023-03-04
  • 2013-02-23
  • 2010-11-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-02-16
相关资源
最近更新 更多