【发布时间】:2015-07-30 23:45:57
【问题描述】:
我有以下方法:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null);
}
它实现了以下逻辑:
- 通过从命名空间管理器 (
GetNamespaceConnectionInfoSource) 获取IObservable<NamespaceConnectionInfo>进入 monad。 - 随着命名空间变得可用,获取与特定命名空间 (
GetPolicySourceForNamespace) 对应的IObservable<DataManagementPolicy>。但是,请使用Merge运算符将并发调用数限制为GetPolicySourceForNamespace。 - 过滤掉不良的
DataManagementPolicy记录(不能在SQL 中完成)。 - 将看似不错的
DataManagementPolicy记录转换为DataManagementWorkItem实例。有些可能会变成null,所以最后会被过滤掉。
GetNamespaceConnectionInfoSource 在产生一定数量的有效NamespaceConnectionInfo 对象后会出错。到那时,在最终的可观察序列中,完全有可能已经产生了一定数量的 DataManagementWorkItem 对象。
我有一个单元测试,其中:
-
GetNamespaceConnectionInfoSource在生成 25 个命名空间后抛出 -
GetPolicySourceForNamespace每个命名空间产生 10 个对象 - 并发限制为 10
我也有兴趣检查最终 observable 在出现故障之前产生的项目:
var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
AssertTheRightException(exc);
}
workItems 集合每次都有不同数量的项目。一次运行它有 69 个项目,另一个 - 50 个,另一个 - 18 个。
我的解释是,当故障发生时,在处理的各个阶段都有好的NamespaceConnectionInfo 和DataManagementPolicy 对象,所有这些都因为故障而中止。每次的数量都不一样,因为物品是异步产生的。
这就是我的问题 - 我不希望他们被中止。我希望它们运行完成,以最终的可观察序列产生,然后才传达故障。本质上,我想保留异常并在最后重新抛出它。
我尝试稍微修改一下实现:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
Exception fault = null;
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
fault = exc;
return Observable.Empty<NamespaceConnectionInfo>();
})
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Finally(() =>
{
if (fault != null)
{
throw fault;
}
});
}
不用说 - 它没有用。 Finally 似乎没有传播任何异常,我实际上同意。
那么,实现我想要的正确方法是什么?
编辑
与问题无关,我发现我用来收集生成的DataManagementWorkItem 实例的测试代码不好。而不是
var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
应该是
await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));
区别在于后者只订阅了一次项目的来源,而原始版本订阅了两次:
- by
Subscribe -
await
它不会影响问题,但会破坏我的模拟代码。
澄清
这更像是一个澄清。每个命名空间产生 10 个策略对象的序列。但是这个过程是异步的——策略对象是按顺序生成的,但是是异步的。在此期间,命名空间会继续生成,因此在故障发生前给出了 25 个命名空间,生成的命名空间可能处于三种可能的“状态”:
- 尚未为其生成任何策略对象,但已启动异步策略生成过程
- 已经生成了一些(但少于 10 个)策略对象
- 命名空间的所有 10 个策略对象都已生成
当命名空间生产中发生错误时,整个管道将被中止,无论“好”命名空间现在处于何种“状态”。
让我们看看下面这个简单的例子:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace observables
{
class Program
{
static void Main()
{
int count = 0;
var obs = Observable
.Interval(TimeSpan.FromMilliseconds(1))
.Take(50)
.Select(i =>
{
if (25 == Interlocked.Increment(ref count))
{
throw new Exception("Boom!");
}
return i;
})
.Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
.Merge(10);
var items = new HashSet<long>();
try
{
obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
}
catch (Exception exc)
{
Debug.WriteLine(exc.Message);
}
Debug.WriteLine(items.Count);
}
}
}
当我运行它时,我通常有以下输出:
Boom!
192
但是,它也可以显示191。但是,如果我们应用故障连接解决方案(即使在没有故障时它不起作用):
int count = 0;
var fault = new Subject<long>();
var obs = Observable
.Interval(TimeSpan.FromMilliseconds(1))
.Take(50)
.Select(i =>
{
if (25 == Interlocked.Increment(ref count))
{
throw new Exception("Boom!");
}
return i;
})
.Catch<long, Exception>(exc =>
{
fault.OnError(exc);
return Observable.Empty<long>();
})
.Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
.Merge(10)
.Concat(fault);
那么输出始终是240,因为我们让所有已经启动的异步进程都完成了。
基于 pmccloghrylaing 回答的尴尬解决方案
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
var fault = new Subject<DataManagementWorkItem>();
bool faulted = false;
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
faulted = true;
return Observable.Throw<NamespaceConnectionInfo>(exc);
})
.Finally(() =>
{
if (!faulted)
{
fault.OnCompleted();
}
})
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
fault.OnError(exc);
return Observable.Empty<NamespaceConnectionInfo>();
})
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Concat(fault);
}
它在命名空间产生错误和成功时都有效,但看起来很尴尬。加上多个订阅仍然共享错误。一定有更优雅的解决方案。
GetNamespaceConnectionInfo源代码
public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
IList<SqlParameter> parameters;
var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}
public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
return Observable.FromAsync(async () =>
{
SqlCommand command = null;
try
{
var conn = await GetConnectionAsync();
command = GetCommand(conn, query, parameters);
return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
}
finally
{
DisposeSilently(command);
}
});
}
public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}
private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
try
{
if (IsPrimitiveDataType(objectType))
{
while (await reader.ReadAsync(ct))
{
obs.OnNext(reader[0]);
}
}
else
{
// Get all the properties in our Object
var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);
// For each property get the data from the reader to the object
while (await reader.ReadAsync(ct))
{
obs.OnNext(typeReflector.DataRecordConstructor == null ?
ReadNextObject(typeReflector, reader) :
typeReflector.DataRecordConstructor(reader));
}
}
}
catch (OperationCanceledException)
{
}
finally
{
if (disposeReader)
{
reader.Dispose();
}
}
}
【问题讨论】:
-
你的第二个实现有一个缺陷,你需要使用
.Create(...)来避免。如果您获得对返回的 observable 的多个订阅,则fault可能已经有一个值,您的代码将错误地报告失败。您需要将方法的内容包装在.Create(...)中以避免这种情况。 -
我认为没有问题。虽然,我在帖子中没有提到,但是命名空间的 observable 是冷的 observable,所以多个订阅者是相互隔离的,看不到彼此的错误。
-
这是因为您可以有多个订阅者返回 observable,他们共享相同的
fault变量。它与冷热无关。 -
哦,那个。现在我明白了。由于我的第二个实现完全错误,我想您的评论适用于 pmccloghrylaing 提出的解决方案,它使用类似的模式。你是对的,再次感谢。
-
您是否希望单元测试有 100 个结果?
标签: c# system.reactive