【发布时间】:2020-07-31 14:09:34
【问题描述】:
我有一个 IObservable 正在流式传输单个事件,我想将它们批处理到另一个 IObservable 流式传输事件批次。我是这么写的;
class MyEvent
{
public string Description {get; set;}
}
class EventsBatch
{
public MyEvent[] Batch {get; set;}
}
IObservable<EventsBatch> ConvertToBatches(IObservable<MyEvent> observable, int batchSize)
{
var eventsAccumulator = new List<MyEvent>();
var subject = new ReplaySubject<EventsBatch>();
var completionSource = new TaskCompletionSource<object>();
var subscription = observable.Subscribe(events =>
{
eventsAccumulator.Add(events);
if (eventsAccumulator.Count == batchSize)
{
subject.OnNext(new EventsBatch { Batch = eventsAccumulator.ToArray() });
eventsAccumulator.Clear();
}
},
ex => completionSource.TrySetException(ex),
() =>
{
subject.OnNext(new EventsBatch { Batch = eventsAccumulator.ToArray() });
subject.OnCompleted();
completionSource.TrySetResult(null);
});
completionSource.Task.Wait();
subscription.Dispose();
subject.OnCompleted();
return subject;
}
这会阻塞,直到整个流完成,然后再流式传输生成的批次,在最好的情况下性能很差,并且在无限流的情况下永远不会返回。
这样做的正确方法是什么?
【问题讨论】:
-
永远不要在
.Subscribe中使用.OnNext。总有一种方法可以进行纯粹的查询。
标签: c# observable system.reactive