【问题标题】:Linking dynamically created ActionBlocks to a BufferBlock将动态创建的 ActionBlock 链接到 BufferBlock
【发布时间】:2012-09-28 20:47:08
【问题描述】:

我不确定这是否可行,但如果可以,我可能做得不对。假设我有一个链接到许多消费者(ActionBlocks)的共享缓冲区。每个消费者都应该使用满足用于将其链接到缓冲区的谓词的数据。例如,ActionBlock1 应该消费满足x => x % 5 == 0 的数字,ActionBlock2 应该只消费x => x % 5 == 1 等等。

这是我得到的:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    for (int i = 0; i < NumProductionLines; i++)
    {
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
    }

    return productionQueue;
}

然后我打电话:

Random rnd = new Random();

ITargetBlock<int> temp = BuildPipeline(5);

while (true)
{
    temp.Post(rnd.Next(255));
}

但是这不起作用。控制台中不显示任何输出。如果我将BuildPipeline 方法修改为:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
    ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
    ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
    ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
    ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));

    productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
    productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
    productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
    productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
    productionQueue.LinkTo(productionLine5, x => x % 5 == 4);

    return productionQueue;
}

代码按预期执行。

有人能解释为什么动态创建和链接动作块不起作用吗?

附:如果我在ITargetBlock&lt;int&gt; temp = BuildPipeline(5); temp 之后立即中断代码,则确实显示 5 个目标已链接到缓冲区。并且每个target的id都不一样。

提前致谢

编辑:添加了 svick 建议的更改,但仍然没有好处:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    for (int i = 0; i < NumProductionLines; i++)
    {
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));

        int j = i;
        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
    }

    ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num));
    productionQueue.LinkTo(discardedLine);

    return productionQueue;
}

现在只有第二条生产线处理数据(满足 x % 5 == 1 谓词的那一条)。并且数据不满足谓词,这意味着我得到了以 9 和 7 结尾的数字。

编辑:工作代码如下所示:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    for (int i = 0; i < NumProductionLines; i++)
    {
        int j = i;
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num));

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
    }

    productionQueue.LinkTo(DataflowBlock.NullTarget<int>());

    return productionQueue;
}

【问题讨论】:

    标签: c# task-parallel-library tpl-dataflow


    【解决方案1】:

    问题在于,在您的第一个版本中,您对每个目标块使用相同的谓词。换句话说,谓词不依赖于i

    但即使这样做,您的代码也不会工作,因为i 变量在谓词之间共享,因此它们都将使用最后一个值。解决方法是将i 复制到一个局部变量中并在谓词中使用它。

    代码可能如下所示:

    private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
    {
        var productionQueue = new BufferBlock<int>();
    
        for (int i = 0; i < NumProductionLines; i++)
        {
            int iCopy = i;
    
            ActionBlock<int> productionLine = new ActionBlock<int>(
                num => Console.WriteLine("Processed by line {0}: {1}", iCopy + 1, num));
    
            productionQueue.LinkTo(
                productionLine, x => x % NumProductionLines == iCopy);
        }
    
        return productionQueue;
    }
    

    如果您问为什么您的代码至少不处理 x % 5 == 1 数字,那是因为随机生成器可能会生成一个与该谓词不匹配的数字,所以 ActionBlocks 都不会接受。因此,该号码将留在源块的输出队列中,其他号码将无法通过。

    如果在您的真实代码中可能发生类似情况并且您想丢弃所有不适合任何谓词的数字,您可以将源代码块链接到 a block that does nothing 并在链接到之后接受任何内容你所有有用的块:

    productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
    

    【讨论】:

    • @Dimitri 您可能已经注意到,您也需要在块 lambda 中使用副本。我修复了答案中的代码。
    • 是的,我确实替换了所有出现的 i,谢谢。此外,我的代码确实在 for 循环中包含错误:它不是硬编码的谓词,而是依赖于变量的。
    • 供以后参考:@svick 举了一个Closure 的例子来解决这个问题。如果涉及到 lambda 表达式,这实际上是最常见的问题,因为它很容易被忽略。
    猜你喜欢
    • 2016-05-25
    • 2021-06-22
    • 1970-01-01
    • 2013-06-29
    • 2020-01-22
    • 2023-03-19
    • 2011-10-16
    • 2023-04-06
    相关资源
    最近更新 更多