【问题标题】:In Rx.Net how can I implement an observable feedback loop until the feedback is exhausted?在 Rx.Net 中,我如何实现一个可观察的反馈循环,直到反馈用尽?
【发布时间】:2015-11-06 20:17:43
【问题描述】:

我有以下 API:

IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch)

它尝试将批处理写入数据库。如果失败,则返回整个批次,否则返回的 observable 为空。

我还有一个生产批次的来源:

IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold)

现在,我可以这样组合它们:

var failedBatchesSource = GetDataSource(filePath, 1048576)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100);

这将写入所有批次(最多同时写入 100 个)并返回一个可观察到的失败批次。

我真正想要的是在某个暂停后将失败的批次送回批次的来源,可能是在原始来源仍在生产批次的时候。当然,我可以这样写:

var failedBatchesSource = GetDataSource(filePath, 1048576)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100);

但这当然是错误的,因为:

  1. 这打破了在再次处理失败的批次之前暂停的要求。
  2. 它可能会向数据库生成超过 100 个并发写入请求。
  3. 这就像展开一个迭代次数未知的 for 循环 - 没有效率。

一旦我收集了所有的失败,我也可以跳出可观察的 monad,并在循环中重新开始:

            var src = GetDataSource(filePath, 1048576);

            for (;;)
            {
                var failed = await src
                    .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
                    .Merge(100)
                    .ToList();
                if (failed.Count == 0)
                {
                    break;
                }
                src = failed.ToObservable();
            }

但我想知道我是否可以在可观察的单子中做得更好。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    认为这可能会奏效

    public static IObservable<T> ProcessAll<T>(this IObservable<T> source, Func<T, IObservable<T>> processor, int mergeCount, TimeSpan failureDelay)
    {
        return Observable.Create<T>(
            observer =>
                {
                    var failed = new Subject<T>();
    
                    return source.Merge(failed)
                            .Select(processor)
                            .Merge(mergeCount)
                            .Delay(failureDelay)
                            .Subscribe(failed.OnNext, observer.OnError, observer.OnCompleted);
                });
    }
    

    并像这样使用它:

    GetDataSource(filePath, 1048576)
      .ProcessAll(batch => WriteToDBAndGetFailedSource(conn, batch), 100, TimeSpan.FromMilliseconds(500))
      .Subscribe();
    

    ProcessAll 是一个可怕的名字,但现在是星期五晚上,我想不出更好的名字。

    【讨论】:

    • failed 如何完成?
    【解决方案2】:

    使用Observable.Buffer. 可以让您缓冲直到有 100 条记录要发送,或者直到 X 时间过去。

    或者,Observable.Interval 将简单地每 X 时间跨度触发一次。您可以在处理发布事件时添加并发限制。

    只要有要发布的对象,它们中的任何一个都应该重复触发。

    【讨论】:

      猜你喜欢
      • 2014-09-16
      • 2018-12-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多