【问题标题】:TPL Dataflow block consumes all available memoryTPL 数据流块消耗所有可用内存
【发布时间】:2015-09-08 18:23:02
【问题描述】:

我有一个TransformManyBlock,其设计如下:

  • 输入:文件路径
  • 输出:文件内容的 IEnumerable,一次一行

我在一个巨大的文件 (61GB) 上运行这个块,该文件太大而无法放入 RAM。为了避免无限的内存增长,我已将此块和所有下游块的BoundedCapacity 设置为非常低的值(例如 1)。尽管如此,该块显然会贪婪地迭代 IEnumerable,这会消耗计算机上的所有可用内存,从而使每个进程都停止。块的 OutputCount 继续无限制地上升,直到我终止进程。

我能做些什么来防止块以这种方式消耗IEnumerable

编辑:这是一个说明问题的示例程序:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}

如果您使用的是 64 位机器,请确保清除 Visual Studio 中的“首选 32 位”选项。我的计算机上有 16GB 的 RAM,这个程序会立即消耗所有可用的字节。

【问题讨论】:

  • 好吧 TBH:我没时间在这里和你争论——祝你好运
  • 如果您仔细阅读该部分的其余部分,您会发现它并没有像您想象的那样工作 - 您的 firstBlock 总是提供它可以产生的一切 - 如果您绑定第二个它只会拒绝第二个输入并稍后获取它

标签: c# .net task-parallel-library dataflow tpl-dataflow


【解决方案1】:

您似乎误解了 TPL 数据流的工作原理。

BoundedCapacity 限制您可以发布到块中的项目数量。在您的情况下,这意味着单个char 进入TransformManyBlock 和单个string 进入ActionBlock

因此,您将单个项目发布到TransformManyBlock,然后返回1024*1024 字符串并尝试将它们传递给ActionBlock,它一次只接受一个。其余的字符串只会放在TransformManyBlock 的输出队列中。

您可能想要做的是创建一个块并在达到容量时通过等待(同步或其他方式)以流式方式将项目发布到其中:

private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}

【讨论】:

  • 谢谢。我最终创建了一个封装源 ActionBlock 和目标 BufferBlock 的新块。操作块按照您的建议使用 SendAsync 来填充缓冲区。对外界来说,它的行为就像一个具有我想要的行为的 TransformManyBlock。
  • @brianberns:对不起,如果这是一个愚蠢的问题,但是“await block.SendAsync(item)”和“block.Post(item)”有什么区别?
  • @Bugmaster 这根本不是一个愚蠢的问题:stackoverflow.com/a/13605979/885318
  • @i3arnon:谢谢,我没有意识到 Post() 无论如何都会立即返回,我认为它会阻塞直到消息被消费。哎呀!
【解决方案2】:

似乎要创建一个输出有界的TransformManyBlock,需要三个内部块:

  1. 一个TransformBlock 接收输入并产生IEnumerables,可能并行运行。
  2. 一个非并行的ActionBlock 枚举产生的IEnumerables,并传播最终结果。
  3. 一个BufferBlock 存储最终结果,尊重理想的BoundedCapacity

稍微棘手的部分是如何传播第二个块的完成,因为它不直接链接到第三个块。在下面的实现中,方法PropagateCompletion是根据库的source code写的。

public static IPropagatorBlock<TInput, TOutput>
    CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, Task<IEnumerable<TOutput>>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));

    var input = new TransformBlock<TInput, IEnumerable<TOutput>>(transform,
        dataflowBlockOptions);
    var output = new BufferBlock<TOutput>(dataflowBlockOptions);
    var middle = new ActionBlock<IEnumerable<TOutput>>(async results =>
    {
        if (results == null) return;
        foreach (var result in results)
        {
            var accepted = await output.SendAsync(result).ConfigureAwait(false);
            if (!accepted) break; // If one is rejected, the rest will be rejected too
        }
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        BoundedCapacity = dataflowBlockOptions.MaxDegreeOfParallelism,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        SingleProducerConstrained = true,
    });

    input.LinkTo(middle, new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateCompletion(middle, output);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try
        {
            await source.Completion.ConfigureAwait(false);
        }
        catch { }

        var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (exception != null) target.Fault(exception); else target.Complete();
    }
}

// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
    CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
        item => Task.FromResult(transform(item)), dataflowBlockOptions);
}

使用示例:

var firstBlock = CreateOutputBoundedTransformManyBlock<char, string>(
    c => GetSequence(c), options);

【讨论】:

    【解决方案3】:

    如果管道的输出比率低于发布比率,消息将在管道上累积,直到内存耗尽或达到某个队列限制。 如果消息的大小很大,进程很快就会出现内存不足的情况。

    如果队列已经有一条消息,将BoundedCapacity 设置为 1 将导致消息被队列拒绝。例如,在批处理等情况下,这不是所需的行为。查看此post 以获得见解。

    这个工作测试说明了我的观点:

    //Change BoundedCapacity to +1 to see it fail
    [TestMethod]
    public void stackOverflow()
    {      
        var total = 1000;
        var processed = 0;
        var block = new ActionBlock<int>(
           (messageUnit) =>
           {
               Thread.Sleep(10);
               Trace.WriteLine($"{messageUnit}");
               processed++;
           },
            new ExecutionDataflowBlockOptions() { BoundedCapacity = -1 } 
       );
    
        for (int i = 0; i < total; i++)
        {
            var result = block.SendAsync(i);
            Assert.IsTrue(result.IsCompleted, $"failed for {i}");
        }
    
        block.Complete();
        block.Completion.Wait();
    
        Assert.AreEqual(total, processed);
    }
    

    所以我的方法是限制帖子,这样管道就不会在队列中积累太多消息。

    下面是一个简单的方法。 这样数据流会保持全速处理消息,但不会累积消息,这样做可以避免过多的内存消耗。

    //Should be adjusted for specific use.
    public void postAssync(Message message)
    {
    
        while (totalPending = block1.InputCount + ... + blockn.InputCount> 100)
        {
            Thread.Sleep(200);
            //Note: if allocating huge quantities for of memory for each message the Garbage collector may keep up with the pace. 
            //This is the perfect place to force garbage collector to release memory.
    
        }
        block1.SendAssync(message)
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多