【发布时间】:2022-01-17 09:20:53
【问题描述】:
我有多个任务以 1:1 的方式从队列中获取消息。我想将来自每个线程的这些消息添加到 ConcurrentBag 中,并在它们异步进入时对其进行处理。此处的目的是尽快将消息从队列中取出,以免队列填满。我只需要一个监听器的帮助,等待消息添加到 ConcurrentBag 然后我需要从 Bag 中删除消息并处理它们
private static ConcurrentQueue<string> messageList = new ConcurrentQueue<string>();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(50);
void Main (string[] args)
{
List<Task> taskList = new TaskList();
foreach(var job in JobList)
{
taskList.Add(Task.Run(() => ListenToQueue(job.QueueName));
}
Task.WaitAll(taskList.ToArray());
}
private async Task<string> ListenToQueue(string queueName)
{
var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token
//possibly 5000 messages can be on a single queue
while(!cancellationtoken.IsCancellationRequested)
{
var message = getMessageFromQueue(queueName);
messageList.Enqueue(message); //Add the message from each thread to a thread safe List
}
}
我需要一个侦听器事件,每次将某些内容添加到列表中时,都会触发此事件。我还需要以线程安全的方式从列表中删除消息。
private void Listener()
{
var msg = string.Empty;
while (messageList.Count > 0)
{
messageList.TryDequeue(out msg)
await semaphore.WaitAsync();
Task.Run(() =>
{
try
{
if(!String.IsNullorEmpty(msg))
{
_ = ProcessMessage(msg); // I do not want to await anything but just fire and let it go
}
}
finally
{
sim.Release();
}
});
}
}
【问题讨论】:
-
您是否考虑过使用
Channel代替所有这些代码?await ChannelReader.ReadAsync是一个监听器。更好的是,await foreach(var msg in channel.Reader.ReadAllAsync()将异步处理发布到 Channel 的所有项目
标签: c# concurrency task-parallel-library semaphore concurrentdictionary