【问题标题】:C# Rx Observable collapse individual events to batchesC# Rx Observable 将单个事件折叠成批处理
【发布时间】:2020-07-31 14:09:34
【问题描述】:

我有一个 IObservable 正在流式传输单个事件,我想将它们批处理到另一个 IObservable 流式传输事件批次。我是这么写的;

class MyEvent
{
    public string Description {get; set;}
}

class EventsBatch
{
    public MyEvent[] Batch {get; set;}
}

IObservable<EventsBatch> ConvertToBatches(IObservable<MyEvent> observable, int batchSize)
{
    var eventsAccumulator = new List<MyEvent>();
    var subject = new ReplaySubject<EventsBatch>();
    var completionSource = new TaskCompletionSource<object>();

    var subscription = observable.Subscribe(events =>
        {
            eventsAccumulator.Add(events);
            if (eventsAccumulator.Count == batchSize)
            {
                subject.OnNext(new EventsBatch { Batch = eventsAccumulator.ToArray() });
                eventsAccumulator.Clear();
            }
        },
        ex => completionSource.TrySetException(ex),
        () =>
        {
            subject.OnNext(new EventsBatch { Batch = eventsAccumulator.ToArray() });
            subject.OnCompleted();
            completionSource.TrySetResult(null);
        });

    completionSource.Task.Wait();
    subscription.Dispose();
    subject.OnCompleted();
    return subject;
}

这会阻塞,直到整个流完成,然后再流式传输生成的批次,在最好的情况下性能很差,并且在无限流的情况下永远不会返回。

这样做的正确方法是什么?

【问题讨论】:

  • 永远不要在.Subscribe 中使用.OnNext。总有一种方法可以进行纯粹的查询。

标签: c# observable system.reactive


【解决方案1】:

我想我可能已经想通了;

IObservable<EventsBatch> convertToBatches(IObservable<MyEvent> observable, int batchSize)
{
    return observable.Buffer(batchSize).Select(e => new EventsBatch{ Batch = e.ToArray() });
}

如果可能,请提出更好的解决方案

【讨论】:

  • 是的,Buffer 就是答案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-03-16
  • 1970-01-01
  • 1970-01-01
  • 2014-08-26
相关资源
最近更新 更多