【Question title】: Concurrent Framework (TDF): is a deep copy of the collection required here?
【Posted】: 2012-12-04 10:36:24
【Question】:

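I have a question about the list collection that is passed into a broadcast block. Here is what I have so far (pseudocode, because the full code base is too long):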

private BroadcastBlock<List<Quote>> tempBCB;
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb1;
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb2;
private BatchBlock<Dictionary<int, IParentOrder>> batchBlock;
private JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]> joinBlock;
private TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>,List<MySignal>> transformBlock;

tempBCB = new BroadcastBlock<List<Quote>>(quoteList => {
    return quoteList;
    //return Cloning.CloneListCloneValues<Quote>(quoteList);
});

tfb1 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(quotes =>
{
    // do something and return Dictionary<int, IParentOrder>
    return new Dictionary<int, IParentOrder>(); // placeholder
});

tfb2 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(quotes =>
{
    // do something and return Dictionary<int, IParentOrder>
    return new Dictionary<int, IParentOrder>(); // placeholder
});

batchBlock = new BatchBlock<Dictionary<int, IParentOrder>>(2);

joinBlock = new JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]>(
    new GroupingDataflowBlockOptions { Greedy = false });

transformBlock = new TransformBlock<Tuple<List<Quote>,
    Dictionary<int, IParentOrder>[]>, List<MySignal>>(tuple =>
{
    // do something and return List<MySignal>
    return new List<MySignal>(); // placeholder
});

//Linking
tempBCB.LinkTo(tfb1);
tempBCB.LinkTo(tfb2);
tfb1.LinkTo(batchBlock);
tfb2.LinkTo(batchBlock);
tempBCB.LinkTo(joinBlock.Target1);
batchBlock.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(transformBlock);

My problem is that with the current implementation of tempBCB, I get strange results in the final TransformBlock<TInput, TOutput>.

For example, the Dictionary<int, IParentOrder> collections that are part of the tuple are not of equal size, even though the implementations of tfb1 and tfb2 are 100% identical.

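The commented-out line in the tempBCB implementation makes a deep copy of the broadcast list. That does seem to solve the problem, but the deep copy makes my code roughly 10 times slower, so I need to find a different solution.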

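First, I am not sure whether this is really the problem. It may be that the slowdown alone makes the concurrent operations behave as expected while the bug is still hidden in there.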

Second, if the missing deep copy in the broadcast block does cause these problems, how can I make the copy faster?

Here is my deep-copy code:

public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) 
    where TValue : ICloneable
{
    List<TValue> ret = new List<TValue>(original.Count);

    foreach (TValue entry in original)
    {
        ret.Add((TValue)entry.Clone());
    }

    return ret;
}

I could feed a Quote[] into the broadcast block instead of a List<Quote>, but I don't see how that would speed up the deep copy.

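My questions are:

  • Is the deep copy the real problem? I doubt it, because the List<Quote> that flows into the broadcast block is never modified by any of the transform blocks.
  • If it is, why, and how can I make the deep copy more efficient?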
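
If the quotes really are never modified downstream, one option is to skip cloning entirely and broadcast a read-only view of each list. The sketch below shows this under stated assumptions:

- Lists of int stand in for List<Quote>.
- ReadOnlyBroadcastSketch and RunAsync are hypothetical names.
- The producer must not touch a list after posting it.
- The code needs the System.Threading.Tasks.Dataflow NuGet package.

With an identity cloning function, every linked target receives the same instance, so nothing is copied.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class ReadOnlyBroadcastSketch
{
    // Hypothetical example: broadcasts read-only snapshots to two consumers
    // without cloning them and returns the total each consumer observed.
    public static async Task<(long First, long Second)> RunAsync()
    {
        // Identity cloning function: every linked target gets the same instance.
        var broadcast = new BroadcastBlock<IReadOnlyList<int>>(list => list);

        long first = 0, second = 0;
        // ActionBlock defaults to MaxDegreeOfParallelism = 1, so each counter
        // is only updated by its own block, one message at a time.
        var consumer1 = new ActionBlock<IReadOnlyList<int>>(list => { first += list.Sum(); });
        var consumer2 = new ActionBlock<IReadOnlyList<int>>(list => { second += list.Sum(); });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(consumer1, propagate);
        broadcast.LinkTo(consumer2, propagate);

        for (int n = 1; n <= 5; n++)
        {
            List<int> data = Enumerable.Range(1, n * 1000).ToList();
            // AsReadOnly() wraps the list without copying it; consumers
            // cannot modify the data through this view.
            IReadOnlyList<int> snapshot = data.AsReadOnly();
            broadcast.Post(snapshot);
        }

        broadcast.Complete();
        await Task.WhenAll(consumer1.Completion, consumer2.Completion);
        return (first, second);
    }
}
```

Both consumers see identical data without any per-target copy. This only holds as long as nobody mutates the underlying lists or the objects inside them.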

【Discussion】:

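  • Can you create a self-contained, complete code example that demonstrates this behavior? I'd say the problem is the copy and that you can't get around it (though that depends on the behavior your example shows), but I can't reproduce the behavior from what I've read above.
  • Also, why do you expect tfb1 and tfb2 not to return different results? If you have separate transform blocks, I would expect them to do different things.
  • @casperOne, I will post a self-contained code example shortly. I expect the same results because these blocks generate exactly the same data, but I will initialize both blocks with the same Func to make that clearer.
  • I think in this case you need to figure out why you get different results for supposedly identical inputs. Blindly trying things (like your deep copy) will most likely only hide the bug, not fix it.
  • @svick, agreed. I got held up by other work; I will post a reworked version of the full code soon and would appreciate it if you could take a quick look at it too. From the first tests, one problem I have definitely found is this: the BroadcastBlock sends items to the joinBlock while the joinBlock is in non-greedy mode, which most likely causes items to be dropped while the joinBlock waits for an item from its other target. Calling SendAsync() "manually" solves that problem, but since the structure is fairly complex, there may be more issues.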
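
The "manual SendAsync()" workaround from the last comment can be sketched as follows. The sketch uses these assumptions:

- Hypothetical int inputs (left value i, right value i * 10) stand in for the quote list and the batched dictionaries.
- NonGreedyJoinSketch and PairAsync are illustrative names.

A BroadcastBlock holds at most one message and overwrites it with the next one. An offer that a non-greedy JoinBlock target has postponed can therefore be replaced before the join consumes it. The task returned by SendAsync, by contrast, completes only once the target has consumed or declined that specific message.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class NonGreedyJoinSketch
{
    // Hypothetical example: pairs left value i with right value i * 10 through a
    // non-greedy JoinBlock, feeding both targets with SendAsync instead of
    // linking a BroadcastBlock to them.
    public static async Task<List<Tuple<int, int>>> PairAsync(int count)
    {
        var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });

        var results = new List<Tuple<int, int>>();
        // Single-threaded by default, so List.Add is not called concurrently.
        var collect = new ActionBlock<Tuple<int, int>>(t => results.Add(t));
        join.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            // Each SendAsync task completes only when the join has consumed the
            // postponed message, so no offer is silently overwritten.
            Task<bool> left = join.Target1.SendAsync(i);
            Task<bool> right = join.Target2.SendAsync(i * 10);
            await Task.WhenAll(left, right);
        }

        join.Complete();
        await collect.Completion;
        return results;
    }
}
```

Awaiting both sends before producing the next pair keeps every left item matched with its own right item. The cost is that the producer is throttled to the speed of the join.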

Tags: c# collections concurrency cloning tpl-dataflow


【Solution 1】:

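I'm answering my own question because I eventually solved the problem. The issue svick warned about had nothing to do with whether the List<Quote> needs a deep copy in the broadcast block; in fact, no deep copy is needed.

The problem was the broadcast block itself. It was asked to complete (with completion propagation to its linked dataflow blocks set to true) before the batchBlock, which is also linked to the joinBlock, had possibly streamed all of its items to the joinBlock.

I simply removed the joinBlock. I rewrote the transform blocks so that they now return their own transformed items together with the original items, which made the joinBlock obsolete.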
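
The root cause was completion reaching a shared block too early. It can be isolated in a smaller sketch that mirrors the Task.WhenAll(...) then Complete() pattern used in the full code, here written with await. The sketch uses these assumptions:

- Ints replace the real data.
- FanInCompletionSketch and CountBatchesAsync are hypothetical names.
- Two transform branches feed one BatchBlock.
- The BatchBlock is completed manually, and only after both branches have finished, instead of via PropagateCompletion on either link.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class FanInCompletionSketch
{
    // Hypothetical example: two branches feed one BatchBlock, which is completed
    // only after both branches finish. Returns the number of batches produced.
    public static async Task<int> CountBatchesAsync(int items)
    {
        var source = new BroadcastBlock<int>(x => x);
        var left = new TransformBlock<int, int>(x => x + 1);
        var right = new TransformBlock<int, int>(x => x - 1);
        var batch = new BatchBlock<int>(2);

        int batches = 0;
        var sink = new ActionBlock<int[]>(b => { batches++; });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(left, propagate);
        source.LinkTo(right, propagate);

        // Deliberately no PropagateCompletion on these two links: the first
        // branch to finish would otherwise complete the shared BatchBlock while
        // the other branch may still have items to deliver.
        left.LinkTo(batch);
        right.LinkTo(batch);
        batch.LinkTo(sink, propagate);

        for (int i = 0; i < items; i++)
        {
            source.Post(i);
        }
        source.Complete();

        // Complete the shared target only once every upstream branch is done.
        await Task.WhenAll(left.Completion, right.Completion);
        batch.Complete();

        await sink.Completion;
        return batches;
    }
}
```

Every input produces two branch outputs, so the sketch should yield exactly one batch per input. Note that a BatchBlock only guarantees the batch size, not that each batch holds one item from each branch.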

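A note on concurrency in the main transformBlock: setting MaxDegreeOfParallelism > 1 already gives a performance benefit even with this light workload, and it really pays off once heavier workloads are thrown at it.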
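
Raising MaxDegreeOfParallelism does not scramble the output. By default a TransformBlock still emits results in input order, even when several items are processed at once. The sketch below illustrates this with a fixed degree of 4 rather than the Unbounded value used in the code. ParallelTransformSketch, SquareAsync and the simulated delays are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class ParallelTransformSketch
{
    // Hypothetical example: squares 0..count-1 with up to four items in flight
    // and collects the results in the order the TransformBlock emits them.
    public static async Task<List<int>> SquareAsync(int count)
    {
        var square = new TransformBlock<int, int>(async x =>
        {
            // Uneven simulated work so that items can finish out of order.
            await Task.Delay((count - x) % 5);
            return x * x;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        var results = new List<int>();
        // Single-threaded by default, so List.Add is not called concurrently.
        var collect = new ActionBlock<int>(v => results.Add(v));
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            square.Post(i);
        }
        square.Complete();

        await collect.Completion;
        return results;
    }
}
```

Because ordering is preserved, code that later indexes results by position, as the full answer's transformBlock does with array[0], is not affected by the degree of parallelism inside a single block.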

Here is the complete code, which compiles and works (I renamed some classes, but the structure matches the description):

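using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public class Test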
{
    private Stopwatch watch;

    private BroadcastBlock<List<InputObject>> tempBCB;
    private BatchBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> batchBlock;
    private TransformBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>[], List<FinalObject>> transformBlock;
    private ActionBlock<List<FinalObject>> justToFlushTransformBlock;

    private CoreLogic core1;
    private CoreLogic core2;

    public Test()
    {
        tempBCB = new BroadcastBlock<List<InputObject>>(input => input);

        //here batch size = 2
        batchBlock = new BatchBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>>(2, new GroupingDataflowBlockOptions { Greedy = false });

        transformBlock = new TransformBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>[],List<FinalObject>>(array =>
        {
            List<InputObject> inputObjects = array[0].Item1;
            List<FinalObject> ret = inputObjects.ConvertAll(x => new FinalObject(x));

            foreach (var tuple in array)
            {
                //iterate over each individual object
                foreach (var dictionary in tuple.Item2)
                {
                    ret[dictionary.Key].outputList.Add(dictionary.Value);
                }
            }

            return ret;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        justToFlushTransformBlock = new ActionBlock<List<FinalObject>>(list =>
            {
                //just in order to accept items from the transformBlock output queue
            });

        //Generate 2 CoreLogic objects
        core1 = new CoreLogic();
        core2 = new CoreLogic();

        //linking
        tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        core1.transformBlock.LinkTo(batchBlock);
        core2.transformBlock.LinkTo(batchBlock);

        batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numberChunks = 30;

        watch = new Stopwatch();
        watch.Start();

        for (int j = 1; j <= numberChunks; j++)
        {
            int collectionSize = 10000 * j;

            List<InputObject> collection = new List<InputObject>(collectionSize);
            for (int i = 0; i < collectionSize; i++)
            {
                collection.Add(new InputObject(i));
            }

            tempBCB.Post(collection);
        }

        tempBCB.Complete();

        Task.WhenAll(core1.transformBlock.Completion, core2.transformBlock.Completion).ContinueWith(_ =>
            {
                batchBlock.Complete();
            });

        transformBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}

public class CoreLogic
{
    private Random rand;
    public TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> transformBlock;

    public CoreLogic()
    {
        const int numberIntermediateObjects = 10000;

        transformBlock = new TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>>(input =>
        {
            //please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return

            Dictionary<int, IntermediateObject> ret = new Dictionary<int, IntermediateObject>();
            for (int i = 0; i < numberIntermediateObjects; i++)
            {
                IntermediateObject value = new IntermediateObject(i);

                ret.Add(i, value);
            }

            var tuple = new Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>(input, ret);

            return tuple;
        });
    }
}

public class InputObject : ICloneable
{
    public int value1 { get; private set; }

    public InputObject(int value)
    {
        this.value1 = value;
    }

    object ICloneable.Clone()
    {
        return Clone();
    }

    public InputObject Clone()
    {
        return (InputObject)this.MemberwiseClone();
    }
}

public class IntermediateObject
{
    public int value1 { get; private set; }

    public IntermediateObject(int value)
    {
        this.value1 = value;
    }
}

public class FinalObject
{
    public InputObject input { get; private set; }
    public List<IntermediateObject> outputList;

    public FinalObject(InputObject input)
    {
        this.input = input;

        this.outputList = new List<IntermediateObject>();
    }
}

public static class Cloning
{
    public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) where TValue : ICloneable
    {
        List<TValue> ret = new List<TValue>(original.Count);

        foreach (TValue entry in original)
        {
            ret.Add((TValue)entry.Clone());
        }

        return ret;
    }
}

I hope this helps others who run into similar problems. I love TPL Dataflow, and svick in particular really helped and motivated me to dig deeper. Thank you!
