【问题标题】:Factory for IAsyncEnumerable or IAsyncEnumeratorIAsyncEnumerable 或 IAsyncEnumerator 的工厂
【发布时间】:2020-08-15 20:22:57
【问题描述】:

我想知道是否有一种方法可以通过 Source 对象创建IAsyncEnumerable<T>IAsyncEnumerator<T>,就像TaskCompletionSource 允许人们执行任务一样。特别是,TaskCompletionSource 可以像任何其他参数一样被传递。

可能是这样的:

public class AsyncEnumerables {

    public Task HandlerTask { get; set; }

    public async Task<string> ParentMethod() {
        var source = new AsyncEnumerableSource<int>();
        IAsyncEnumerable asyncEnumerable = source.GetAsyncEnumerable();
        HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
        int n = await someOtherTask();
        source.YieldReturn(n);
        var r = await ChildMethod(source);
        source.Complete();  // this call would cause the HandlerTask to complete.
        return r;
    }

    private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
        source.YieldReturn(5);
        await SomeOtherCall();
        source.YieldReturn(10);
        return "hello";
    }
}

使用上面的代码,handleAsyncResultsAsTheyHappen 任务将看到传递给 YieldReturn 的任何值。所以它会看到上面代码中的n,以及ChildMethod中的510

【问题讨论】:

标签: c# asynchronous c#-8.0 iasyncenumerable


【解决方案1】:

如果您可以构建代码以利用yield returnawait foreach,您的情况会好得多。例如,这段代码几乎做同样的事情:

public async Task Consume()
{
    var source = ParentMethod();
    HandlerTask = Task.Run(async () => { await foreach (var item in source) { Console.WriteLine(item); } });
}

public async IAsyncEnumerable<int> ParentMethod()
{
    await Task.Yield();
    yield return 13;
    await foreach (var item in ChildMethod())
        yield return item;
}

private async IAsyncEnumerable<int> ChildMethod()
{
    yield return 5;
    await Task.Yield();
    yield return 10;
}

然而,如果你真的需要一个“异步可枚举源”,你首先需要认清一件事。 TaskCompletionSource&lt;T&gt; 保存结果,即T(或异常)。它充当容器。可以在等待任务之前设置结果。这与“异步可枚举源”相同——您需要它能够在从中获取任何项目之前保存结果。 “异步可枚举源”需要保存多个结果——在这种情况下,是一个集合

所以您实际上要求的是“一个可以作为异步枚举使用的集合”。这里有几种可能性,但我推荐的是Channel

public async Task<string> ParentMethod()
{
  var source = Channel.CreateUnbounded<int>();
  var sourceWriter = source.Writer;
  IAsyncEnumerable<int> asyncEnumerable = source.Reader.ReadAllAsync();
  HandlerTask = Task.Run(async () => { await foreach (var item in asyncEnumerable) Console.WriteLine(item); });
  await Task.Yield();
  await sourceWriter.WriteAsync(13);
  var r = await ChildMethod(sourceWriter);
  sourceWriter.Complete();
  return r;
}

private async Task<string> ChildMethod(ChannelWriter<int> sourceWriter)
{
  await sourceWriter.WriteAsync(5);
  await Task.Yield();
  await sourceWriter.WriteAsync(10);
  return "hello";
}

【讨论】:

  • 非常感谢。我同意 IAsyncEnumerable 语法很棒。但我不喜欢它阻止我从我的方法中返回任何其他内容的事实。传入一个包装器对象并在那里设置输出是很痛苦的。我也不太喜欢 await foreach (var item in ChildMethod()) yield return item;部分。现在我有另一种选择来进行调查、权衡取舍等。
  • Channels 在这种用法上有一个限制,它们只能为一个消费者提供服务。 TaskCompletionSource.Task 可以被多个消费者等待,所以我认为类似的 AsyncEnumerableSource 类也应该能够将消息传播给多个消费者。
  • 需要明确的是,Channels 支持多个消费者,但他们只会将特定物品提供给单个消费者。
  • @StephenCleary 是的,你是对的。所以使用Channel 和多个消费者是可以的,只要每个消费者都对数据的部分子集感到满意。对于假设的AsyncEnumerableSourceclass 的消费者来说,情况可能并非如此。
  • 我目前对此的想法是,对于使用代码,IAsyncEnumerable 更好。但是对于生成异步结果的代码,它可能非常烦人,特别是如果有很多方法调用可能会产生可枚举的结果。在这种情况下,Channel 更好。好在两者之间转换并不难。
【解决方案2】:

AFAIK .NET 平台没有内置的AsyncEnumerableSource 类,但使用System.ReactiveSystem.Interactive.Async 库很容易实现。 System.Reactive 库包含 Subject 类,它是 IObservableIObserver 的组合。这是一个方便的类,因为您可以向IObserver 接口发送通知,并独立订阅IObservable 接口以接收这些通知。实际上不需要手动订阅,因为System.Interactive.Async 库包含方便的扩展方法ToAsyncEnumerable,它可以自动将IObservable 转换为IAsyncEnumerable

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;

public class AsyncEnumerableSource<T>
{
    private readonly Subject<T> _subject = new Subject<T>();

    public IAsyncEnumerable<T> GetAsyncEnumerable() => _subject.ToAsyncEnumerable();
    public void YieldReturn(T value) => _subject.OnNext(value);
    public void Complete() => _subject.OnCompleted();
    public void Fault(Exception ex) => _subject.OnError(ex);
}

此实现将仅向订阅者发送订阅后发生的通知。如果您想确保迟到的加入者会收到早期消息,您可以将Subject 替换为ReplaySubject。这个缓冲它接收到的通知,因此它带有内存使用注意事项:它在其构造函数中接受 int bufferSize 参数。

【讨论】:

    【解决方案3】:

    这是AsyncEnumerableSource 类的另一个实现,它不依赖于RX 库。这个依赖于 Channel&lt;T&gt;ImmutableArrayImmutableInterlocked 类,它们都在 .NET Core 平台上本机可用(也可用于 .NET Framework 作为包)。它与RX-based 实现具有相同的行为。

    AsyncEnumerableSource 类可以将通知传播给多个订阅者。每个订阅者都可以按照自己的节奏枚举这些通知。这是可能的,因为每个订阅都有自己专用的Channel&lt;T&gt; 作为底层存储。订阅的生命周期实际上与单个 await foreach 循环的生命周期相关联。出于任何原因(包括抛出的异常)提前从循环中中断,立即结束订阅。

    在技术术语中,每次调用从 GetAsyncEnumerable 方法返回的 IAsyncEnumerable 的方法 GetAsyncEnumerator 时,都会创建订阅。单独调用 GetAsyncEnumerable 方法不会创建订阅。当关联的IAsyncEnumerator 被释放时订阅结束。

    public class AsyncEnumerableSource<T>
    {
        private IImmutableList<Channel<T>> _channels
            = ImmutableArray<Channel<T>>.Empty;
    
        public async IAsyncEnumerable<T> GetAsyncEnumerable(
            [EnumeratorCancellation] CancellationToken token = default)
        {
            var channel = Channel.CreateUnbounded<T>();
            ImmutableInterlocked.Update(ref _channels, x => x.Add(channel));
            try
            {
                while (await channel.Reader.WaitToReadAsync(token).ConfigureAwait(false))
                {
                    while (channel.Reader.TryRead(out var item))
                    {
                        yield return item;
                        token.ThrowIfCancellationRequested();
                    }
                }
            }
            finally
            {
                ImmutableInterlocked.Update(ref _channels, x => x.Remove(channel));
            }
        }
    
        public void YieldReturn(T value)
        {
            foreach (var channel in Volatile.Read(ref _channels))
                channel.Writer.TryWrite(value);
        }
        public void Complete()
        {
            foreach (var channel in Volatile.Read(ref _channels))
                channel.Writer.TryComplete();
        }
        public void Fault(Exception ex)
        {
            foreach (var channel in Volatile.Read(ref _channels))
                channel.Writer.TryComplete(ex);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-03-01
      • 2012-03-21
      • 1970-01-01
      • 1970-01-01
      • 2011-01-05
      • 1970-01-01
      • 2014-03-03
      • 1970-01-01
      相关资源
      最近更新 更多