【问题标题】:How to use Rx.Nex extension ForEachAsync with async action如何使用带有异步操作的 Rx.Nex 扩展 ForEachAsync
【发布时间】:2017-07-28 21:39:59
【问题描述】:

我有从 SQL 流式传输数据并将其写入不同存储的代码。代码大概是这样的:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
     using (var reader = await cmd.ExecuteReaderAsync())
     {
         var list = new List<MyData>();
         while (await reader.ReadAsync())
         {
             var row = GetRow(reader);
             list.Add(row);
             if (list.Count == BatchSize)
             {
                 await WriteDataAsync(list);
                 list.Clear();
             }
         }
         if (list.Count > 0)
         {
             await WriteDataAsync(list);
         }
     }
 }

我想为此使用响应式扩展。理想情况下,代码如下所示:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

但是,扩展方法 ForEachAsync 似乎只接受同步操作。是否可以编写一个接受异步操作的扩展?

【问题讨论】:

    标签: c# asynchronous system.reactive rx.net


    【解决方案1】:

    是否可以编写一个接受异步操作的扩展?

    不直接。

    Rx 订阅必须是同步的,因为 Rx 是一个基于推送的系统。当数据项到达时,它会遍历您的查询,直到到达最终订阅 - 在本例中是执行 Action

    Rx 提供的await-able 方法是awaiting 序列本身 - 即,ForEachAsync 在序列方面是异步的(您正在异步等待序列完成),但ForEachAsync 中的订阅(对每个元素执行的操作)仍必须是同步的。

    为了在您的数据管道中进行同步到异步的转换,您需要有一个缓冲区。 Rx 订阅可以(同步)作为生产者添加到缓冲区,而异步消费者正在检索项目并处理它们。因此,您需要一个同时支持同步和异步操作的生产者/消费者队列。

    TPL Dataflow 中的各种块类型可以满足这一需求。这样的事情就足够了:

    var obs = StreamDataFromSql().Buffer(BatchSize);
    var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
    using (var subscription = obs.Subscribe(buffer.AsObserver()))
      await buffer.Completion;
    

    注意没有背压;只要StreamDataFromSql 可以推送数据,它将被缓冲并存储在ActionBlock 的传入队列中。根据数据的大小和类型,这会很快占用大量内存。

    【讨论】:

    • 您能否再解释一下“Rx 订阅必须是同步的,因为 Rx 是基于推送的系统”?乍一看我会说那是不正确的,但也许我误解了你的意思。
    • @Enigmativity 我的意思是没有内置的自动背压系统。例如,您的答案中的订阅是同步的,而不是异步的。一旦在您的订阅中点击了第一个 await,就 Rx 而言,整个订阅方法已经完成,可以免费开始另一个。
    • 很公平。我想我应该进行.ObserveOn 调用以将执行从UI 线程移开。然后,根据 WriteDataAsync 是否使用任何 UI 元素,它可以直接运行 UI 并避免以这种方式出现异步问题。
    【解决方案2】:

    正确的做法是正确使用 Reactive Extensions 来完成这项工作 - 因此,从创建连接的那一刻开始,直到写入数据。

    方法如下:

    IObservable<IList<MyData>> query =
        Observable
            .Using(() => new SqlConnection(""), connection =>
                Observable
                    .Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
                        Observable
                            .Using(() => cmd.ExecuteReader(), reader =>
                                Observable
                                    .While(() => reader.Read(), Observable.Return(GetRow(reader))))))
            .Buffer(BatchSize);
    
    IDisposable subscription =
        query
            .Subscribe(async list => await WriteDataAsync(list));
    

    我无法测试代码,但它应该可以工作。此代码假定 WriteDataAsync 也可以采用 IList&lt;MyData&gt;。如果它不只是放入.ToList()

    【讨论】:

    • Subscribe 不需要Func&lt;T,Task&gt;,所以asyncawait 并没有真正做任何事情。
    • @ErikHeemskerk - 它仍然是 Action&lt;T&gt; 并且适用于 async/await
    【解决方案3】:

    这是支持异步操作的ForEachAsync 方法的一个版本。它将源 observable 投影到包含异步操作的嵌套 IObservable&lt;IObservable&lt;Unit&gt;&gt;,然后使用 Merge 运算符将其展平回 IObservable&lt;Unit&gt;。生成的 observable 最终被转换为任务。

    默认情况下,动作是按顺序调用的,但可以通过配置可选的maximumConcurrency 参数来同时调用它们。

    取消可选的cancellationToken 参数会导致返回的Task 立即完成(取消),可能在取消当前运行的操作之前。

    任何可能发生的异常都会通过Task 传播,并导致取消所有当前正在运行的操作。

    /// <summary>
    /// Invokes an asynchronous action for each element in the observable sequence,
    /// and returns a 'Task' that represents the completion of the sequence and
    /// all the asynchronous actions.
    /// </summary>
    public static Task ForEachAsync<TSource>(
        this IObservable<TSource> source,
        Func<TSource, CancellationToken, Task> action,
        CancellationToken cancellationToken = default,
        int maximumConcurrency = 1)
    {
        // Arguments validation omitted
        return source
            .Select(item => Observable.FromAsync(ct => action(item, ct)))
            .Merge(maximumConcurrency)
            .DefaultIfEmpty()
            .ToTask(cancellationToken);
    }
    

    使用示例:

    await StreamDataFromSql()
        .Buffer(BatchSize)
        .ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));
    

    【讨论】:

      【解决方案4】:

      这里是 the source code for ForEachAsync 和 ToEnumerable 和 AsObservable 方法上的 article

      我们可以围绕 ForEachAsync 做一个包装器,它将等待一个任务返回函数:

      public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
      {
          foreach ( var x in t.ToEnumerable() )
              await onNext( x );
      }
      

      示例用法:

      await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
      

      【讨论】:

      • 非常好。但是,通过等待,我基本上失去了在 WriteDataAsync 实现中使用异步的好处。我想知道是否有保持原始代码的非阻塞性质。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-30
      • 2020-03-26
      • 1970-01-01
      • 1970-01-01
      • 2020-04-22
      相关资源
      最近更新 更多