【问题标题】:Wait for Rx observer to complete without using lock等待 Rx 观察者完成而不使用锁
【发布时间】:2014-12-29 02:16:00
【问题描述】:

我有一个应用程序运行多个观察者的可观察间隔。每 0.5 秒间隔从 Web 服务器加载一些 XML 数据,然后观察者在后台线程上执行一些特定于应用程序的处理。一旦不再需要数据,订阅和可观察的时间间隔就会被释放,因此观察者的 OnNext/OnCompleted/OnError 将不再被调用。到目前为止一切顺利。

我的问题:在极少数情况下,调用 Dispose 后,我的观察者的 OnNext 方法可能仍在运行!在处理完之后进行进一步操作之前,我想确保 OnNext 已经完成。

我当前的解决方案:我在观察者类中引入了一个储物柜字段(参见代码)。处置后,我尝试获取锁并仅在获取锁后继续。虽然这个解决方案有效(?),但不知怎的,我觉得有点不对劲。

问题:有没有更优雅、更“Rx Way”的方式来解决这个问题?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RxExperimental
{
    internal sealed class MyXmlDataFromWeb
    {
        public string SomeXmlDataFromWeb { get; set; }
    }

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
    {
        private readonly object _locker = new object();
        private readonly string _observerName;

        public MyObserver(string observerName) {
            this._observerName = observerName;
        }

        public object Locker {
            get { return this._locker; }
        }

        public void OnCompleted() {
            lock (this._locker) {
                Console.WriteLine("{0}: Completed.", this._observerName);
            }
        }

        public void OnError(Exception error) {
            lock (this._locker) {
                Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
            }
        }

        public void OnNext(MyXmlDataFromWeb value) {
            lock (this._locker) {
                Console.WriteLine("  {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine("  {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
                Thread.Sleep(5000); // simulate some long running operation
                Console.WriteLine("  {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
            }
        }
    }

    internal sealed class Program
    {
        private static void Main() {
            const int interval = 500;
            //
            var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
                var data = new MyXmlDataFromWeb {
                    SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
                };
                return data;
            }).Publish();
            //
            var observer1 = new MyObserver("Observer 1");
            var observer2 = new MyObserver("Observer 2");
            //
            var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
            var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
            //
            var connection = dataSource.Connect();
            //
            Console.WriteLine("Press any key to cancel ...");
            Console.ReadLine();
            //
            subscription1.Dispose();
            subscription2.Dispose();
            connection.Dispose();
            //
            lock (observer1.Locker) {
                Console.WriteLine("Observer 1 completed.");
            }
            lock (observer2.Locker) {
                Console.WriteLine("Observer 2 completed.");
            }
            //
            Console.WriteLine("Can only be executed, after all observers completed.");
        }
    }
}

【问题讨论】:

  • 如果您在 OnNext 回调中执行长时间阻塞工作,我认为您没有按照 Rx 的精神工作。由于 Rx 实际上是一个回调管道,您将阻塞您的源生产者。您可能需要考虑使用消息传递设计,或者考虑让您的 OnNext 处理程序引入另一层异步(请参阅嵌套可观察序列)
  • 另外,我建议您不要实现 IObserver(或就此而言 IObservable)接口。相反,使用运算符创建查询。

标签: c# multithreading system.reactive


【解决方案1】:

是的,有更多的 Rx 方式来做到这一点。

第一个观察结果是,取消订阅可观察流本质上独立于观察者当前正在发生的事情。真的没有任何反馈。由于您需要明确知道观察何时结束,因此您需要将其建模到可观察流中。换句话说,您应该完成该流,而不是取消订阅该流,这样您就可以观察到OnComplete 事件。在您的情况下,您可以使用 TakeUntil 来结束 observable 而不是取消订阅它。

第二个观察是你的主程序需要观察你的“观察者”何时完成它的工作。但是由于您将“观察者”设置为实际的IObservable,因此您实际上没有办法做到这一点。当人们第一次开始使用 Rx 时,我发现这是一个常见的混淆来源。如果您将“观察者”建模为可观察链中的另一个链接,那么您的主程序可以观察者。具体来说,您的“观察者”只不过是一个映射操作(具有副作用),它将传入的 Xml 数据映射到“完成”消息中。

所以如果你重构你的代码,你就能得到你想要的……

public class MyObserver
{
    private readonly string _name;

    public MyObserver(string name) { _name = name; }

    public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source)
    {
        return source.Select(value =>
        {
            Thread.Sleep(5000); // simulate work
            return Unit.Default;
        });
    }
}

// main
var endSignal = new Subject<Unit>();
var dataSource = Observable
    .Interval(...)
    .Select(...)
    .TakeUntil(endSignal)
    .Publish();
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
var results1 = observer1.Handle(dataSource.ObserveOn(...));
var results2 = observer2.Handle(dataSource.ObserveOn(...));
// since you just want to know when they are all done
// just merge them.
// use ToTask() to subscribe them and collect the results
// as a Task
var processingDone = results1.Merge(results2).Count().ToTask();

dataSource.Connect();

Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();

// end the stream
endSignal.OnNext(Unit.Default);

// wait for the processing to complete.
// use await, or Task.Result
var numProcessed = await processingDone;

【讨论】:

  • 非常感谢您的意见。然而,这种方法有几个问题没有得到解答: 1. 虽然在 MyOberver 的 Handle 方法中进行实际工作可以替代 onNext 处理程序,但这种方法应该如何处理 onError 和 onCompleted 事件?在取消间隔后继续进行进一步操作之前,您如何确保 onError/onCompleted 所做的工作实际完成?当然我可以添加一次性订阅,但是我遇到了和以前一样的问题。 ;-)
  • 2.在 MyObserver 的 Handle 方法中执行实际工作有一个缺点: processingDone.Wait() 一直等到序列中的所有消息都已处理完毕。这种行为是不希望的!我希望它在取消间隔后立即忽略/处理所有未完成的消息(如果有)(并且只等待当前运行的 onNext/onError/onCompleted 完成)。我最初的方法(带锁)通过处理订阅来实现这种行为。有什么想法吗?
  • 1 - 让MyObserverSelect 之后附加更多运算符。根据您的具体要求,有很多选择。 .Do() 是一个开始寻找的好地方。 Rx 将保证在processingDone 任务完成之前完成操作。
  • 2 - 当您发出endSignal 信号时,这会导致TakeUntil(endSignal) 结束输入流并停止处理任何新消息。只有当前消息将完成处理。 Rx 已经保证一次只发送一条消息。除非您专门应用 BufferObserveOn 之类的运算符,否则消息不会“堆积”,即使这样,您也可以将 TakeUntil 运算符 after 放置在此类缓冲操作之后,以确保您结束流并丢弃缓冲的请求。
猜你喜欢
  • 2019-08-20
  • 2018-01-03
  • 2019-03-25
  • 2019-09-23
  • 2017-09-08
  • 1970-01-01
  • 2017-10-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多