【问题标题】:Data structure to use when gathering response from async calls从异步调用收集响应时使用的数据结构
【发布时间】:2019-04-20 14:24:32
【问题描述】:

我正在我的应用程序中运行这段代码。

public Task<BulkResponse<JObject>> GetRelatedObjectsAsync(IEnumerable<PrimaryObjectInfo> primaryObjectInfos)
{
    var allSecondaries = new List<Tuple<int, List<JObject>>>();
    var exceptionsDict = new ConcurrentDictionary<int, Exception>();

    var relatedObjectsTasks = primaryObjectInfos.Select(async primaryObjectInfo =>
    {
        try
        {
            var secondaryObject = await objectManager.GetRelatedObjectsAsync(primaryObjectInfo);
            allSecondaries.Add(Tuple.Create(primaryObjectInfo.Index, secondaryObject.ToList()));
        }
        catch (Exception ex)
        {
            exceptionsDict.TryAdd(primaryObjectInfo.Index,  ex);
        }
    });

    await Task.WhenAll(relatedObjectsTasks);

    return ConvertToBulkResponse(allSecondaries, exceptionsDict);
}

当我运行此代码时,allSecondaries 对象有时会返回有效的结果列表,有时代码最终会捕获我为每个 primaryObjectInfo 拥有的并行线程的异常。

异步方法objectManager.GetRelatedObjectsAsync()内部调用了4-5个异步函数,有些函数是通过引用传递参数的。 (参考关键字)

问题: 我是否使用正确的数据结构来整合所有并行线程的结果? 如果是,我每次得到不同结果的原因可能是什么?

【问题讨论】:

  • List&lt;T&gt; 不是线程安全的,不能在可以同时修改的情况下使用它。请改用 System.Collections.Concurrent 命名空间中的一种集合类型。或者,您可以让每个任务返回自己的元组,并在任务完成后从各自的Task&lt;T&gt; 对象中收集每个元组(基本上避免在任务中完全接触allSecondaries

标签: c# asynchronous parallel-processing async-await task


【解决方案1】:

您最好将执行与结果收集分开:

public Task<BulkResponse<JObject>> GetRelatedObjectsAsync(
    IEnumerable<PrimaryObjectInfo> primaryObjectInfos)
{
    var relatedObjectsTasks = primaryObjectInfos
        .Select(
            async primaryObjectInfo =>
                (primaryObjectInfo.Index,
                 await objectManager.GetRelatedObjectsAsync(primaryObjectInfo)))
        .ToList();

    try
    {
        await Task.WhenAll(relatedObjectsTasks);
    }
    catch
    {
        // observe each task's, exception
    }

    var allSecondaries = new List<(int index, List<JObject> related)>();
    var exceptionsDict = new Dictionary<int, Exception>();

    foreach (var relatedObjectsTask in relatedObjectsTasks)
    {
        try
        {
            allSecondaries.Add(relatedObjectsTask.Result);
        }
        catch (Exception ex)
        {
            exceptionsDict.Add(relatedObjectsTask.index,  ex);
        }
    }

    return ConvertToBulkResponse(allSecondaries, exceptionsDict);
}

您可以查看每个任务的 IsFaulted/ExceptionIsCancelled 属性,而不是引发异常:

    foreach (var relatedObjectsTask in relatedObjectsTasks)
    {
        if (relatedObjectsTask.IsCancelled)
        {
            exceptionsDict.Add(
                relatedObjectsTask.index,
                new TaskCancelledException(relatedObjectsTask));
        }
        else if (relatedObjectsTask.IsFaulted)
        {
            exceptionsDict.TryAdd(relatedObjectsTask.index,  ex);
        }
        else
        {
            allSecondaries.Add(relatedObjectsTask.Result);
        }
    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-20
    • 1970-01-01
    • 2022-01-16
    • 2022-01-16
    • 1970-01-01
    相关资源
    最近更新 更多