【问题标题】:Asynchronous Queue Manager异步队列管理器
【发布时间】:2012-03-13 05:09:24
【问题描述】:

我在用 c# 编写异步多服务器网络应用程序时遇到了问题。我有许多工作由线程池处理,其中包括对网络套接字的写入。这最终允许了多个线程可以同时写入套接字并混淆我的传出消息的情况。我解决这个问题的想法是实现一个队列系统,只要数据被添加到队列中,套接字就会写入它。

我的问题是,我无法完全理解这种性质的架构。我想象有一个队列对象在数据被添加到队列时触发一个事件。然后该事件将写入队列中保存的数据,但这不起作用,因为如果两个线程同时到达并添加到队列中,即使队列被设为线程安全,事件仍然会为两者触发,并且我会遇到同样的问题。因此,如果另一个事件正在进行中,那么可能会以某种方式推迟一个事件,但是一旦第一个事件完成,我该如何继续该事件,而不会简单地阻塞某个互斥锁或其他东西上的线程。如果我不尝试严格遵守“无阻塞”架构,这不会那么难,但是这个特定的应用程序要求我允许线程池线程继续做他们的事情。

有什么想法吗?

【问题讨论】:

    标签: c# architecture asynchronous queue asyncsocket


    【解决方案1】:

    虽然类似于 Porges 的回答,但在实现上略有不同。

    首先,我通常不对要发送的字节进行排队,而是在发送线程中将对象和序列化,但我想这是一个品味问题。 但更大的区别在于 ConcurrentQueues 的使用(除了 BlockingCollection)。 所以我最终会得到类似于

    的代码
            BlockingCollection<Packet> sendQueue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
            while (true)
            {
                var packet = sendQueue.Take(); //this blocks if there are no items in the queue.
                SendPacket(packet); //Send your packet here.
            }
    

    这里的关键是您有一个循环此代码的线程,所有其他线程都可以以线程安全的方式添加到队列中(BlockingCollection 和 ConcurrentQueue 都是线程安全的)

    看看Processing a queue of items asynchronously in C# 我回答了类似的问题。

    【讨论】:

    • new BlockingCollection&lt;Packet&gt;(new ConcurrentQueue&lt;Packet&gt;()) 完全等同于 new BlockingCollection&lt;Packet&gt;() :)
    • 很高兴知道,但我的初始化程序通常很冗长。当有人不知道默认值是什么时更清楚:)(并且“var”有助于不必重复所有内容)
    • 超级有用,亚历克斯。救了我的培根。
    【解决方案2】:

    听起来您需要一个线程同步写入套接字,而一堆线程写入队列以供该线程处理。

    您可以使用阻塞集合 (BlockingCollection&lt;T&gt;) 来完成这项艰巨的工作:

    // somewhere there is a queue:
    
    BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>();
    
    // in socket-writing thread, read from the queue and send the messages:
    
    foreach (byte[] message in queue.GetConsumingEnumerable())
    {
        // just an example... obviously you'd need error handling and stuff here
        socket.Send(message);
    }
    
    // in the other threads, just enqueue messages to be sent:
    
    queue.Add(someMessage);
    

    BlockingCollection 将处理所有同步。您还可以强制执行最大队列长度和其他有趣的事情。

    【讨论】:

      【解决方案3】:

      我不懂 C#,但我会做的是让事件触发套接字管理器开始从队列中拉出并一次写出一个。如果它已经在运行,则触发器不会执行任何操作,一旦队列中没有任何内容,它就会停止。

      这解决了两个线程同时写入队列的问题,因为第二个事件将是空操作。

      【讨论】:

      • 我深思熟虑后考虑了这一点。我质疑它的原因是因为一个不太可能的时间问题。想象一下,在事件开始时,它会将一些布尔值设置为 true。所以你有一些变量告诉其他线程队列数据的某个处理器正在运行。如果在处理线程有时间将值交换回 false 之前将数据添加到队列中,但处理器已经读取队列中没有剩余数据,会发生什么情况。然后我在队列中有数据在添加更多数据之前不会被处理。
      • 我同意这是可能的——也许在重置布尔值后对队列中的某些内容进行额外检查可以解决该问题。
      【解决方案4】:

      您可以有一个线程安全队列,您的所有工作线程都将其结果写入其中。然后让另一个线程轮询队列并在看到队列等待时发送结果。

      【讨论】:

        猜你喜欢
        • 2013-09-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-12-31
        • 2012-03-03
        相关资源
        最近更新 更多