【问题标题】:How to hold an exception in an IObservable pipeline and re-throw it at the end?如何在 IObservable 管道中保存异常并在最后重新抛出它?
【发布时间】:2015-07-30 23:45:57
【问题描述】:

我有以下方法:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null);
}

它实现了以下逻辑:

  1. 通过从命名空间管理器 (GetNamespaceConnectionInfoSource) 获取 IObservable&lt;NamespaceConnectionInfo&gt; 进入 monad。
  2. 随着命名空间变得可用,获取与特定命名空间 (GetPolicySourceForNamespace) 对应的IObservable&lt;DataManagementPolicy&gt;。但是,请使用 Merge 运算符将并发调用数限制为 GetPolicySourceForNamespace
  3. 过滤掉不良的DataManagementPolicy 记录(不能在SQL 中完成)。
  4. 将看似不错的DataManagementPolicy 记录转换为DataManagementWorkItem 实例。有些可能会变成null,所以最后会被过滤掉。

GetNamespaceConnectionInfoSource 在产生一定数量的有效NamespaceConnectionInfo 对象后会出错。到那时,在最终的可观察序列中,完全有可能已经产生了一定数量的 DataManagementWorkItem 对象。

我有一个单元测试,其中:

  • GetNamespaceConnectionInfoSource 在生成 25 个命名空间后抛出
  • GetPolicySourceForNamespace 每个命名空间产生 10 个对象
  • 并发限制为 10

我也有兴趣检查最终 observable 在出现故障之前产生的项目:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
}

workItems 集合每次都有不同数量的项目。一次运行它有 69 个项目,另一个 - 50 个,另一个 - 18 个。

我的解释是,当故障发生时,在处理的各个阶段都有好的NamespaceConnectionInfoDataManagementPolicy 对象,所有这些都因为故障而中止。每次的数量都不一样,因为物品是异步产生的。

这就是我的问题 - 我不希望他们被中止。我希望它们运行完成,以最终的可观察序列产生,然后才传达故障。本质上,我想保留异常并在最后重新抛出它。

我尝试稍微修改一下实现:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    Exception fault = null;
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            fault = exc;
            return Observable.Empty<NamespaceConnectionInfo>();
        })
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null)
        .Finally(() =>
        {
            if (fault != null)
            {
                throw fault;
            }
        });
}

不用说 - 它没有用。 Finally 似乎没有传播任何异常,我实际上同意。

那么,实现我想要的正确方法是什么?

编辑

与问题无关,我发现我用来收集生成的DataManagementWorkItem 实例的测试代码不好。而不是

    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;

应该是

    await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));

区别在于后者只订阅了一次项目的来源,而原始版本订阅了两次:

  1. bySubscribe
  2. await

它不会影响问题,但会破坏我的模拟代码。

澄清

这更像是一个澄清。每个命名空间产生 10 个策略对象的序列。但是这个过程是异步的——策略对象是按顺序生成的,但是是异步的。在此期间,命名空间会继续生成,因此在故障发生前给出了 25 个命名空间,生成的命名空间可能处于三种可能的“状态”:

  • 尚未为其生成任何策略对象,但已启动异步策略生成过程
  • 已经生成了一些(但少于 10 个)策略对象
  • 命名空间的所有 10 个策略对象都已生成

当命名空间生产中发生错误时,整个管道将被中止,无论“好”命名空间现在处于何种“状态”。

让我们看看下面这个简单的例子:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace observables
{
    class Program
    {
        static void Main()
        {
            int count = 0;
            var obs = Observable
                .Interval(TimeSpan.FromMilliseconds(1))
                .Take(50)
                .Select(i =>
                {
                    if (25 == Interlocked.Increment(ref count))
                    {
                        throw new Exception("Boom!");
                    }
                    return i;
                })
                .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
                .Merge(10);

            var items = new HashSet<long>();
            try
            {
                obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
            }
            catch (Exception exc)
            {
                Debug.WriteLine(exc.Message);
            }
            Debug.WriteLine(items.Count);
        }
    }
}

当我运行它时,我通常有以下输出:

Boom!
192

但是,它也可以显示191。但是,如果我们应用故障连接解决方​​案(即使在没有故障时它不起作用):

        int count = 0;
        var fault = new Subject<long>();
        var obs = Observable
            .Interval(TimeSpan.FromMilliseconds(1))
            .Take(50)
            .Select(i =>
            {
                if (25 == Interlocked.Increment(ref count))
                {
                    throw new Exception("Boom!");
                }
                return i;
            })
            .Catch<long, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<long>();
            })
            .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
            .Merge(10)
            .Concat(fault);

那么输出始终是240,因为我们让所有已经启动的异步进程都完成了。

基于 pmccloghrylaing 回答的尴尬解决方案

    public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
    {
        var fault = new Subject<DataManagementWorkItem>();
        bool faulted = false;
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faulted = true;
                return Observable.Throw<NamespaceConnectionInfo>(exc);
            })
            .Finally(() =>
            {
                if (!faulted)
                {
                    fault.OnCompleted();
                }
            })
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(fault);
    }

它在命名空间产生错误和成功时都有效,但看起来很尴尬。加上多个订阅仍然共享错误。一定有更优雅的解决方案。

GetNamespaceConnectionInfo源代码

public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
    bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
    IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
    IList<SqlParameter> parameters;
    var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
        isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
    var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
    return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}

public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return Observable.FromAsync(async () =>
    {
        SqlCommand command = null;
        try
        {
            var conn = await GetConnectionAsync();
            command = GetCommand(conn, query, parameters);
            return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
        }
        finally
        {
            DisposeSilently(command);
        }
    });
}

public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
    return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}

private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
    try
    {
        if (IsPrimitiveDataType(objectType))
        {
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(reader[0]);
            }
        }
        else
        {
            // Get all the properties in our Object
            var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);

            // For each property get the data from the reader to the object
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(typeReflector.DataRecordConstructor == null ?
                    ReadNextObject(typeReflector, reader) :
                    typeReflector.DataRecordConstructor(reader));
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
    finally
    {
        if (disposeReader)
        {
            reader.Dispose();
        }
    }
}

【问题讨论】:

  • 你的第二个实现有一个缺陷,你需要使用.Create(...) 来避免。如果您获得对返回的 observable 的多个订阅,则 fault 可能已经有一个值,您的代码将错误地报告失败。您需要将方法的内容包装在 .Create(...) 中以避免这种情况。
  • 我认为没有问题。虽然,我在帖子中没有提到,但是命名空间的 observable 是冷的 observable,所以多个订阅者是相互隔离的,看不到彼此的错误。
  • 这是因为您可以有多个订阅者返回 observable,他们共享相同的 fault 变量。它与冷热无关。
  • 哦,那个。现在我明白了。由于我的第二个实现完全错误,我想您的评论适用于 pmccloghrylaing 提出的解决方案,它使用类似的模式。你是对的,再次感谢。
  • 您是否希望单元测试有 100 个结果?

标签: c# system.reactive


【解决方案1】:

m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false) 的调用返回IObservable&lt;NamespaceConnectionInfo&gt;。现在,任何单个 observable 的合约都是这样的:

OnNext*(OnError|OnCompleted)

这意味着您得到零个或多个值,后跟一个,并且只有一个,错误或完成。

你不能从单个 observable 中得到多个错误,并且在你得到一个错误之后你不能得到值。

如果您的 observable 确实返回了多个错误,则它违反了正常的 Rx 合同。

因此,鉴于此,鉴于现有代码,您不可能将错误延迟到可观察对象的末尾,因为错误可观察对象的末尾。

您可以做的是更改在GetNamespaceConnectionInfoSource 中生成值的方式,以便在将它们合并为一个之前生成多个序列调用.Materialize()。这意味着您将有一个 IObservable&lt;Notification&lt;NamespaceConnectionInfo&gt;&gt; 并且在整个流中可能有多个错误和完成。然后,您可以在处理错误之前对该流进行分组并处理这些值。但这一切都取决于对GetNamespaceConnectionInfoSource 的更改,并且由于您尚未发布源代码,因此我无法为您提供正确的代码。

为了帮助理解这一点,请查看以下代码:

var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();

xs
    .Select(x =>
    {
        if (x == 0)
            throw new NotSupportedException();
        else
            return x;
    })
    .Subscribe(
        x => Console.WriteLine(x),
        ex => Console.WriteLine(ex.ToString()));

它产生这个:

1
2
3
System.NotSupportedException: Specified method is not supported.
   at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
   at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)

45 根本不会产生。

现在看看这段代码:

xs
    .Select(x =>
        Observable
            .Start(() =>
            {
                if (x == 0)
                    throw new NotSupportedException();
                else
                    return x;
            })
            .Materialize())
    .Merge()
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Subscribe(
        x => Console.WriteLine(String.Format(
            "{0} {1}",
            x.Kind,
            x.HasValue ? x.Value.ToString() : "")),
        ex => Console.WriteLine(ex.ToString()));

这会产生以下内容:

OnNext 1
OnNext 4
OnError 
OnError 
OnNext 5
OnNext 3
OnNext 2

由于引入了并行性而出现故障。

但是现在你可以处理所有的错误了。

【讨论】:

  • GetNamespaceConnectionInfoSource 产生单个错误。这里没有违约。而且我不明白为什么我不能将错误延迟到最后。 pmccloghrylaing 清楚地表明这是可能的(解决方案不完整,但方向很有希望)。
  • @mark - 但是当我产生一个错误时 必须在末尾​​b> - 这就是合同。所以没有“错误”可以延迟到最后。 最后已经有一个错误了。 pmccloghrylaing 只是简单地将 错误放在末尾​​b> 并将其 附加到末尾 - 换句话说,他的解决方案根本没有做任何有用的事情。如果我在这里误解了什么,你能解释一下吗?
  • 我已经编辑了我的帖子 - 添加了说明。现在更有意义了吗?
【解决方案2】:

Concat 会解决您的问题吗?我已经用Finally 将它包裹在Observable.Create 中以完成faults 主题。

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return Observable.Create<DataManagementWorkItem>((observer) =>
    {
        var faults = new Subject<DataManagementWorkItem>();
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faults.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Take(maxConcurrentCalls)
            .Select(nci => GetPolicySourceForNamespace(nci))
            .Merge()
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Finally(() => faults.OnCompleted())
            .Concat(faults)
            .Subscribe(observer);
    });
}

另外,这会返回您期望的结果吗? (你的测试中有 24 个)

m_namespaceManager
    .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
    .Catch<NamespaceConnectionInfo, Exception>(exc =>
    {
        faults.OnError(exc);
        return Observable.Empty<NamespaceConnectionInfo>();
    })
    .Count()

【讨论】:

  • 感谢您的回答。然而,所提出的解决方案有一个问题。当没有故障时,faults 永远不会完成,因此生成的序列永远不会完成。
  • Enigmativity 发现的另一个问题是多个订阅者将共享同一个 faults 主题。因此,如果一个用户出现故障,那么其他用户可能会受到影响,即使他们没有任何错误。
  • 这根本不起作用。当 GetNamespaceConnectionInfoSource 出现错误时,它会停止生成值 - .Catch 将错误转换为 OnComplete(因为它用空的 obversable 替换错误)。所以源代码已经完成,然后.Concat 出现并在可观察对象的末尾弹出异常 - 就在它之前的位置。这根本不允许原始源运行超过异常。
  • 这看起来很奇怪。在所有合并的 observables 完成之前,不应添加故障。
  • @pmccloghrylaing - 没错,但错误来自GetNamespaceConnectionInfoSource 而不是GetPolicySourceForNamespace,因此它对最终结果没有影响。在这个查询中,涉及GetPolicySourceForNamespace 的选择只是对潜在问题的干扰。
【解决方案3】:

是的,基本问题是Merge 有一个快速失败的实现。如果源 observable 产生错误,或者任何内部 observable 产生错误,则Merge 会导致流失败,而无需等待剩余的内部 observable 完成。

为了实现你想要的,你需要在合并看到它之前“捕获”错误,并在内部 observables 完成后“重新抛出”它:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    // wrap within Observable.Defer
    // so that each new subscription
    // gets its own Error subject
    return Observable.Defer(() =>
    {
        var error = new ReplaySubject<DataManagementWorkItem>(1);

        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch(err =>
            {
                error.OnError(err);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Finally(error.OnCompleted)
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(error);
    });
}

另外,我注意到您的单元测试将 两次 订阅返回的 observable,这增加了您的困惑。一次调用Subscribe 来填充您的列表,然后再次调用await。你真的只想订阅一次。我们可以使用.Do 运算符来填充您的列表,您应该能够在错误处理程序中检查它:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10).Do(workItems.Add);
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
    // workItems should be populated.
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-05
    • 2020-02-13
    • 2011-04-13
    • 2012-06-29
    • 1970-01-01
    • 2016-08-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多