看来实际的问题应该是:
如何在 AWS Athena 中更改大量客户端的数据库分区而不按顺序执行它们?
答案不是 ForEachAsync 或 C# 8 中即将推出的 await foreach。异步循环仍会一次向服务发送一个调用,它“只是”不会在等待答案时阻塞。
并发工作人员
这是一个并发工作线程问题,可以使用例如 TPL 数据流库的 ActionBlock 类或新的 System.Threading.Channel 类来处理。
Dataflow 库旨在通过在独立块之间移动数据来创建类似于 shell 脚本管道的事件/消息处理管道。每个块都在自己的任务/线程上运行,这意味着您只需将处理分解为块即可获得并发执行。
还可以增加每个块的处理任务数,by specifying the MaxDegreeOfParallelism option 在创建块时。这使我们能够快速创建可以同时处理大量消息的“工作者”。
示例
在这种情况下,“消息”是Client,不管它是什么。单个 ActionBlock 可以创建 DDL 语句并执行它。每个块都有一个输入队列,这意味着我们可以将消息发布到一个块并等待它使用我们指定的 DOP 执行所有操作。
我们还可以为队列指定一个限制,这样如果工作任务不能足够快地运行,它就不会被淹没:
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = _maxclientsToBeProcessed,
BoundedCapacity = _maxclientsToBeProcessed*3, //Just a guess
});
var block=new ActionBlock<Client>(client=>CreateAndRunDDL(client));
//Post the client requests
foreach(var client in clients)
{
await block.SendAsync(client);
}
//Tell the block we're done
block.Complete();
//Await for all queued messages to finish processing
await block.Completion;
CreateAndRunDDL(Client) 方法应该执行问题循环中的代码执行的操作。一个好主意是重构它,并创建单独的函数来创建和执行查询,例如:
async Task CreateAndRunDDL(Client client)
{
var query = QueryForClient(...);
LambdaLogger.Log(query);
if (query.Length >= MaxQueryLength) {
throw new Exception("Delete partition query length exceeded.");
}
var queryExecutionId = await StartQueryExecution(query);
await CheckQueryExecutionStatus(queryExecutionId);
}
块也可以链接。如果我们想将多个客户端批处理在一起进行处理,我们可以使用BatchBlock 并将其结果提供给我们的操作块,例如:
var batchClients = new BatchBlock<Client>(20);
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var block=new ActionBlock<Client>(clients=>CreateAndRunDDL(clients));
batchClients.LinkTo(block,linkOptions);
这次CreateAndRunDDL 方法接受Client[] 数组,其中包含我们在批处理大小中指定的客户端/消息数量。
async Task CreateAndRunDDL(Client[] clients)
{
var query = QueryForClients(clients);
...
}
现在应该将消息发布到batchClients 块。一旦完成,我们需要等待管道中的最后一个块完成:
foreach(var client in clients)
{
await batchClients.SendAsync(client);
}
//Tell the *batch block* we're done
batchClient.Complete();
//Await for all queued messages to finish processing
await block.Completion;