【问题标题】:How to cancel an observable sequence如何取消可观察序列
【发布时间】:2011-07-20 09:36:21
【问题描述】:

我有一个非常简单的IObservable<int>,它每 500 毫秒充当一个脉冲发生器:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

我有一个CancellationTokenSource(用于取消同时进行的其他工作)。

如何使用取消令牌源取消我的可观察序列?

【问题讨论】:

    标签: c# system.reactive cancellation


    【解决方案1】:

    这是一个旧线程,但仅供将来参考,这里有一个更简单的方法。

    如果您有 CancellationToken,您可能已经在处理任务。因此,只需将其转换为 Task 并让框架进行绑定:

    using System.Reactive.Threading.Tasks;
    ...
    var task = myObservable.ToTask(cancellationToken);
    

    这将创建一个内部订阅者,该订阅者将在任务取消时被释放。在大多数情况下,这将起到作用,因为大多数可观察对象仅在有订阅者时才产生值。

    现在,如果您有一个实际的 observable 出于某种原因需要处置(如果父任务被取消,可能是一个不再重要的 hot observable),这可以通过 continuation 来实现:

    disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
        if (t.Status == TaskStatus.Canceled)
            disposableObservable.Dispose();
    });
    

    【讨论】:

      【解决方案2】:

      如果您使用的是 GenerateWithTime(现在替换为 Generate 传入时间跨度函数重载),您可以替换第二个参数来评估取消令牌的状态,如下所示:

      var pulses = Observable.Generate(0,
          i => !ts.IsCancellationRequested,
          i => i + 1,
          i => i,
          i => TimeSpan.FromMilliseconds(500));
      

      或者,如果可以将导致设置取消令牌的事件本身转换为可观察对象,则可以使用以下内容:

      pulses.TakeUntil(CancelRequested);
      

      我在http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable也发布了更详细的解释。

      【讨论】:

        【解决方案3】:

        您可以使用以下 sn-p 将您的 IObservable 订阅与 CancellationTokenSource 连接起来

        var pulses = Observable.GenerateWithTime(0,
            i => true, i => i + 1, i => i,
            i => TimeSpan.FromMilliseconds(500));
        
        // Get your CancellationTokenSource
        CancellationTokenSource ts = ...
        
        // Subscribe
        ts.Token.Register(pulses.Subscribe(...).Dispose);
        

        【讨论】:

        • 您的答案和吉姆的答案都是实现此目的的有效方法。你的可以应用于各种可观察的,但在使用 Generate 方法之一时,Jim 的更简洁一些。
        【解决方案4】:

        这里有两个方便的运算符来取消可观察序列。它们之间的区别在于取消时会发生什么。 TakeUntil 导致序列正常完成 (OnCompleted),而 WithCancellation 导致异常终止 (OnError)。

        /// <summary>Returns the elements from the source observable sequence until the
        /// CancellationToken is canceled.</summary>
        public static IObservable<TSource> TakeUntil<TSource>(
            this IObservable<TSource> source, CancellationToken cancellationToken)
        {
            return source
                .TakeUntil(Observable.Create<Unit>(observer =>
                    cancellationToken.Register(() => observer.OnNext(default))));
        }
        
        /// <summary>Ties a CancellationToken to an observable sequence. In case of
        /// cancellation propagates an OperationCanceledException to the observer.</summary>
        public static IObservable<TSource> WithCancellation<TSource>(
            this IObservable<TSource> source, CancellationToken cancellationToken)
        {
            return source
                .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
                    o.OnError(new OperationCanceledException(cancellationToken)))));
        }
        

        使用示例:

        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        
        var pulses = Observable
            .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
            .WithCancellation(cts.Token);
        

        注意:在取消的情况下,上面显示的自定义操作符会立即从底层 observable 取消订阅。如果 observable 包含副作用,则需要考虑这一点。将 TakeUntil(cts.Token) 放在执行副作用的操作符之前将推迟整个 observable 的完成,直到副作用完成 (graceful termination)。将其放在副作用之后将使取消立即生效,从而可能导致任何正在运行的代码继续以一种即发即弃的方式继续运行而不被观察。

        【讨论】:

          【解决方案5】:

          您从订阅中得到一个 IDisposable 实例。请致电Dispose()

          【讨论】:

          • 我知道,但是我需要一些其他进程来轮询取消状态并在取消发生时处理订阅。我一直在寻找更自动的东西,以某种方式将我的取消令牌源链接到我的 observable。
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-07-16
          • 1970-01-01
          • 1970-01-01
          • 2019-07-13
          • 1970-01-01
          相关资源
          最近更新 更多