【问题标题】:buffer while processing items处理项目时的缓冲区
【发布时间】:2015-03-05 14:30:41
【问题描述】:

我有一个定期触发的事件。假设处理事件需要大约 1 秒。我不想为每个收到的事件等待 1 秒,而是想累积事件 直到最后一个处理完成。处理完成后,我想处理上次处理期间收到的事件数据:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
                         1s                      2s                     3s                       4s                       5s                      6s
p(e1)                    p(e2, e3)                                      p(e4)                    p(e5, e6)                p(e7)
[-----------------------][-----------------------]                      [-----------------------][-----------------------][-----------------------]  processing of items                        

In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 - 
is finished the processing of the events e2 and e3 takes place. 

This proces is similar to a rolling build: a changeset is checked in, the buildserver starts building and once the build is finished all changesets that have been 
checked in during the build will then be processed.

我应该如何使用 Rx 做到这一点?

我曾尝试将缓冲区与打开和关闭选择器结合使用,但我无法正确使用。任何示例或方向表示赞赏!

让我们假设Subject<int> 作为输入流。

我已经尝试过这样的事情,但我完全迷失了。

var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
    bc.OnNext(true);
    String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
    Thread.Sleep(300);
    bc.OnNext(false);
});

【问题讨论】:

  • 我喜欢滚动构建类比。以后解释这种情况时我会自己使用它!

标签: c# .net system.reactive reactive-programming


【解决方案1】:

这是不平凡的。幸运的是@DaveSexton 已经完成了所有艰苦的工作。你想要来自 Rxx 库的BufferIntrospectiveCheck out the source here.

这很难的原因是IObserver<T> 没有内置的方法来表示背压 - 除了阻止 OnXXX 调用的微妙之处。 Observable需要注意Observer,需要引入并发来管理缓冲。

另请注意,如果您有多个订阅者,他们将获得不同的数据,因为他们收到的数据取决于源事件率和他们的消费率。

另一种方法是将所有事件添加到 OnNext 处理程序中的线程安全队列中,并有一个单独的任务在循环中清空队列。 BufferIntrospective 可能更干净。

玩了一会儿,这个玩具实现似乎奏效了。但是 Rxx 会更加健壮,所以这只是教学上真正展示所涉及的类型的东西。关键是通过调度器引入并发。

public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return Observable.Create<IList<TSource>>(o => {
        Subject<Unit> feedback = new Subject<Unit>();
        var sourcePub = source.Publish().RefCount();
        var sub = sourcePub.Buffer(
            () => feedback).ObserveOn(scheduler).Subscribe(@event =>
            {                
                o.OnNext(@event);
                feedback.OnNext(Unit.Default);
            },
            o.OnError,
            o.OnCompleted);
        var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
        return new CompositeDisposable(sub, start);
    });        
}

此示例代码显示了用法以及两个不同节奏的订阅者如何获得不同的事件缓冲,一个接收 5 个批次,另一个接收 10 个批次。

我正在使用LINQPadDump 轻松显示每个缓冲区的内容。

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);

var buffered = xs.BufferIntrospective();

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});

【讨论】:

  • 看起来不错。我第一次看 Rxx 扩展。很好的解释,我仍然对这种高级(对我而言)的东西感到困惑。有一件事,Rxx 方法没有观察到第一个 OnNext(),知道为什么吗?
  • @Expecho 链接到的BufferIntrospective James 的版本中有an issue,它实际上异步订阅源流,因此在将其与热源一起使用时,您可能会丢失一些初始项目如果你不知道这一点。如果你使用latest version 应该不会有这个问题
猜你喜欢
  • 2014-08-29
  • 1970-01-01
  • 1970-01-01
  • 2016-07-04
  • 1970-01-01
  • 1970-01-01
  • 2021-11-29
  • 2019-06-01
  • 1970-01-01
相关资源
最近更新 更多