【问题标题】:How to implement ForEachAsync?如何实现 ForEachAsync?
【发布时间】:2019-06-10 08:01:38
【问题描述】:

处理多线程执行

【问题讨论】:

  • 实际问题是什么? Lambdas 或ForEachAsync 似乎与代码无关。 SQL Server 中的表分区不会影响可以同时执行的查询数量。查询并行化,即在运行查询时使用并行处理,在企业版中可用,查询优化器基于统计信息和索引,而不是分区的存在。
  • 另一方面,DDL、存储操作和管理任务通常需要以受控方式执行。如果您想创建一些表,请创建一个包含您需要的所有命令的 SQL 脚本,然后以 single 的形式将其发送到服务器,最好是异步调用。脚本本身应该负责错误处理
  • 从雅典娜获取客户。 GetClients() 方法可以做到这一点。然后我将这些客户端循环到 for 循环中。但是我的过程需要很长时间来执行删除分区的查询。所以如果我使用 foreachAsync 那么多线程执行将被处理
  • Athena 是一个非常常见的项目名称,即使在微软内部也是如此。我记得第一个使用这个名字的项目是 OData。你指的是什么?无论如何,您所要做的就是异步执行而不是多线程。你不需要ForEachAsync,你可以在foreach循环中使用await
  • 你可以像stackoverflow.com/questions/12251874/…一样实现Parallel.ForEach

标签: c# multithreading


【解决方案1】:

看来实际的问题应该是:

如何在 AWS Athena 中更改大量客户端的数据库分区而不按顺序执行它们?

答案不是 ForEachAsync 或 C# 8 中即将推出的 await foreach。异步循环仍会一次向服务发送一个调用,它“只是”不会在等待答案时阻塞。

并发工作人员

这是一个并发工作线程问题,可以使用例如 TPL 数据流库的 ActionBlock 类或新的 System.Threading.Channel 类来处理。

Dataflow 库旨在通过在独立块之间移动数据来创建类似于 shell 脚本管道的事件/消息处理管道。每个块都在自己的任务/线程上运行,这意味着您只需将处理分解为块即可获得并发执行。

还可以增加每个块的处理任务数,by specifying the MaxDegreeOfParallelism option 在创建块时。这使我们能够快速创建可以同时处理大量消息的“工作者”。

示例

在这种情况下,“消息”是Client,不管它是什么。单个 ActionBlock 可以创建 DDL 语句执行它。每个块都有一个输入队列,这意味着我们可以将消息发布到一个块并等待它使用我们指定的 DOP 执行所有操作。

我们还可以为队列指定一个限制,这样如果工作任务不能足够快地运行,它就不会被淹没:

var options=new ExecutionDataflowBlockOptions
     {
        MaxDegreeOfParallelism = _maxclientsToBeProcessed,
        BoundedCapacity = _maxclientsToBeProcessed*3, //Just a guess
     });
var block=new ActionBlock<Client>(client=>CreateAndRunDDL(client));

//Post the client requests
foreach(var client in clients)
{
    await block.SendAsync(client);
}

//Tell the block we're done
block.Complete();
//Await for all queued messages to finish processing
await block.Completion;

CreateAndRunDDL(Client) 方法应该执行问题循环中的代码执行的操作。一个好主意是重构它,并创建单独的函数来创建和执行查询,例如:

async Task CreateAndRunDDL(Client client)
{
    var query = QueryForClient(...);
    LambdaLogger.Log(query);
    if (query.Length >= MaxQueryLength) {
        throw new Exception("Delete partition query length exceeded.");
    }
    var queryExecutionId = await StartQueryExecution(query);
    await CheckQueryExecutionStatus(queryExecutionId);
}

块也可以链接。如果我们想将多个客户端批处理在一起进行处理,我们可以使用BatchBlock 并将其结果提供给我们的操作块,例如:

var batchClients = new BatchBlock<Client>(20);
var linkOptions = new DataflowLinkOptions
                  { 
                      PropagateCompletion = true
                  };

var block=new ActionBlock<Client>(clients=>CreateAndRunDDL(clients));
batchClients.LinkTo(block,linkOptions);

这次CreateAndRunDDL 方法接受Client[] 数组,其中包含我们在批处理大小中指定的客户端/消息数量。

async Task CreateAndRunDDL(Client[] clients)
{
    var query = QueryForClients(clients);
    ...
}

现在应该将消息发布到batchClients 块。一旦完成,我们需要等待管道中的最后一个块完成:

foreach(var client in clients)
{
    await batchClients.SendAsync(client);
}

//Tell the *batch block* we're done
batchClient.Complete();
//Await for all queued messages to finish processing
await block.Completion;

【讨论】:

  • 对如此混乱的问题(以及数据流)的极其详细的答案表示支持
猜你喜欢
  • 2021-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多