【问题标题】:Reactive Extensions: buffer until subscriber is idle反应式扩展:缓冲直到订阅者空闲
【发布时间】:2012-11-27 21:22:32
【问题描述】:

我有一个程序,我正在接收事件并希望分批处理它们,以便在我处理当前批次时进入的所有项目都将出现在下一个批次中。

Rx 中简单的 TimeSpan 和基于计数的 Buffer 方法将给我多批次的项目,而不是给我一大批已经进来的所有东西(如果订阅者花费的时间超过指定的 TimeSpan 或超过 N 个项目进来,N大于count)。

我研究了使用采用Func<IObservable<TBufferClosing>>IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>> 的更复杂的缓冲区重载,但我找不到如何使用这些的示例,更不用说弄清楚如何将它们应用于我正在尝试的内容做。

【问题讨论】:

  • This page 可能有助于 Buffer 重载。整个系列都很有帮助
  • 您在 TPL 数据流中尝试过BufferBlock 吗?

标签: c# system.reactive


【解决方案1】:

这是你想要的吗?

var xs = new Subject<int>();
var ys = new Subject<Unit>();

var zss =
    xs.Buffer(ys);

zss
    .ObserveOn(Scheduler.Default)
    .Subscribe(zs =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(String.Join("-", zs));
        ys.OnNext(Unit.Default);
    });

ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);

我的结果:

1-2-3
4-5
6-7

【讨论】:

  • 删除 ObserveOn 以使程序在一个线程上执行会导致它中断。
  • @ChrisEldredge - 是的。您可能必须允许多线程才能使其正常工作。
  • 当队列为空时也会导致忙等待(将 CPU 固定在 100%)。如果你在 zs.Count == 0 时移除睡眠,你会看到尖峰。
  • @ChrisEldredge - 也许我应该尝试别的。 :-(
【解决方案2】:

你需要的是一些东西来缓冲值,然后当工人 准备就绪,它会请求当前缓冲区,然后将其重置。这个可以 通过 RX 和 Task 的组合来完成

class TicTac<Stuff> {

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>();

    List<Stuff> in = new List<Stuff>();

    public void push(Stuff stuff){
        lock(this){
            if(in == null){
                in = new List<Stuff>();
                Items.SetResult(in);
            }
            in.Add(stuff);
        }
    }

    private void reset(){
        lock(this){
            Items = new TaskCompletionSource<List<Stuff>>();
            in = null;
        }
    }

    public async Task<List<Stuff>> Items(){
        List<Stuff> list = await Items.Task;
        reset();
        return list;
    }
}

然后

var tictac = new TicTac<double>();

IObservable<double> source = ....

source.Subscribe(x=>tictac.Push(x));

然后在你的工人中

while(true){

    var items = await tictac.Items();

    Thread.Sleep(100);

    for each (item in items){
        Console.WriteLine(item);
    }

}

【讨论】:

  • 我想这可行,但如果我必须直接使用 TPL/APM 进行编码,为什么还要使用响应式扩展?
  • ReactiveExtensions 是一个框架。您可以根据需要扩展它。我经常为自定义内容添加自己的运算符
  • @ChrisEldredge 公平地说,您所要求的不适合 Rx 框架;订阅者无法向观察者发出信号表明它是否空闲,您必须在带外进行。
【解决方案3】:

我之前这样做的方法是在 DotPeek/Reflector 中提取 ObserveOn 方法,并采用它具有的排队概念并使其适应我们的要求。例如,在具有快速计时数据的 UI 应用程序(如金融)中,UI 线程可能会被事件淹没,有时它无法足够快地更新。在这些情况下,我们希望删除除最后一个事件之外的所有事件(对于特定工具)。在这种情况下,我们将 ObserveOn 的内部队列更改为单个值 T(查找 ObserveLatestOn(IScheduler))。在您的情况下,您想要队列,但是您想要推送整个队列而不仅仅是第一个值。这应该可以帮助您入门。

【讨论】:

    【解决方案4】:

    @Enigmativity 答案的一种扩展。我用这个来解决这个问题:

    public static IObservable<(Action ready, IReadOnlyList<T> values)> BufferUntilReady<T>(this IObservable<T> stream)
    {
        var gate = new BehaviorSubject<Guid>(Guid.NewGuid());
    
        void Ready() => gate.OnNext(Guid.NewGuid());
    
        return stream.Publish(shared => shared
            .Buffer(gate.CombineLatest(shared, ValueTuple.Create)
                .DistinctUntilChanged(new AnyEqualityComparer<Guid, T>()))
            .Where(x => x.Any())
            .Select(x => ((Action) Ready, (IReadOnlyList<T>) x)));
    }
    
    public class AnyEqualityComparer<T1, T2> : IEqualityComparer<(T1 a, T2 b)>
    {
        public bool Equals((T1 a, T2 b) x, (T1 a, T2 b) y) => Equals(x.a, y.a) || Equals(x.b, y.b);
        public int GetHashCode((T1 a, T2 b) obj) => throw new NotSupportedException();
    }
    

    订阅者收到一个 Ready() 函数,当准备好接收下一个缓冲区时调用该函数。我没有在同一个线程上观察每个缓冲区以避免循环,但我想你可以在其他地方打破它,如果你需要在同一个线程上处理每个缓冲区。

    【讨论】:

    • 这是一个非常天才的解决方案!
    猜你喜欢
    • 1970-01-01
    • 2013-05-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-04
    • 1970-01-01
    相关资源
    最近更新 更多