【问题标题】:Is there a data structure in C# like a ConcurrentQueue which allows me to await an empty queue until an item is added? [duplicate]C# 中是否有像 ConcurrentQueue 这样的数据结构,它允许我等待一个空队列,直到添加一个项目? [复制]
【发布时间】:2019-05-15 06:26:33
【问题描述】:

我正在寻找一个像 ConcurrentQueue 这样的对象,如果队列为空,我可以等待 Dequeue 操作,因此我可以执行以下操作:

public static async Task ServiceLoop() {

    var awaitedQueue = new AwaitedQueue<int>();

    while (!cancelled) {
        var item = await awaitableQueue.Dequeue();
        Console.WriteLine(item);
    }

}

我已经编写了以下类,但是如果在调用 Dequeue 和新的等待者入队之间将一个项目添加到队列中,将会发生可怕的事情。

public class AwaitedQueue<T> : IDisposable {

    ConcurrentQueue<TaskCompletionSource<T>> awaiters = new ConcurrentQueue<TaskCompletionSource<T>>();

    ConcurrentQueue<T> items = new ConcurrentQueue<T>();

    public AwaitedQueue() { }

    public void Enqueue(T item) {
        if (!awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
            this.items.Enqueue(item);
        } else {
            awaiter.SetResult(item);
        }
    }

    public async Task<T> Dequeue() {
        if (items.TryDequeue(out T item)) {
            return item;
        } else {
            // If an item is enqueued between this call to create a new TaskCompletionSource.
            var awaiter = new TaskCompletionSource<T>();
            // And this call to actually enqueue, I believe it will cause me problems.
            awaiters.Enqueue(awaiter);
            return await awaiter.Task;
        }
    }

    public void Dispose() {
        while (awaiters.TryDequeue(out TaskCompletionSource<T> awaiter)) {
            awaiter.SetCanceled();
            awaiter.Task.Wait();
        }
    }
}

我确信这个概念已经存在一个健壮且经过充分测试的实现,但我不知道我需要在 Google 中输入哪种英语单词组合才能找到它。

【问题讨论】:

  • 它的上下文是什么?如果它适合上下文,您可以使用 RabbitMQ。

标签: c# asynchronous concurrency queue


【解决方案1】:

对此有一个现代解决方案:Channels。 “Channel”是一个异步生产者/消费者队列。

频道也有“完成”的概念,因此您可以完成频道而不是拥有cancelled 标志。

用法:

public static async Task ServiceLoop() {
  var awaitedQueue = Channel.CreateUnbounded<int>();
  var queueReader = awaitedQueue.Reader;

  while (await queueReader.WaitToReadAsync())
  {
    while (queueReader.TryRead(out var item))
    {
      Console.WriteLine(item);
    }
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-02-06
    • 2021-04-24
    • 2016-04-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-12
    • 1970-01-01
    相关资源
    最近更新 更多