【发布时间】:2014-12-10 23:10:22
【问题描述】:
这里是涉及的代码:
private static async Task DoRunInOrderAsync<TTaskSeed>(SemaphoreSlim sem, IObservable<TTaskSeed> taskSeedSource, CreateTaskDelegate<TTaskSeed> createTask, OnTaskErrorDelegate<TTaskSeed> onFailed, OnTaskSuccessDelegate<TTaskSeed> onSuccess) where TTaskSeed : class
{
var tasks = await taskSeedSource
.Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
.ToList()
.ToTask();
await Task.WhenAll(tasks);
}
private static async Task GetPendingOrRunningTask<T>(T taskSeed, CreateTaskDelegate<T> createTask, OnTaskErrorDelegate<T> onFailed, OnTaskSuccessDelegate<T> onSuccess,
SemaphoreSlim sem) where T : class
{
Exception exc = null;
await sem.WaitAsync();
try
{
var task = createTask(taskSeed);
if (task != null)
{
await task;
}
onSuccess(task, taskSeed);
}
catch (Exception e)
{
exc = e;
}
sem.Release();
if (exc != null)
{
onFailed(exc, taskSeed);
}
}
地点:
-
Select是IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector)来自System.Reactive.Linq.Observable -
ToList是IObservable<IList<TSource>> ToList<TSource>(this IObservable<TSource> source)来自System.Reactive.Linq.Observable -
ToTask是Task<TResult> ToTask<TResult>(this IObservable<TResult> observable)来自System.Reactive.Threading.Tasks.TaskObservableExtensions - System.Reactive.Linq 版本为 2.2.5.0
据我所知,一切都已构建,周围没有陈旧的二进制文件。该错误经常发生,但并非总是如此。
对于我的生活,如果GetPendingOrRunningTask 方法是async Task,我无法理解tasks 列表如何包含null?
编辑
所以ToList 注入null。如何?为什么?我做错了什么(除了以编程为生)?
【问题讨论】:
-
你知道 Rx 的序列化合约吗?请参阅Rx Design Guidelines 中的第 4.2 节。信号量不是必需的,因为您的
taskSeedSource不得推送重叠通知。如果是,那么它违反了重要的 Rx 合同,这可能会导致运营商内部出现竞争条件。 -
请注意,如果您想按顺序而不是同时运行任务,请使用
FromAsync将它们投影到可观察对象中并调用Concat而不是ToList。 -
仅当信号量计数最初为 1 时它是顺序的。但它可能是 N,在这种情况下,我想同时处理最多 N 个种子,直到处理完所有种子。名称中的
InOrder有点令人困惑,并且出于历史原因。 -
刚刚阅读了 Rx 的序列化合约。所以,看来我需要使用 Serialize 运算符,对吧?
-
@DaveSexton - 请安排您的评论作为答案,我想感谢您。
标签: c# linq asynchronous async-await system.reactive