【问题标题】:Adding a listener event to a ConcurrentQueue or ConcurrentBag?将侦听器事件添加到 ConcurrentQueue 或 ConcurrentBag?
【发布时间】:2022-01-17 09:20:53
【问题描述】:

我有多个任务以 1:1 的方式从队列中获取消息。我想将来自每个线程的这些消息添加到 ConcurrentBag 中,并在它们异步进入时对其进行处理。此处的目的是尽快将消息从队列中取出,以免队列填满。我只需要一个监听器的帮助,等待消息添加到 ConcurrentBag 然后我需要从 Bag 中删除消息并处理它们

private static ConcurrentQueue<string> messageList = new ConcurrentQueue<string>();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(50);
void Main (string[] args)
{
   List<Task> taskList = new TaskList();
    foreach(var job in JobList)
   {
      taskList.Add(Task.Run(() => ListenToQueue(job.QueueName));
   }
  Task.WaitAll(taskList.ToArray());
}


private async Task<string> ListenToQueue(string queueName)
{
   var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token
  
   //possibly 5000 messages can be on a single queue 
    while(!cancellationtoken.IsCancellationRequested)
     {
      var message = getMessageFromQueue(queueName);
      messageList.Enqueue(message); //Add the message from each thread to a thread safe List
     }
}

我需要一个侦听器事件,每次将某些内容添加到列表中时,都会触发此事件。我还需要以线程安全的方式从列表中删除消息。

 private void Listener()
   {
      var msg =  string.Empty;
       while (messageList.Count > 0)
     {
         messageList.TryDequeue(out msg)
         await semaphore.WaitAsync();
            Task.Run(() => 
                  { 
                    try
                    {
                      if(!String.IsNullorEmpty(msg))
                      {
                     _ = ProcessMessage(msg); // I do not want to await anything but just fire and let it go
                       }
                    }
                    finally
                    {
                       sim.Release();
                    }
                  });
      }    
    }

【问题讨论】:

  • 您是否考虑过使用Channel 代替所有这些代码? await ChannelReader.ReadAsync一个监听器。更好的是,await foreach(var msg in channel.Reader.ReadAllAsync() 将异步处理发布到 Channel 的所有项目

标签: c# concurrency task-parallel-library semaphore concurrentdictionary


【解决方案1】:

这些天来,我建议使用像 System.Threading.Channels 这样的异步兼容解决方案:

private static Channel<string> messageList = Channel.CreateUnbounded<string>();

private async Task<string> ListenToQueue(string queueName)
{
  var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;

  try
  {
    var message = await getMessageFromQueue(queueName, cancellationtoken);
    await messageList.Writer.WriteAsync(message, cancellationtoken);
  }
  catch (OperationCanceledException)
  {
    // ignored
  }
}

private async Task Listener()
{
  await foreach (var msg in messageList.Reader.ReadAllAsync())
  {
    if (!string.IsNullOrEmpty(msg))
      _ = Task.Run(() => ProcessMessage(msg));
  }
}

但是,如果您想(或需要)留在阻塞的世界中,也有解决方案。 ConcurrentBag&lt;T&gt;ConcurrentQueue&lt;T&gt; 很少直接使用。相反,更常见的是使用BlockingCollection&lt;T&gt;,它包装了一个并发集合并提供了更高级别的API,包括GetConsumingEnumerable

private static BlockingCollection<string> messageList = new();

private async Task<string> ListenToQueue(string queueName)
{
  var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;

  try
  {
    var message = await getMessageFromQueue(queueName, cancellationtoken);
    messageList.Add(message, cancellationtoken);
  }
  catch (OperationCanceledException)
  {
    // ignored
  }
}

private void Listener()
{
  foreach (var msg in messageList.GetConsumingEnumerable())
  {
    if (!string.IsNullOrEmpty(msg))
      _ = Task.Run(() => ProcessMessage(msg));
  }
}

【讨论】:

  • 哦,谢谢。那么在阻塞集合示例中,Listener 方法是如何触发的呢?
  • 您只需调用它。如果您希望它在单独的线程中,则将其包装在 Task.Run 中。
猜你喜欢
  • 2021-04-17
  • 1970-01-01
  • 2011-05-29
  • 2019-03-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-05
  • 2014-06-22
相关资源
最近更新 更多