【问题标题】:Handling errors in an observable sequence using Rx使用 Rx 处理可观察序列中的错误
【发布时间】:2011-05-19 01:19:42
【问题描述】:

如果发生错误,有没有办法让一个可观察的序列用序列中的下一个元素恢复执行? 从this post 看来,您需要在 Catch() 中指定一个新的可观察序列才能恢复执行,但是如果您只需要继续处理序列中的下一个元素怎么办?有没有办法做到这一点?

更新: 场景如下: 我有一堆需要处理的元素。该处理由一系列步骤组成。我有 将步骤分解为我想要编写的任务。 我遵循了 here 发布的 ToObservable() 指南 将任务转换为可观察对象以进行组合。 所以基本上我正在做这样的事情 -

foreach(element in collection)
{
   var result = from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;
   result.subscribe( register on next and error handlers here)
 }

或者我可以这样:

var result = 
        from element in collection.ToObservable() 
        from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;

这里继续处理其他元素的最佳方式是什么,即使我们说处理 其中一个元素引发异常。我希望能够记录错误并继续前进。

【问题讨论】:

    标签: system.reactive reactive-programming


    【解决方案1】:

    James 和 Richard 都提出了一些很好的观点,但我认为他们没有为您提供解决问题的最佳方法。

    James 建议使用.Catch(Observable.Never<Unit>())。当他说“将......允许流继续”时他错了,因为一旦你遇到异常,流就必须结束——这就是 Richard 在提到观察者和可观察者之间的合同时指出的。

    此外,以这种方式使用 Never 会导致您的 observables 永远无法完成。

    简短的回答是.Catch(Observable.Empty<Unit>()) 是将序列从以错误结尾的序列更改为以完成结尾的序列的正确方法。

    您已经想到了使用SelectMany 处理源集合的每个值以便捕获每个异常的正确想法,但是您遇到了几个问题。

    您使用任务 (TPL) 只是为了将函数调用转换为可观察对象。这会强制您的 observable 使用任务池线程,这意味着 SelectMany 语句可能会以不确定的顺序生成值。

    您还隐藏了处理数据的实际调用,使重构和维护更加困难。

    我认为您最好创建一个允许跳过异常的扩展方法。这里是:

    public static IObservable<R> SelectAndSkipOnException<T, R>(
        this IObservable<T> source, Func<T, R> selector)
    {
        return
            source
                .Select(t =>
                    Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
                .Merge();
    }
    

    使用此方法,您现在可以简单地执行此操作:

    var result =
        collection.ToObservable()
            .SelectAndSkipOnException(t =>
            {
                var a = DoA(t);
                var b = DoB(a);
                var c = DoC(b);
                return c;
            });
    

    这段代码要简单得多,但它隐藏了异常。如果你想在让你的序列继续的同时保持异常,那么你需要做一些额外的时髦。向Materialize 扩展方法添加几个重载可以避免错误。

    public static IObservable<Notification<R>> Materialize<T, R>(
        this IObservable<T> source, Func<T, R> selector)
    {
        return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
    }
    
    public static IObservable<Notification<R>> Materialize<T, R>(
        this IObservable<Notification<T>> source, Func<T, R> selector)
    {
        Func<Notification<T>, Notification<R>> f = nt =>
        {
            if (nt.Kind == NotificationKind.OnNext)
            {
                try
                {
                    return Notification.CreateOnNext<R>(selector(nt.Value));
                }
                catch (Exception ex)
                {
                    ex.Data["Value"] = nt.Value;
                    ex.Data["Selector"] = selector;
                    return Notification.CreateOnError<R>(ex);
                }
            }
            else
            {
                if (nt.Kind == NotificationKind.OnError)
                {
                    return Notification.CreateOnError<R>(nt.Exception);
                }
                else
                {
                    return Notification.CreateOnCompleted<R>();
                }
            }
        };
        return source.Select(nt => f(nt));
    }
    

    这些方法允许你这样写:

    var result =
        collection
            .ToObservable()
            .Materialize(t =>
            {
                var a = DoA(t);
                var b = DoB(a);
                var c = DoC(b);
                return c;
            })
            .Do(nt =>
            {
                if (nt.Kind == NotificationKind.OnError)
                {
                    /* Process the error in `nt.Exception` */
                }
            })
            .Where(nt => nt.Kind != NotificationKind.OnError)
            .Dematerialize();
    

    您甚至可以链接这些Materialize 方法并使用ex.Data["Value"]ex.Data["Selector"] 来获取抛出错误的值和选择器函数。

    我希望这会有所帮助。

    【讨论】:

    • 我在尝试使用可观察对象的可观察对象时遇到了类似的问题。当一个内部的 observable 抛出一个 OnError 时,外部的 observable 查看它也会随之移动到 OnError —— 导致一切都关闭。我已经尝试过捕获异常并抛出 OnCompleted 的解决方案,但这会产生与 OnCompleted 和 OnError 完全相同的行为都会导致订阅关闭
    【解决方案2】:

    IObservableIObserver 之间的合同是OnNext*(OnCompelted|OnError)?,所有运营商都遵守这一协议,即使不是由来源提供。

    您唯一的选择是使用Retry 重新订阅源,但如果源为每个描述返回IObservable 实例,您将看不到任何新值。 p>

    您能否提供有关您的场景的更多信息?也许还有另一种看待它的方式。

    编辑:根据您更新的反馈,听起来您只需要Catch

    var result = 
        from element in collection.ToObservable() 
        from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>())
        from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
        from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>())
        select cResult;
    

    这会将错误替换为不会触发下一个序列的Empty(因为它在后台使用SelectMany

    【讨论】:

    • 我已经更新了帖子以包含我想要完成的场景
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-12
    • 2017-04-06
    • 1970-01-01
    • 1970-01-01
    • 2015-07-03
    • 1970-01-01
    相关资源
    最近更新 更多