【问题标题】:Merge multiple IAsyncEnumerable streams / 合并多个 IAsyncEnumerable 流
【发布时间】:2022-01-11 07:06:33
【问题描述】:

随着 MediatR 10 的发布,现在有一种范式允许开发人员创建由 IAsyncEnumerable 提供支持的流。我正在利用这种范式来创建多个不同的文件系统观察程序来监视多个文件夹。为了监控文件夹,我使用了两种不同的方法:轮询和 FileSystemWatcher。作为我管道的一部分,所有不同的文件夹监视器都聚合到一个 IEnumerable<IAsyncEnumerable<FileRecord>> 中。在每种类型的观察者中,都有一个内部循环运行,直到通过 CancellationToken 请求取消。

这是轮询(polling)观察者:

/// <summary>
/// Streams <see cref="FileRecord"/>s for files that appear in a folder, discovered by
/// periodically re-scanning the folder.
/// </summary>
public class PolledFileStreamHandler : 
    IStreamRequestHandler<PolledFileStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly IPublisher _publisher;
    private readonly ILogger<PolledFileStreamHandler> _logger;

    public PolledFileStreamHandler(
        ISeenFileStore seenFileStore, 
        IPublisher publisher, 
        ILogger<PolledFileStreamHandler> logger)
    {
        _seenFileStore = seenFileStore;
        _publisher = publisher;
        _logger = logger;
    }

    /// <summary>
    /// Repeatedly scans <c>request.Folder</c>. Each file not yet in the seen-file store is
    /// added to it, announced through a <c>FileSeenNotification</c>, and then yielded.
    /// Between scans the loop waits <c>request.Interval</c>. The sequence ends once
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    public async IAsyncEnumerable<FileRecord> Handle(
        PolledFileStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var discovered = new ConcurrentQueue<FileRecord>();

        for (;;)
        {
            if (cancellationToken.IsCancellationRequested)
                yield break;

            var unseen = Directory
                .EnumerateFiles(request.Folder)
                .Where(path => !_seenFileStore.Contains(path));

            // The caller's token is not passed to the scan (CancellationToken.None), so a
            // cancellation does not interrupt a scan that is already in progress.
            await Parallel.ForEachAsync(unseen, CancellationToken.None, async (path, ct) =>
            {
                var record = new FileRecord(path);

                _seenFileStore.Add(path);
                await _publisher.Publish(new FileSeenNotification { FileInfo = record }, ct);
                discovered.Enqueue(record);
            });

            // TODO (original author): try interleaving the parallel scan with serving results.

            // The scan has completed, so no producer is still writing to the queue.
            while (discovered.TryDequeue(out var next))
                yield return next;

            _logger.LogInformation("PolledFileStreamHandler watching {Directory} at: {Time}", request.Folder, DateTimeOffset.Now);

            // Wait for the next poll. The continuation absorbs a cancellation during the wait;
            // the check at the top of the loop then ends the sequence.
            await Task.Delay(request.Interval, cancellationToken)
                .ContinueWith(_ => { }, CancellationToken.None);
        }
    }
}

还有 FileSystemWatcher

/// <summary>
/// Streams <see cref="FileRecord"/>s for files created in a folder, using a
/// <see cref="FileSystemWatcher"/> that feeds an internal queue. The queue is drained
/// every 100 ms.
/// </summary>
public class FileSystemStreamHandler : 
    IStreamRequestHandler<FileSystemStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly ILogger<FileSystemStreamHandler> _logger;
    private readonly IPublisher _publisher;
    private readonly ConcurrentQueue<FileRecord> _queue;

    // Handler attached to the watcher's Created event; kept so it can be detached on teardown.
    private Action<object, FileSystemEventArgs>? _tearDown;

    public FileSystemStreamHandler(
        ISeenFileStore seenFileStore, 
        ILogger<FileSystemStreamHandler> logger, 
        IPublisher publisher)
    {
        _seenFileStore = seenFileStore;
        _logger = logger;
        _publisher = publisher;
        _queue = new ConcurrentQueue<FileRecord>();
    }

    /// <summary>
    /// Watches <c>request.Folder</c> and yields a record for each newly created file until
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// The watcher is torn down in a <c>finally</c> block. It is therefore released both on
    /// cancellation and when the consumer stops enumerating early, because disposing the
    /// async enumerator runs <c>finally</c> blocks.
    /// </remarks>
    public async IAsyncEnumerable<FileRecord> Handle(
        FileSystemStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var watcher = SetupWatcher(request.Folder, cancellationToken);

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Drain everything that arrived since the last tick, not just one item.
                // Otherwise throughput would be capped at one file per 100 ms.
                while (_queue.TryDequeue(out var record))
                    yield return record;

                // Absorb the cancellation so the loop condition ends the sequence gracefully.
                await Task.Delay(100, cancellationToken)
                    .ContinueWith(_ => {}, CancellationToken.None);
            }
        }
        finally
        {
            TearDownWatcher(watcher);
        }
    }
    
    /// <summary>Creates a watcher on <paramref name="folder"/> that reports created entries.</summary>
    private FileSystemWatcher SetupWatcher(string folder, CancellationToken cancellation)
    {
        var watcher = new FileSystemWatcher(folder)
        {
            NotifyFilter = NotifyFilters.Attributes
                           | NotifyFilters.CreationTime
                           | NotifyFilters.DirectoryName
                           | NotifyFilters.FileName
                           | NotifyFilters.LastAccess
                           | NotifyFilters.LastWrite
                           | NotifyFilters.Security
                           | NotifyFilters.Size
        };

        _tearDown = (_, args) => OnWatcherOnChanged(args, cancellation);
        watcher.Created += _tearDown.Invoke;

        // Enable events only after the handler is attached, so no early event is missed.
        watcher.EnableRaisingEvents = true;

        return watcher;
    }
    
    /// <summary>
    /// Handles a Created event. It ignores already-seen paths and directories, queues a
    /// record for the new file, and publishes a <c>FileSeenNotification</c>.
    /// </summary>
    /// <remarks>
    /// This method is <c>async void</c> because it is invoked from an event. An exception
    /// escaping it would be unobservable and would crash the process, so failures are
    /// logged instead.
    /// </remarks>
    private async void OnWatcherOnChanged(FileSystemEventArgs args, CancellationToken cancellationToken)
    {
        var path = args.FullPath;

        // NOTE(review): Contains/Add is a check-then-act pair. Concurrent events for the same
        // path could both pass. Confirm whether ISeenFileStore offers an atomic add.
        if (_seenFileStore.Contains(path)) return;
            
        _seenFileStore.Add(path);

        try
        {
            if ((File.GetAttributes(path) & FileAttributes.Directory) != 0) return;
        }
        catch (Exception ex) when (ex is FileNotFoundException || ex is DirectoryNotFoundException)
        {
            // The entry (or its parent directory) vanished before it could be inspected.
            _logger.LogWarning("File {File} was not found. During a routine check. Will not be broadcast", path);
            return;
        }
            
        var record = new FileRecord(path);
        _queue.Enqueue(record);

        try
        {
            await _publisher.Publish(new FileSeenNotification { FileInfo = record }, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutting down: the notification is dropped; the record is already queued.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish notification for {File}", path);
        }
    }

    /// <summary>Detaches the Created handler and releases the watcher's OS resources.</summary>
    private void TearDownWatcher(FileSystemWatcher watcher)
    {
        if (_tearDown != null)
            watcher.Created -= _tearDown.Invoke;

        watcher.Dispose();
    }
}

最后,这是将所有内容联系在一起并尝试监视流的类(在 StartAsync 方法中)。您会注意到其中使用了来自 System.Interactive.Async 的 Merge 运算符,它目前无法按预期运行。

/// <summary>
/// Combines every registered <see cref="IFileStream"/> request into one merged stream and
/// logs each incoming file until stopped.
/// </summary>
public class StreamedFolderWatcher : IDisposable
{
    private readonly ConcurrentBag<Func<IAsyncEnumerable<FileRecord>>> _streams;
    // Owned for the lifetime of this instance. Its token is handed to every stream request,
    // and StopAsync cancels it.
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly IMediator _mediator;
    private readonly ILogger<StreamedFolderWatcher> _logger;

    public StreamedFolderWatcher(
        IMediator mediator,
        IEnumerable<IFileStream> fileStreams, 
        ILogger<StreamedFolderWatcher> logger)
    {
        _mediator = mediator;
        _logger = logger;
        _streams = new ConcurrentBag<Func<IAsyncEnumerable<FileRecord>>>();
        _cancellationTokenSource = new CancellationTokenSource();

        fileStreams.ToList()
            .ForEach(f => AddStream(f, _cancellationTokenSource.Token));
    }

    /// <summary>Registers a factory that opens the MediatR stream for <paramref name="request"/>.</summary>
    private void AddStream<T>(
        T request, 
        CancellationToken cancellationToken) 
        where T : IStreamRequest<FileRecord>
    {
        _streams.Add(() => _mediator.CreateStream(request, cancellationToken));
    }

    /// <summary>
    /// Consumes all streams concurrently, logging every file, until either
    /// <paramref name="cancellationToken"/> is cancelled or <see cref="StopAsync"/> is called.
    /// Cancellation ends the method normally; it does not throw.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // Observe both the caller's token and our own, so StopAsync actually stops this loop.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken, _cancellationTokenSource.Token);
        var token = linked.Token;

        // Use an array: only the params-array overload of AsyncEnumerableEx.Merge consumes its
        // sources concurrently. The IEnumerable extension overload drains them one after
        // another, so a never-ending first source starves the rest.
        var streams = _streams.Select(s => s()).ToArray();

        while (!token.IsCancellationRequested)
        {
            try
            {
                await foreach (var file in AsyncEnumerableEx.Merge(streams).WithCancellation(token))
                {
                    _logger.LogInformation("Incoming file {File}", file);
                }
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                break;
            }
            
            await Task.Delay(1000, token)
                .ContinueWith(_ => {}, CancellationToken.None);
        }
    }

    /// <summary>Signals the streams and any running <see cref="StartAsync"/> loop to stop.</summary>
    public Task StopAsync()
    {
        _cancellationTokenSource.Cancel();

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _cancellationTokenSource.Dispose();
        GC.SuppressFinalize(this);
    }
}

我对 Merge 行为的期望是,如果我有 3 个 IAsyncEnumerables,则每个项目应在产生后立即发出。相反,除非我将 yield break 放在循环中的某个位置,否则提取的第一个 IStreamRequestHandler 将简单地无限执行直到取消令牌强制停止。

如何将多个输入 IAsyncEnumerables 合并到一个长期存在的输出流中,每次产生结果时都会发出?

最小可重复样本

// Produces an endless sequence of (Id, Value) pairs. Id is fixed for each call; Value is
// a random digit 0-9 emitted after a random 100-999 ms pause.
// The token is only checked between items; Task.Delay itself is not given the token.
static async IAsyncEnumerable<(Guid Id, int Value)> CreateSequence(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var rng = new Random();
    var sequenceId = Guid.NewGuid();
    for (; !cancellationToken.IsCancellationRequested;)
    {
        var pauseMs = rng.Next(100, 1000);
        await Task.Delay(TimeSpan.FromMilliseconds(pauseMs));
        yield return (sequenceId, rng.Next(0, 10));
    }
}

// Ten independent sequences sharing one token source, which is never cancelled here.
var cts = new CancellationTokenSource();

// `.Merge()` binds to the IEnumerable<IAsyncEnumerable<T>> extension overload, which reads
// its sources one after another rather than concurrently. This is the behavior being
// reproduced: the first sequence never ends, so only it is ever drained.
var merged = Enumerable.Range(0, 10)
    .Select(_ => CreateSequence(cts.Token))
    .Merge();

await foreach (var (id, value) in merged)
{
    Console.WriteLine($"[{DateTime.Now.ToShortTimeString()}] Value {value} Emitted from {id}");
}

【问题讨论】:

  • 这能回答你的问题吗? Asynchronously write to an IAsyncEnumerable
  • @RaymondChen 我不确定。这似乎是为将 IEnumerable<Task> 转换为统一的 IAsyncEnumerable 而量身定制的,但我会看看是否可以重新利用它。
  • 哎呀..我最近看到了一个副本..
  • 这看起来像是生产者消费者模式的经典用例。也许您可以将所有异步枚举枚举到一个通道中,然后返回 ReadAllAsync。参照。 devblogs.microsoft.com/dotnet/…
  • 我想出了一个相当强力的解决方案,它利用了几个后台任务。我将在下面发布。这绝对不是最好的解决方案,所以我会留下这个问题以期待得到纠正。

标签: c# asynchronous async-await iasyncenumerable ix.net


【解决方案1】:

似乎 Rx 团队搞砸了 Merge 运算符,并创建了具有不同行为的重载。这个重载支持并发:

public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

这个重载不支持并发:

public static IAsyncEnumerable<TSource> Merge<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources);

来自源代码(source code)中的注释:

// REVIEW:
// This implementation does not exploit concurrency. We should not introduce such
// behavior in order to avoid breaking changes, but we could introduce a parallel
// ConcurrentMerge implementation. It is unfortunate though that the Merge
// overload accepting an array has always been concurrent, so we can't change that
// either (in order to have consistency where Merge is non-concurrent, and
// ConcurrentMerge is).

所以你要做的就是在调用 Merge() 之前先用 .ToArray() 把可枚举集合转换为数组,从而使用支持并发的数组重载。注意:该重载不是扩展方法,因此必须写成 AsyncEnumerableEx.Merge(array) 的形式调用。

【讨论】:

  • 真没想到!这是一个很棒的发现。我可以确认这似乎完全纠正了这个问题。另外值得注意的是,我不能使用扩展方法的形式(myAsyncEnumerableCollection.Merge())来完成此操作,而是必须显式调用 AsyncEnumerableEx.Merge。
【解决方案2】:

我设法提出了一个可行的,但可能效率低下且可能存在错误的解决方案。我为每个 IAsyncEnumerable 启动一个单独的后台任务,把它产生的每一项推送到一个线程安全的队列中,并在这些项可用时立即提供出来。

/// <summary>
/// Merges several async sequences into one. Each item is yielded as soon as the merged
/// sequence next polls after the item has been produced, regardless of which source it
/// came from.
/// </summary>
/// <param name="sources">Sequences to merge; each is pumped on its own background task.</param>
/// <param name="debounceTime">
/// Optional pause between polls of the internal queue. When <c>null</c>, the loop polls
/// without pausing, which keeps a CPU core busy while waiting for items.
/// </param>
/// <param name="cancellationToken">Stops the background pumps and the polling loop.</param>
/// <remarks>
/// Exceptions thrown by a source are rethrown after all buffered items have been yielded.
/// A cancellation requested through <paramref name="cancellationToken"/> ends the
/// sequence without throwing.
/// </remarks>
public static async IAsyncEnumerable<TSource> MergeAsyncEnumerable<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources,
    TimeSpan? debounceTime = default,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var queue = new ConcurrentQueue<TSource>();

    // Private token source so the pumps can be stopped when the consumer abandons the merged
    // sequence early: disposing the enumerator runs the finally block below.
    using var pumpCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var tasks = SetupCollections(sources, queue, pumpCts.Token);

    // Create the aggregate task once instead of once per loop iteration.
    var allDone = Task.WhenAll(tasks);

    try
    {
        while (!allDone.IsCompleted)
        {
            while (queue.TryDequeue(out var record))
                yield return record;

            // Small debounce to prevent the loop from just spinning.
            await WaitIfDebounce(debounceTime, cancellationToken);
        }

        // Items enqueued between the last drain and the pumps finishing would otherwise be lost.
        while (queue.TryDequeue(out var straggler))
            yield return straggler;
    }
    finally
    {
        pumpCts.Cancel();
    }

    try
    {
        // Surface any fault from a source instead of silently dropping it.
        await allDone;
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Caller-requested cancellation ends the sequence quietly.
    }
}

/// <summary>
/// Returns a task that completes after <paramref name="debounceTime"/>, or a task that has
/// already completed when no debounce is configured.
/// </summary>
/// <remarks>
/// The delay is wrapped in a continuation. Cancelling <paramref name="cancellationToken"/>
/// therefore ends the wait early without surfacing a cancellation to the awaiter.
/// </remarks>
private static Task WaitIfDebounce(
    TimeSpan? debounceTime,
    CancellationToken cancellationToken)
{
    if (!debounceTime.HasValue)
    {
        return Task.CompletedTask;
    }

    var pause = Task.Delay(debounceTime.Value, cancellationToken);
    return pause.ContinueWith(_ => { }, CancellationToken.None);
}

/// <summary>
/// Starts one background task per source. Each task copies every item its source produces
/// into <paramref name="queue"/> until the source ends or
/// <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <returns>The started pump tasks, in the same order as <paramref name="sources"/>.</returns>
private static IList<Task> SetupCollections<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    ConcurrentQueue<TSource> queue,
    CancellationToken cancellationToken)
{
    var pumps = new List<Task>();
    foreach (var source in sources)
    {
        pumps.Add(Task.Run(() => PumpAsync(source), cancellationToken));
    }
    return pumps;

    // Copies one source into the shared queue.
    async Task PumpAsync(IAsyncEnumerable<TSource> source)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
            queue.Enqueue(item);
    }
}

【讨论】:

  • await Task.Delay(100, cancellationToken) 行的目的是什么?
  • @TheodorZoulias 总的来说,这是不必要的,但我为我的特定用例添加了它作为一种去抖动。
  • MergeAsyncEnumerable 似乎是一种通用方法。恕我直言,它不应包含特定于应用程序的逻辑。
  • @TheodorZoulias 它还有助于降低 CPU 使用率,因为我不只是有一个无限循环坐在那里旋转。在我的 i9-9900k 上,应用程序的 CPU 使用率从恒定的 6-8% 下降到 0-1%。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-18
  • 1970-01-01
相关资源
最近更新 更多