【发布时间】:2014-12-29 02:16:00
【问题描述】:
我有一个应用程序运行多个观察者的可观察间隔。每 0.5 秒间隔从 Web 服务器加载一些 XML 数据,然后观察者在后台线程上执行一些特定于应用程序的处理。一旦不再需要数据,订阅和可观察的时间间隔就会被释放,因此观察者的 OnNext/OnCompleted/OnError 将不再被调用。到目前为止一切顺利。
我的问题:在极少数情况下,调用 Dispose 后,我的观察者的 OnNext 方法可能仍在运行!在处理完之后进行进一步操作之前,我想确保 OnNext 已经完成。
我当前的解决方案:我在观察者类中引入了一个储物柜字段(参见代码)。处置后,我尝试获取锁并仅在获取锁后继续。虽然这个解决方案有效(?),但不知怎的,我觉得有点不对劲。
问题:有没有更优雅、更“Rx Way”的方式来解决这个问题?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get { return this._locker; }
}
public void OnCompleted() {
lock (this._locker) {
Console.WriteLine("{0}: Completed.", this._observerName);
}
}
public void OnError(Exception error) {
lock (this._locker) {
Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
}
}
public void OnNext(MyXmlDataFromWeb value) {
lock (this._locker) {
Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
Thread.Sleep(5000); // simulate some long running operation
Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
}
}
}
internal sealed class Program
{
private static void Main() {
const int interval = 500;
//
var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
var data = new MyXmlDataFromWeb {
SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
};
return data;
}).Publish();
//
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
//
var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
//
var connection = dataSource.Connect();
//
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
//
subscription1.Dispose();
subscription2.Dispose();
connection.Dispose();
//
lock (observer1.Locker) {
Console.WriteLine("Observer 1 completed.");
}
lock (observer2.Locker) {
Console.WriteLine("Observer 2 completed.");
}
//
Console.WriteLine("Can only be executed, after all observers completed.");
}
}
}
【问题讨论】:
-
如果您在 OnNext 回调中执行长时间阻塞工作,我认为您没有按照 Rx 的精神工作。由于 Rx 实际上是一个回调管道,您将阻塞您的源生产者。您可能需要考虑使用消息传递设计,或者考虑让您的 OnNext 处理程序引入另一层异步(请参阅嵌套可观察序列)
-
另外,我建议您不要实现 IObserver
(或就此而言 IObservable )接口。相反,使用运算符创建查询。
标签: c# multithreading system.reactive