【问题标题】:Running parallel async tasks and return result in .NET Core Web API在 .NET Core Web API 中运行并行异步任务并返回结果
【发布时间】:2021-06-01 05:57:55
【问题描述】:

您好最近我在 .net core web api 项目中工作,该项目正在从外部 api 下载文件。 在这个 .net 核心 api 最近发现了一些问题,而文件数量超过 100 个。API 正在下载最多 50 个文件并跳过其他文件。 WebAPI 部署在 AWS Lambda 上,超时时间为 15 分钟。

由于下载过程较长,实际操作超时

public async Task<bool> DownloadAttachmentsAsync(List<DownloadAttachment> downloadAttachment)
        {
            try
            {
                bool DownloadFlag = false;

                foreach (DownloadAttachment downloadAttachment in downloadAttachments)
                {
                    DownloadFlag = await DownloadAttachment(downloadAttachment.id);

                    //update the download status in database
                    if(DownloadFlag)
                    {
                      bool UpdateFlag = await _DocumentService.UpdateDownloadStatus(downloadAttachment.id);

                      if (UpdateFlag)
                      {
                        await DeleteAttachment(downloadAttachment.id);
                      }
                   }
                }
                return true;
            }
            catch (Exception ext)
            {
                log.Error(ext, "Error in Saving attachment {attachemntId}",downloadAttachment.id);
                return false;
            }
        }

文件服务代码

public async Task<bool> UpdateAttachmentDownloadStatus(string AttachmentID)
        {
            return await _documentRepository.UpdateAttachmentDownloadStatus(AttachmentID);
        }

和数据库更新代码

public async Task<bool> UpdateAttachmentDownloadStatus(string AttachmentID)
        {
            using (var db = new SqlConnection(_connectionString.Value))
            {
                var Result = 0; bool SuccessFlag = false;
                var parameters = new DynamicParameters();
                parameters.Add("@pm_AttachmentID", AttachmentID);               
                parameters.Add("@pm_Result", Result, System.Data.DbType.Int32, System.Data.ParameterDirection.Output);
                var result = await db.ExecuteAsync("[Loan].[UpdateDownloadStatus]", parameters, commandType: CommandType.StoredProcedure);
                Result = parameters.Get<int>("@pm_Result");
                if (Result > 0) { SuccessFlag = true; }
                return SuccessFlag;
            }
        }

如何将此异步任务移动到并行运行?并得到结果?我尝试了以下代码

var task = Task.Run(() => DownloadAttachment( downloadAttachment.id));
bool result = task.Result; 

这种方法好吗?怎样才能提高性能?如何从每个并行任务中获取结果并根据成功标志更新到 DB 和删除?还是这个错误是由于 AWS 超时造成的?

请帮忙

【问题讨论】:

  • 什么是_DocumentService?它是线程安全的吗?
  • @SvyatoslavDanyliv : 那是更新数据库中文件下载标志的数据库操作部分
  • 因此您不能并行执行此操作。您必须为每个任务创建新的 DbContext。
  • 可以进行 HTTP 调用并异步保存到数据库,这比您尝试的要容易得多。那是async/await 的工作,而不是Task.Run。启动一个异步操作只是为了阻止它的任务是没有意义的。然后阻塞任务本身
  • 如果DownloadAttachment 已经是异步的,那么Task.Run(() =&gt; DownloadAttachment( downloadAttachment.id)).Result 的意义何在?如果这甚至有效,它仍然会启动并阻止相同的操作。但现在,它只返回由DownloadAttachment 返回的相同任务。如果你想启动多个DownloadAttachment 调用,你可以使用例如var tasks=downloadAttachments.Select(att=&gt;DownladAttachment(att.Id));。但这会立即启动所有这些,这可能不是您想要的

标签: c# .net-core aws-lambda task-parallel-library asp.net-core-webapi


【解决方案1】:

如果您将处理单个文件的代码提取到单独的方法中:

private async Task DownloadSingleAttachment(DownloadAttachment attachment)
{
    try
    {
        var download = await DownloadAttachment(downloadAttachment.id);
        if(download)
        {
            var update = await _DocumentService.UpdateDownloadStatus(downloadAttachment.id);
            if (update)
            {
                await DeleteAttachment(downloadAttachment.id);
            }
        }
    }
    catch(....)
    {
    ....
    }
}

public async Task<bool> DownloadAttachmentsAsync(List<DownloadAttachment> downloadAttachment)
{
    try
    {
      foreach (var attachment in downloadAttachments)
      {
          await DownloadSingleAttachment(attachment);
      }
    }
    ....
}

一次开始所有下载很容易,虽然效率不高:

public async Task<bool> DownloadAttachmentsAsync(List<DownloadAttachment> downloadAttachment)
{

    try
    {
        //Start all of them
        var tasks=downloadAttachments.Select(att=>DownloadSingleAttachment(att));
        await Task.WhenAll(tasks);
    }
    ....
}

这不是很有效,因为外部服务像您一样讨厌来自单一来源的大量并发调用,并且几乎可以肯定会施加限制。数据库也不喜欢大量并发调用,因为在所有数据库产品中并发调用会导致以一种或另一种方式阻塞。即使在使用多版本的数据库中,这也会带来开销。

使用 Dataflow 类 - 单块

解决此问题的一种简单方法是使用 .NET 的 Dataflow 类将操作分解为一系列步骤,并使用不同数量的并发任务执行每个步骤。

我们可以将整个操作放在一个块中,但如果更新和删除操作不是线程安全的,这可能会导致问题:

var dlOptions= new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
};

var downloader=new ActionBlock<DownloadAttachment>(async att=>{
    await DownloadSingleAttachment(att);
},dlOptions);

foreach (var attachment in downloadAttachments)
{
    await downloader.SendAsync(attachement.id);
}

downloader.Complete();
await downloader.Completion;

数据流 - 多个步骤

为避免可能的线程问题,其余方法可以转到它们自己的块。它们都可以进入一个同时调用UpdateDeleteActionBlock,或者如果方法与具有不同并发要求的不同服务通信,它们可以进入单独的块。

downloader 块将执行最多 10 个并发下载。默认情况下,每个块一次只使用一个任务。

updaterdeleter 块的默认 DOP=1,这意味着只要它们不尝试同时使用相同的连接,就没有竞争条件的风险。

var downloader=new TransformBlock<string,(string id,bool download)>(
    async id=> {
        var download=await DownloadAttachment(id);
        return (id,download);
},dlOptions);

var updater=new TransformBlock<(string id,bool download),(string id,bool update)>(
    async (id,download)=> {
        if(download)
        {
            var update = await _DocumentService.UpdateDownloadStatus(id);
            return (id,update);
        }
        return (id,false);
});

var deleter=new ActionBlock<(string id,bool update)>(
    async (id,update)=> {
        if(update)
        {
            await DeleteAttachment(id);
        }
});

现在可以将块链接到管道中并使用。设置PropagateCompletion = true 意味着一旦一个块完成处理,它会告诉它所有连接的块也完成:

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
downloader.LinkTo(updater, linkOptions);
updater.LinkTo(deleter,linkOptions);

只要需要,我们就可以将数据泵入 head 块。完成后,我们调用 head 块的 Complete() 方法。当每个块完成处理其数据时,它会将其完成传播到管道中的下一个块。我们需要等待最后一个(尾部)块完成以确保所有附件都已处理:

foreach (var attachment in downloadAttachments)
{
    await downloader.SendAsync(attachement.id);
}

downloader.Complete();
await deleter.Completion;

每个块都有一个输入和(必要时)一个输出缓冲区,这意味着消息的“生产者”和“消费者”不必同步,甚至不必相互认识。 “生产者”需要知道的只是在管道中的哪里可以找到头块。

节流和背压

一种节流方法是通过MaxDegreeOfParallelism 使用固定数量的任务。

还可以对输入缓冲区设置限制,从而在块无法足够快地处理消息时阻止先前的步骤或生产者。这可以简单地通过为一个块设置BoundedCapacity option 来完成:

var dlOptions= new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity=20,
};

var updaterOptions= new ExecutionDataflowBlockOptions
{
    BoundedCapacity=20,
};

...

var downloader=new TransformBlock<...>(...,dlOptions);

var updater=new TransformBlock<...>(...,updaterOptions);

无需其他更改

【讨论】:

    【解决方案2】:

    要运行多个异步操作,您可以执行以下操作:

        public async Task RunMultipleAsync<T>(IEnumerable<T> myList)
        {
            const int myNumberOfConcurrentOperations = 10;
            var mySemaphore = new SemaphoreSlim(myNumberOfConcurrentOperations);
            var tasks = new List<Task>();
            foreach(var myItem in myList)
            {
                await mySemaphore.WaitAsync();
                var task = RunOperation(myItem);
                tasks.Add(task);
                task.ContinueWith(t => mySemaphore.Release());           
            }
    
            await Task.WhenAll(tasks);
        }
    
        private async Task RunOperation<T>(T myItem)
        {
            // Do stuff
        }
    

    将来自DownloadAttachmentsAsync 的代码放在“Do stuff”评论中

    这将使用信号量来限制并发操作的数量,因为由于争用,运行多个并发操作通常是一个坏主意。您需要进行试验以找到适合您的用例的最佳并发操作数。另请注意,为了保持示例简短,省略了错误处理。

    【讨论】:

    • 不喜欢使用primitive ContinueWith 方法。它有两个问题:它使代码对环境TaskScheduler.Current 敏感,并创建了即发即弃的任务。我建议使用this 答案中讨论的急切的.Select 技术创建任务。
    猜你喜欢
    • 2012-09-02
    • 2015-10-06
    • 1970-01-01
    • 1970-01-01
    • 2016-02-23
    • 2017-12-11
    • 1970-01-01
    相关资源
    最近更新 更多