【发布时间】:2015-02-26 17:06:49
【问题描述】:
考虑以下管道:
- 将物品缓冲到包装中
- 在线程池线程中观察这些包
- 对这些包进行一些异步处理
如果进程已经完成,将源 observable 设置为 complete 将导致缓冲区按原样发出当前包。但是,处理部分将在获得最后一个包之前获得完整的事件。
我的想法是等待最后一个包被处理,但由于我在 OnNext 之前得到 OnComplete,我似乎无法通过 ReactiveX 机制来做到这一点。
有什么方法可以让 OnComplete 在最后一个 OnNext 之后发生?
这是一个模拟此行为的示例 (available as a .NET Fiddle):
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe()
;
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait();
【问题讨论】:
标签: c# system.reactive