【发布时间】:2019-01-15 19:24:00
【问题描述】:
我想将跨所有 DataFlow 块提交到数据库服务器的查询总数限制为 30。在以下场景中,每个块限制 30 个并发任务,因此它在执行期间始终达到 60 个并发任务。显然,我可以将我的并行度限制为每个块 15 个,以实现系统范围内总共 30 个,但这不是最优的。
我该如何进行这项工作?我是否使用 SemaphoreSlim 等来限制(并阻止)我的等待,或者是否有更有效的内在 DataFlow 方法?
public class TPLTest
{
private long AsyncCount = 0;
private long MaxAsyncCount = 0;
private long TaskId = 0;
private object MetricsLock = new object();
public async Task Start()
{
ExecutionDataflowBlockOptions execOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
DataflowLinkOptions linkOption = new DataflowLinkOptions() { PropagateCompletion = true };
var doFirstIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doCPUWork = new TransformBlock<Data, Data>(data => DoCPUBoundWork(data));
var doSecondIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doProcess = new TransformBlock<Data, string>(i => $"Task finished, ID = : {i.TaskId}");
var doPrint = new ActionBlock<string>(s => Debug.WriteLine(s));
doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
doProcess.LinkTo(doPrint, linkOption);
int taskCount = 150;
for (int i = 0; i < taskCount; i++)
{
await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
}
doFirstIOWorkAsync.Complete();
await doPrint.Completion;
Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
}
private async Task<Data> DoIOBoundWorkAsync(Data data)
{
lock(MetricsLock)
{
AsyncCount++;
if (AsyncCount > MaxAsyncCount)
MaxAsyncCount = AsyncCount;
}
if (data.TaskId <= 0)
data.TaskId = Interlocked.Increment(ref TaskId);
await Task.Delay(data.Delay);
lock (MetricsLock)
AsyncCount--;
return data;
}
private Data DoCPUBoundWork(Data data)
{
data.Step = 1;
return data;
}
}
数据类:
public class Data
{
public int Delay { get; set; }
public long TaskId { get; set; }
public int Step { get; set; }
}
起点:
TPLTest tpl = new TPLTest();
await tpl.Start();
【问题讨论】:
-
使用信号量可能会更好,因为它允许您锁定只是每个查询的执行,而不是锁定该块中涉及的任何其余工作.创建一个用于运行查询的抽象(或更改现有的,如果有的话),以便抽象在查询时取出信号量,这样您就不必每次都手动执行。
-
我同意@Servy。我推荐
SemaphoreSlim来限制对数据库的访问。通过这种方式,您将对数据库的访问方式进行单点控制,而不是依赖于 DataFlow 中的细节。 -
与其尝试限制所有块,不如使用 one 块写入数据库?并行 database 访问并不比并行 disk 访问好 - 您仍然写入相同的存储,因此 更多 连接会导致 更糟 i> 性能。规范的方法(即在 SSIS 和所有 ETL 工具中使用)是将数据批处理成足够大的块,然后使用任何可用的批量导入功能将其发送到数据库,例如
SqlBulkCopy -
你的区块在做什么,为什么他们使用 30 的 DOP?它很重要。与其让 30 个任务尝试在 30 个连接上插入单行,不如将记录发送到批处理块,然后将批处理发送到带有 SqlBulkCopy 实例的 ActionBlock,该实例使用最少的日志记录和 no 将它们泵入目标表 交叉连接阻塞。像这样的单个任务很容易最终快 30 倍
-
如果任何块执行查找,最好预加载和缓存查找值,就像 SSIS 一样。如果块执行多个活动,请将它们分开以允许每个块一次执行一项工作而不会阻塞太久。
标签: c# concurrency async-await task-parallel-library tpl-dataflow