【问题标题】:How to span MaxDegreeOfParallelism across multiple TPL DataFlow blocks?如何跨多个 TPL DataFlow 块跨越 MaxDegreeOfParallelism?
【发布时间】:2019-01-15 19:24:00
【问题描述】:

我想将跨所有 DataFlow 块提交到数据库服务器的查询总数限制为 30。在以下场景中,每个块限制 30 个并发任务,因此它在执行期间始终达到 60 个并发任务。显然,我可以将我的并行度限制为每个块 15 个,以实现系统范围内总共 30 个,但这不是最优的。

我该如何进行这项工作?我是否使用 SemaphoreSlim 等来限制(并阻止)我的等待,或者是否有更有效的内在 DataFlow 方法?

public class TPLTest
{
    private long AsyncCount = 0;
    private long MaxAsyncCount = 0;
    private long TaskId = 0;
    private object MetricsLock = new object();

    public async Task Start()
    {
        ExecutionDataflowBlockOptions execOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
        DataflowLinkOptions linkOption = new DataflowLinkOptions() { PropagateCompletion = true };

        var doFirstIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
        var doCPUWork = new TransformBlock<Data, Data>(data => DoCPUBoundWork(data));
        var doSecondIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
        var doProcess = new TransformBlock<Data, string>(i => $"Task finished, ID = : {i.TaskId}");
        var doPrint = new ActionBlock<string>(s => Debug.WriteLine(s));

        doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
        doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
        doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
        doProcess.LinkTo(doPrint, linkOption);

        int taskCount = 150;
        for (int i = 0; i < taskCount; i++)
        {
            await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
        }
        doFirstIOWorkAsync.Complete();

        await doPrint.Completion;
        Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
    }

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        lock(MetricsLock)
        {
            AsyncCount++;
            if (AsyncCount > MaxAsyncCount)
                MaxAsyncCount = AsyncCount;
        }

        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        await Task.Delay(data.Delay);

        lock (MetricsLock)
            AsyncCount--;

        return data;
    }

    private Data DoCPUBoundWork(Data data)
    {
        data.Step = 1;
        return data;
    }
}

数据类:

public class Data
{
    public int Delay { get; set; }
    public long TaskId { get; set; }
    public int Step { get; set; }
}

起点:

TPLTest tpl = new TPLTest();
await tpl.Start();

【问题讨论】:

  • 使用信号量可能会更好,因为它允许您锁定只是每个查询的执行,而不是锁定该块中涉及的任何其余工作.创建一个用于运行查询的抽象(或更改现有的,如果有的话),以便抽象在查询时取出信号量,这样您就不必每次都手动执行。
  • 我同意@Servy。我推荐SemaphoreSlim 来限制对数据库的访问。通过这种方式,您将对数据库的访问方式进行单点控制,而不是依赖于 DataFlow 中的细节。
  • 与其尝试限制所有块,不如使用 one 块写入数据库?并行 database 访问并不比并行 disk 访问好 - 您仍然写入相同的存储,因此 更多 连接会导致 更糟 i> 性能。规范的方法(即在 SSIS 和所有 ETL 工具中使用)是将数据批处理成足够大的块,然后使用任何可用的批量导入功能将其发送到数据库,例如 SqlBulkCopy
  • 你的区块在做什么,为什么他们使用 30 的 DOP?它很重要。与其让 30 个任务尝试在 30 个连接上插入单行,不如将记录发送到批处理块,然后将批处理发送到带有 SqlBulkCopy 实例的 ActionBlock,该实例使用最少的日志记录和 no 将它们泵入目标表 交叉连接阻塞。像这样的单个任务很容易最终快 30 倍
  • 如果任何块执行查找,最好预加载和缓存查找值,就像 SSIS 一样。如果块执行多个活动,请将它们分开以允许每个块一次执行一项工作而不会阻塞太久。

标签: c# concurrency async-await task-parallel-library tpl-dataflow


【解决方案1】:

为什么不将所有内容编组到具有实际限制的操作块中?

var count = 0;
var ab1 = new TransformBlock<int, string>(l => $"1:{l}");
var ab2 = new TransformBlock<int, string>(l => $"2:{l}");
var doPrint = new ActionBlock<string>(
    async s =>
    {
        var c = Interlocked.Increment(ref count);
        Console.WriteLine($"{c}:{s}");
        await Task.Delay(5);
        Interlocked.Decrement(ref count);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 15 });

ab1.LinkTo(doPrint);
ab2.LinkTo(doPrint);

for (var i = 100; i > 0; i--)
{
    if (i % 3 == 0) await ab1.SendAsync(i);
    if (i % 5 == 0) await ab2.SendAsync(i);
}

ab1.Complete();
ab2.Complete();

await ab1.Completion;
await ab2.Completion;

【讨论】:

  • 谁反对这个,为什么?这实际上是处理 ETL 的正确方式。 30 个阻塞连接,即使使用信号量,也远比 1 阻塞尽可能快地抽取数据
  • 我提供了一个弱例子。我的 I/O 块将前一个 I/O 块的输出作为输入,并进行一些次要处理。这些块实际上并不相同。有些执行查找,有些检索数据集,有些执行存储的过程。我能够使用这种方法,使用返回 DataSet 但不是针对所有 I/O 块类型的通用块。
  • @NPCampbell,这应该不是问题。 doPrint 块甚至可能除了限制同时调用的数量之外什么都不做。这也是一个显示解决方案路径的弱示例。
  • @PauloMorgado 我对您的 doPrint 块评论很感兴趣。我有大约 10,000 个元素贯穿我的 8 个 I/O 块的管道。我的理解是,一个虚拟的 doPrint 块可以限制通过管道中该点的元素数量,但它不会限制限制块上游或下游的数据库活动。我的理解正确吗?
  • 它将限制下游的一切,因为只会执行该数量的操作。我的意思是“愚蠢”不是“什么都不做”,而是“不做任何事”。
【解决方案2】:

这是我最终采用的解决方案(除非我能弄清楚如何使用单个通用 DataFlow 块来编组每种类型的数据库访问):

我在类级别定义了一个 SemaphoreSlim:

private SemaphoreSlim ThrottleDatabaseQuerySemaphore = new SemaphoreSlim(30, 30);

我修改了 I/O 类以调用节流类:

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        Task t = Task.Delay(data.Delay); ;
        await ThrottleDatabaseQueryAsync(t);

        return data;
    }

节流类:(我也有一个通用版本的节流例程,因为我不知道如何编写一个例程来同时处理 Task 和 Task

    private async Task ThrottleDatabaseQueryAsync(Task task)
    {
        await ThrottleDatabaseQuerySemaphore.WaitAsync();
        try
        {
            lock (MetricsLock)
            {
                AsyncCount++;
                if (AsyncCount > MaxAsyncCount)
                    MaxAsyncCount = AsyncCount;
            }

            await task;
        }
        finally
        {
            ThrottleDatabaseQuerySemaphore.Release();

            lock (MetricsLock)
                AsyncCount--;
        }
    }
}

【讨论】:

  • 这会将您遇到的数据库并发问题转移到客户端。它不能解决它们。使用“通用”数据导入步骤很容易 - 创建一个接受记录数组并将其写入具有 SqlBulkCopy 类的表的 ActionBlock。为每个目标表创建一个这样的块以保持简单。在每个之前添加一个 BatchBlock 以将记录批处理到例如 5000 或 10000 个数组中。
  • 更高级的版本是使用自定义块将记录一起批处理,并将它们添加到包含记录目标表名称的 DTO。将它们传递给使用表名配置 SqlBulkCopy 实例并写出记录的 ActionBlock。这将为您提供一个数据库编写器,但“分组”块必须为每个目标表处理不同的批处理大小,以考虑快速表和慢速表。
  • 此外,DataFlow、Reactive Extensions 和任务可以组合在一起。源块和目标块可以是可观察者和观察者。您可以使用像Group ByBuffer 这样的Rx 操作来按目标分隔记录并按计数时间对它们进行批处理
  • 我不得不在客户端进行节流,因为我的应用程序的行为类似于拒绝服务攻击。我目前在输入端对我的记录进行批处理,以防止在处理过程中内存不足。我 80% 的性能瓶颈都在读取端。在写入方面,我可以使用 SqlBulkCopy 节省一些时间,但删除目标表上的所有索引使其非常有效。我会听取你对反应式扩展的建议。我买了 Stephen Clearys 的书,但还没到。
  • I had to throttle on the client side because my application was behaving like a denial of service attack 这就是我所说的。这不是并行处理问题。这是一个数据工程问题
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-05-25
  • 1970-01-01
  • 2012-04-28
  • 1970-01-01
  • 2013-09-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多