【问题标题】:How can I see what my reactive extensions query is doing?如何查看我的响应式扩展查询在做什么?
【发布时间】:2019-11-11 14:32:20
【问题描述】:

我正在编写一个包含大量运算符的复杂响应式扩展查询。我怎样才能看到发生了什么?

我正在询问并回答这个问题,因为它出现了一些问题,并且可能具有很好的一般用途。

【问题讨论】:

    标签: c# debugging system.reactive


    【解决方案1】:

    您可以在开发 Rx 运算符时随意将此函数附加到它们以查看发生了什么:

        public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
        {
            opName = opName ?? "IObservable";
            Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);
    
            return Observable.Create<T>(obs =>
            {
                Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                                  opName,
                                  Thread.CurrentThread.ManagedThreadId);
    
                try
                {
                    var subscription = source
                        .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                    opName,
                                                    x,
                                                    Thread.CurrentThread.ManagedThreadId),
                            ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                     opName,
                                                     ex,
                                                     Thread.CurrentThread.ManagedThreadId),
                            () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                     opName,
                                                     Thread.CurrentThread.ManagedThreadId)
                        )
                        .Subscribe(obs);
                    return new CompositeDisposable(
                        subscription,
                        Disposable.Create(() => Console.WriteLine(
                              "{0}: Cleaned up on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId)));
                }
                finally
                {
                    Console.WriteLine("{0}: Subscription completed.", opName);
                }
            });
        }
    

    这是一个示例用法,显示了Range 的细微行为差异:

    Observable.Range(0, 1).Spy("Range").Subscribe();
    

    给出输出:

    Range: Observable obtained on Thread: 7
    Range: Subscribed to on Thread: 7
    Range: Subscription completed.
    Range: OnNext(0) on Thread: 7
    Range: OnCompleted() on Thread: 7
    Range: Cleaned up on Thread: 7
    

    但是这个:

    Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();
    

    给出输出:

    Range: Observable obtained on Thread: 7
    Range: Subscribed to on Thread: 7
    Range: OnNext(0) on Thread: 7
    Range: OnCompleted() on Thread: 7
    Range: Subscription completed.
    Range: Cleaned up on Thread: 7
    

    找出不同之处?

    显然,您可以将其更改为写入日志或调试,或使用预处理器指令对 Release 构建等进行精益传递订阅...

    您可以在整个运营商链中应用Spy。例如:

    Observable.Range(0,3).Spy("Range")
              .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();
    

    给出输出:

    Range: Observable obtained on Thread: 7
    Scan: Observable obtained on Thread: 7
    Scan: Subscribed to on Thread: 7
    Range: Subscribed to on Thread: 7
    Range: Subscription completed.
    Scan: Subscription completed.
    Range: OnNext(1) on Thread: 7
    Scan: OnNext(1) on Thread: 7
    Range: OnNext(2) on Thread: 7
    Scan: OnNext(3) on Thread: 7
    Range: OnCompleted() on Thread: 7
    Scan: OnCompleted() on Thread: 7
    Range: Cleaned up on Thread: 7
    Scan: Cleaned up on Thread: 7
    

    我相信您可以找到丰富的方法来满足您的目的。

    【讨论】:

    • 如果Do(x =&gt; Console.WriteLine(...)) 不够用,这是一个很好的解决方案。
    • 我通过将订阅放在一个字段中然后将其返回包装在一个带有 Console.WriteLine 的 CompositeDisposable 中来“丰富”,这对于记录取消订阅是很遗憾的。不错。
    • 不错!需要注意的一件事是,Create 会在您返回的 IDisposable 上有效地调用 Dispose()即使订阅者没有Dispose 将在订阅者处置或可观察对象终止时调用 - 以最快发生者为准。它这样做是为了让您可以尽快清理资源 - 所以请记住,这可能不是取消订阅的结果。
    • 不错的答案。我还发现订阅和处置(或缺少)是调试 Rx 查询的关键要素。当您有可能假设已经屈服/完成的 selectmany 子句时,这一点尤其重要。这种记录/间谍活动消除了猜测和假设。
    • 再次回来说这种扩展方法仍然经常“挽救我的生命”:)
    【解决方案2】:

    又过了三年,我还在使用你的想法。我的版本现在演变如下:

    • 日志记录目的地选择的重载
    • 订阅日志数
    • 记录来自不良订阅者的“下游”异常。

    代码:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        return Spy(source, opName, Console.WriteLine);
    }
    
    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName, 
                                                                  Action<string> logger)
    {
        opName = opName ?? "IObservable";
        logger($"{opName}: Observable obtained on Thread: {Thread.CurrentThread.ManagedThreadId}");
    
        var count = 0;
        return Observable.Create<T>(obs =>
        {
            logger($"{opName}: Subscribed to on Thread: {Thread.CurrentThread.ManagedThreadId}");
            try
            {
                var subscription = source
                    .Do(x => logger($"{opName}: OnNext({x}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                        ex => logger($"{opName}: OnError({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                        () => logger($"{opName}: OnCompleted() on Thread: {Thread.CurrentThread.ManagedThreadId}")
                    )
                    .Subscribe(t =>
                    {
                        try
                        {
                            obs.OnNext(t);
                        }
                        catch(Exception ex)
                        {
                            logger($"{opName}: Downstream exception ({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}");
                            throw;
                        }
                    }, obs.OnError, obs.OnCompleted);
    
                return new CompositeDisposable(
                        Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) on Thread: {Thread.CurrentThread.ManagedThreadId}")),
                        subscription,
                        Disposable.Create(() => Interlocked.Decrement(ref count)),
                        Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) completed, {count} subscriptions"))
                    );
            }
            finally
            {
                Interlocked.Increment(ref count);
                logger($"{opName}: Subscription completed, {count} subscriptions.");
            }
        });
    }
    

    【讨论】:

    • 很高兴听到它很有帮助,而且装饰精美!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-22
    • 1970-01-01
    • 2019-10-01
    • 2018-11-09
    • 1970-01-01
    相关资源
    最近更新 更多