【问题标题】:How does a channel consumer get everything from multiple channel producers when any producer can close the entire channel at any time?当任何生产者可以随时关闭整个频道时,频道消费者如何从多个频道生产者那里获得一切?
【发布时间】:2022-01-29 13:54:36
【问题描述】:

我是System.Threading.Channels 的新手。我有以下消费者代码:

await foreach (var thing in this.Reader.ReadAllAsync(cancellationToken)
    .ConfigureAwait(false))
{
    await this.HandleThingAsync(thing, cancellationToken).ConfigureAwait(false);
}

当像这样消费由单个生产者生产的东西时,这似乎工作正常:

var things = await this.GetThingsAsync(cancellationToken).ConfigureAwait(false);
await foreach (var thing in things.WithCancellation(cancellationToken)
    .ConfigureAwait(false))
{
    await this.Writer.WriteAsync(thing, cancellationToken).ConfigureAwait(false);
}

this.Writer.Complete();

但是当我尝试添加相同通用形式的第二个生产者时,一旦两个生产者之一完成(并调用this.Writer.Complete()),另一个生产者仍需要添加的任何内容都将被拒绝,因为通道已经关闭。这是一个问题,因为我希望读者阅读所有内容,而不仅仅是所有内容,直到任何一个制作人都没有更多内容可制作。

如何处理这种情况?是否有一些内置或其他“标准”方式?例如,可能是一个“condenser”通道,它公开了多个 Channel.Writer 对象(每个“真实”生产者一个)和一个 Channel.Reader(一个“真实”消费者)?

【问题讨论】:

    标签: c# system.threading.channels


    【解决方案1】:

    我认为没有一种方法可以称为“标准”。 Channel<T> 是一种可以以多种不同方式使用的工具,很像 TaskSemaphoreSlim。在您的情况下,您可以使用这样的计数器来传播所有生产者的完成情况:

    int producersCount = X;
    //...
    await foreach (var thing in things)
        await channel.Writer.WriteAsync(thing);
    if (Interlocked.Decrement(ref producersCount) == 0) channel.Writer.Complete();
    

    或者,如果每个生产者都是Task,您可以将延续附加到所有这些组合的任务,如下所示:

    var producers = new List<Task>();
    //...
    _ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete(),
        default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    

    上面的丢弃 (_) 已用于传达 ContinueWith 延续已以即发即弃的方式启动。如果您不喜欢像我一样将未观察到的任务抛在脑后,您可以像这样在async void 方法中处理生产者的完成:

    var producers = new List<Task>();
    //...
    HandleProducersCompletion();
    //...
    async void HandleProducersCompletion()
    {
        try { await Task.WhenAll(producers); }
        finally { channel.Writer.Complete(); }
    }
    

    这样,channel.Writer.Complete(); 调用引发的异常将无法处理,并且会使进程崩溃。考虑到替代方案是一个无明显原因陷入僵局的过程,这可以说是一件好事。

    【讨论】:

      【解决方案2】:

      根据我在原始问题中提到的“通道冷凝器”想法,我最终完成了这门课程。它可能会也可能不会很糟糕和/或漏洞百出,但至少到目前为止,它似乎以一种对我来说似乎相当自然且不引人注目的方式来完成这项工作:

      using Nito.AsyncEx;
      using System.Collections.Concurrent;
      using System.Collections.Generic;
      using System.Threading;
      using System.Threading.Channels;
      using System.Threading.Tasks;
      
      namespace Rwv37.System.Threading.Channels
      {
          public class ChannelCondenser<T>
          {
              private bool IsGoing { get; set; }
              private AsyncLock IsGoingLock { get; init; }
              private ConcurrentBag<Channel<T>> IncomingChannel { get; init; }
              private Channel<T> OutgoingChannel { get; init; }
      
              public ChannelCondenser()
              {
                  this.IsGoingLock = new AsyncLock();
                  this.IncomingChannel = new();
                  this.OutgoingChannel = Channel.CreateUnbounded<T>();
              }
      
              public async Task GoAsync(CancellationToken cancellationToken = default)
              {
                  using (await this.IsGoingLock.LockAsync(cancellationToken).ConfigureAwait(false))
                  {
                      if (this.IsGoing)
                      {
                          throw new System.InvalidOperationException("Cannot go - already going!");
                      }
      
                      this.IsGoing = true;
                  }
      
                  List<Task> tasks = new();
                  foreach (var incomingChannel in this.IncomingChannel)
                  {
                      tasks.Add(this.HandleIncomingChannelAsync(incomingChannel, cancellationToken));
                  }
      
                  await Task.WhenAll(tasks).ConfigureAwait(false);
      
                  this.OutgoingChannel.Writer.Complete();
              }
      
              public ChannelWriter<T> AddIncomingChannel()
              {
                  using (this.IsGoingLock.Lock())
                  {
                      if (this.IsGoing)
                      {
                          throw new System.InvalidOperationException("New incoming channels cannot be added while going!");
                      }
                  }
      
                  Channel<T> incomingChannel = Channel.CreateUnbounded<T>();
                  this.IncomingChannel.Add(incomingChannel);
      
                  return incomingChannel.Writer;
              }
      
              public ChannelReader<T> GetOutgoingChannel()
              {
                  return this.OutgoingChannel.Reader;
              }
      
              private async Task HandleIncomingChannelAsync(Channel<T> incomingChannel, CancellationToken cancellationToken)
              {
                  await foreach (var item in incomingChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                  {
                      await this.OutgoingChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
                  }
              }
          }
      }
      

      消费者和生产者中的使用与我原来的问题中显示的完全没有变化。

      在它们之外我唯一需要改变的是使用它们的类是如何构造的。消费者结构从...改变...

      private Channel<Thing> WantedThingsChannel { get; init; }
      
      (...)
      
      this.WantedThingsChannel = Channel.CreateUnbounded<Thing>();
      this.WantedThingsHandler = new(this.WantedThingsChannel.Reader);
      

      ...到...

      private ChannelCondenser<Thing> WantedThingsCondenser { get; init; }
      
      (...)
      
      this.WantedThingsCondenser = new();
      this.WantedThingsHandler = new(this.WantedThingsCondenser.GetOutgoingChannel());
      

      同样地,生产者的构造也从……改变了……

      this.WantedThingsRetriever = new(this.WantedThingsChannel.Writer);
      

      ...到...

       this.WantedThingsRetriever = new(this.WantedThingsCondenser.AddIncomingChannel());
      

      哦,不,等等,我撒了谎。除了它们之外的另一项更改:我的程序的主要Task.WhenAll 已更改,因此它还等待ChannelCondenser。所以,从...

      List<Task> tasks = new()
      {
          this.WantedThingsHandler.GoAsync(cancellationToken),
          this.WantedThingsRetriever.GoAsync(cancellationToken),
      };
      

      ...到...

      List<Task> tasks = new()
      {
          this.WantedThingsCondenser.GoAsync(cancellationToken),
          this.WantedThingsHandler.GoAsync(cancellationToken),
          this.WantedThingsRetriever.GoAsync(cancellationToken),
      };
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2012-03-25
        • 1970-01-01
        • 1970-01-01
        • 2022-09-25
        • 1970-01-01
        • 2021-12-30
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多