【问题标题】:How to merge two TPL DataFlow pipelines in synchronized fashion?如何以同步方式合并两个 TPL DataFlow 管道?
【发布时间】:2019-04-22 11:23:48
【问题描述】:

我想编写一个应用程序来评估来自两个传感器的传感器数据。两个传感器都在Package 对象中发送数据,这些对象被拆分为Frame 对象。 Package 本质上是 Tuple<Timestamp, Data[]>FrameTuple<Timestamp, Data>。然后我需要始终使用两个来源中具有最早时间戳的Frame

所以基本上我的对象流是

Package -(1:n)-> Frame \
                        }-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /

示例

假设每个 Package 包含 2 或 3 个值(现实:5-7)和以 1 为增量的整数时间戳(现实:~200Hz => ~5ms 增量)。为简单起见,“数据”只是timestamp * 100

Packages (timestamp, values[])

Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
 (29, [2700, 2800, 2900]), ...}

Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
 (26, [2400, 2500, 2600]), ...}

经过(1:n) 步骤:

Frames (timestamp, value)

Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
 (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
 (29, 2900), ...}

Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
 (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

pair synchronized 步骤之后:

Merged tuples (timestamp, source1, source2)

{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
 (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
 (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

请注意,时间戳 23 丢失了,因为两个来源都没有发送了一个值。这只是一个副作用。我可以放一个空元组,也可以不放,没关系。元组是(27, 2700, 2700) 还是((27, 2700), (27, 2700)) 也没关系,即。 e. Tuple&lt;Timestamp, Data, Data&gt;Tuple&lt;Frame, Frame&gt;


如果我的文档正确,我很确定 (1:n) 部分应该是 TransformManyBlock&lt;Package, Frame&gt;

但是我在pair synchronized 部分使用哪个块?起初,我认为JoinBlock&lt;Frame, Frame&gt; 会是我要找的,但它似乎它只是将两个元素按索引配对。但是,由于既不能确保两个管道都以相同的时间戳开始,也不能确保两个管道总是产生稳定的连续时间戳流(因为有时具有几帧的包可能会在传输中丢失),这不是一个选择。所以我需要更多的是一个“MergeBlock”,它可以决定两个输入流的哪个元素接下来传播到输出(如果有的话)。

我想我必须自己写这样的东西。但是我无法编写正确处理两个 ISourceBlock 变量和一个 ITargetBlock 变量的代码。我基本上被卡住了:

private void MergeSynchronized(
    ISourceBlock<Frame> source1,
    ISourceBlock<Frame> source2,
    ITargetBlock<Tuple<Frame, Frame>> target)
{
  var frame1 = source1.Receive();
  var frame2 = source2.Receive();

  //Loop {
  //  Depending on the timestamp [mis]match,
  //  either pair frame1+frame2 or frame1+null or null+frame2, and
  //  replace whichever frame(s) was/were propagated already
  //  with the next frame from the respective pipeline
  //}
}

我什至不确定这个草稿:方法应该是async,所以我可以使用var frame1 = await source1.ReceiveAsnyc();吗?循环的条件是什么?在哪里以及如何检查是否完成?如何解决我的代码意味着我必须等到流中的间隙结束才能意识到存在间隙的明显问题?

我想到的替代方案是在管道中添加一个额外的块,确保每个传感器有足够的“哨兵帧”放入管道中,以便始终对齐每个管道中的第一个将对齐正确的两个。我这将是一种TransformManyBlock,它读取一个帧,将“预期”时间戳与实际时间戳进行比较,然后为丢失的时间戳插入标记帧,直到帧的时间戳正确再次。

或者pair synchronized 部分是停止使用 TPL 数据流对象并启动已经使用 Data 部分的实际代码的地方吗?

【问题讨论】:

  • 记录:我有一种强烈的感觉,首先尝试同步两个 DataFlow 管道有点破坏 DataFlow 的全部意义。 DataFlow 似乎是为了让您尽可能快地处理数据,而不必考虑线程或循环......所以也许这完全是垃圾,我应该尽可能快地将数据输出到结果流,然后将它们组合起来当第二个流稍后有结果时?
  • 始终使用两个来源中具有最早时间戳的帧您能否澄清一下,这似乎正是您在什么条件下使用JoinBlock 会得到的你想让你的合并加入一个空的Frame吗?
  • @JSteward 从两个拆分管道中,我从队列中获取当前元素。这将是每个队列的最早时间戳。所以我有两个元素时间戳。要么,时间戳对齐,然后我需要将它们配对,或者它们不对齐,那么我只需要消耗较早的时间戳并等待该管道“赶上”可以这么说。但即使它们同步了,这并不意味着它们今后将保持同步。
  • @JSteward 我加了一个详细的例子。

标签: c# task-parallel-library tpl-dataflow


【解决方案1】:

TPL DataFlow API 的问题在于,一切都是内部/私有和/或密封的。这使您没有太多扩展 API 的可能性。

无论如何,对于您的问题,实现一个新的 SynchronizedJoinBlock 类可能是个好主意。实际的业务逻辑位于 GetMessagesRecursive 方法中:

    public sealed class SynchronizedJoinBlock<T1, T2>
        : IReceivableSourceBlock<Tuple<T1, T2>>
    {
        private readonly object _syncObject = new object();
        private readonly Func<T1, T2, int> _compareFunction;
        private readonly Queue<T1> _target1Messages;
        private readonly Queue<T2> _target2Messages;
        private readonly TransformManyBlock<T1, Tuple<T1, T2>> _target1;
        private readonly TransformManyBlock<T2, Tuple<T1, T2>> _target2;
        private readonly BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>> _batchedJoinBlock;
        private readonly TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>> _transformManyBlock;

        public ITargetBlock<T1> Target1 => _target1;

        public ITargetBlock<T2> Target2 => _target2;

        public Task Completion => _transformManyBlock.Completion;

        public SynchronizedJoinBlock(Func<T1, T2, int> compareFunction)
        {
            _compareFunction = compareFunction
                ?? throw new ArgumentNullException(nameof(compareFunction));
            _batchedJoinBlock = new BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>>(1);
            _target1Messages = new Queue<T1>();
            _target2Messages = new Queue<T2>();

            Func<ICollection<Tuple<T1, T2>>> getMessagesFunction = () =>
            {
                lock (_syncObject)
                {
                    if (_target1Messages.Count > 0 && _target2Messages.Count > 0)
                    {
                        return GetMessagesRecursive(_target1Messages.Peek(), _target2Messages.Peek()).ToArray();
                    }
                    else
                    {
                        return new Tuple<T1, T2>[0];
                    }
                }
            };

            _target1 = new TransformManyBlock<T1, Tuple<T1, T2>>((element) =>
            {
                _target1Messages.Enqueue(element);
                return getMessagesFunction();
            });
            _target1.LinkTo(_batchedJoinBlock.Target1, new DataflowLinkOptions() { PropagateCompletion = true });

            _target2 = new TransformManyBlock<T2, Tuple<T1, T2>>((element) =>
            {
                _target2Messages.Enqueue(element);
                return getMessagesFunction();
            });
            _target2.LinkTo(_batchedJoinBlock.Target2, new DataflowLinkOptions() { PropagateCompletion = true });

            _transformManyBlock = new TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>>(
                element => element.Item1.Concat(element.Item2)
            );
            _batchedJoinBlock.LinkTo(_transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        private IEnumerable<Tuple<T1, T2>> GetMessagesRecursive(T1 value1, T2 value2)
        {
            int result = _compareFunction(value1, value2);
            if (result == 0)
            {
                yield return Tuple.Create(_target1Messages.Dequeue(), _target2Messages.Dequeue());
            }
            else if (result < 0)
            {
                yield return Tuple.Create(_target1Messages.Dequeue(), default(T2));

                if (_target1Messages.Count > 0)
                {
                    foreach (var item in GetMessagesRecursive(_target1Messages.Peek(), value2))
                    {
                        yield return item;
                    }
                }
            }
            else
            {
                yield return Tuple.Create(default(T1), _target2Messages.Dequeue());

                if (_target2Messages.Count > 0)
                {
                    foreach (var item in GetMessagesRecursive(value1, _target2Messages.Peek()))
                    {
                        yield return item;
                    }
                }
            }
        }

        public void Complete()
        {
            _target1.Complete();
            _target2.Complete();
        }

        Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage(
            DataflowMessageHeader messageHeader,
            ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
        {
            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ConsumeMessage(messageHeader, target, out messageConsumed);
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            ((IDataflowBlock)_transformManyBlock).Fault(exception);
        }

        public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target,
            DataflowLinkOptions linkOptions)
        {
            return _transformManyBlock.LinkTo(target, linkOptions);
        }

        void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ReleaseReservation(messageHeader, target);
        }

        bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
                .ReserveMessage(messageHeader, target);
        }

        public bool TryReceive(Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
        {
            return _transformManyBlock.TryReceive(filter, out item);
        }

        public bool TryReceiveAll(out IList<Tuple<T1, T2>> items)
        {
            return _transformManyBlock.TryReceiveAll(out items);
        }
    }

【讨论】:

  • 不错。你对(我认为是)非常好的 TPL 数据流库的主要限制是绝对正确的。
【解决方案2】:

这是一个SynchronizedJoinBlock 块的实现,与Hardy Hobeck 的answer 中的块类似。这个处理一些小细节,比如取消、处理异常以及在输入块Target1Target2 标记为完成时处理传播剩余项目。此外,合并逻辑不涉及递归,这应该使其性能更好(希望我没有测量它)并且不易受到堆栈溢出异常的影响。小偏差:输出是ValueTuple&lt;T1, T2&gt; 而不是Tuple&lt;T1, T2&gt;(目的是减少分配)。

public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>
{
    private readonly Func<T1, T2, int> _comparison;
    private readonly Queue<T1> _queue1 = new Queue<T1>();
    private readonly Queue<T2> _queue2 = new Queue<T2>();
    private readonly ActionBlock<T1> _input1;
    private readonly ActionBlock<T2> _input2;
    private readonly BufferBlock<(T1, T2)> _output;
    private readonly object _locker = new object();

    public SynchronizedJoinBlock(Func<T1, T2, int> comparison,
        CancellationToken cancellationToken = default)
    {
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));

        // Create the three internal blocks
        var options = new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        };
        _input1 = new ActionBlock<T1>(Add1, options);
        _input2 = new ActionBlock<T2>(Add2, options);
        _output = new BufferBlock<(T1, T2)>(options);

        // Link the input blocks with the output block
        var inputTasks = new Task[] { _input1.Completion, _input2.Completion };
        Task.WhenAny(inputTasks).Unwrap().ContinueWith(t =>
        {
            // If ANY input block fails, then the whole block has failed
            ((IDataflowBlock)_output).Fault(t.Exception.InnerException);
            if (!_input1.Completion.IsCompleted) _input1.Complete();
            if (!_input2.Completion.IsCompleted) _input2.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.OnlyOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
        Task.WhenAll(inputTasks).ContinueWith(t =>
        {
            // If ALL input blocks succeeded, then the whole block has succeeded
            try
            {
                if (!t.IsCanceled) PostRemaining(); // Post what's left
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)_output).Fault(ex);
            }
            _output.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
    }

    public ITargetBlock<T1> Target1 => _input1;
    public ITargetBlock<T2> Target2 => _input2;
    public Task Completion => _output.Completion;

    private void Add1(T1 value1)
    {
        lock (_locker)
        {
            _queue1.Enqueue(value1);
            FindAndPostMatched_Unsafe();
        }
    }

    private void Add2(T2 value2)
    {
        lock (_locker)
        {
            _queue2.Enqueue(value2);
            FindAndPostMatched_Unsafe();
        }
    }

    private void FindAndPostMatched_Unsafe()
    {
        while (_queue1.Count > 0 && _queue2.Count > 0)
        {
            var result = _comparison(_queue1.Peek(), _queue2.Peek());
            if (result < 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            else if (result > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
            else // result == 0
            {
                _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));
            }
        }
    }

    private void PostRemaining()
    {
        lock (_locker)
        {
            while (_queue1.Count > 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            while (_queue2.Count > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
        }
    }

    private void ClearQueues()
    {
        lock (_locker)
        {
            _queue1.Clear();
            _queue2.Clear();
        }
    }

    public void Complete() => _output.Complete();

    public void Fault(Exception exception)
        => ((IDataflowBlock)_output).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)
        => _output.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<(T1, T2)> items)
        => _output.TryReceiveAll(out items);

    (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,
        out bool messageConsumed)
        => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    void ISourceBlock<(T1, T2)>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(
            messageHeader, target);

    bool ISourceBlock<(T1, T2)>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(
            messageHeader, target);
}

使用示例:

var joinBlock = new SynchronizedJoinBlock<(int, int), (int, int)>(
    (x, y) => Comparer<int>.Default.Compare(x.Item1, y.Item1));

var source1 = new (int, int)[] {(17, 1700), (18, 1800), (19, 1900),
    (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600),
    (27, 2700), (28, 2800), (29, 2900)};

var source2 = new (int, int)[] {(15, 1500), (16, 1600), (17, 1700),
    (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400),
    (25, 2500), (26, 2600)};

Array.ForEach(source1, x => joinBlock.Target1.Post(x));
Array.ForEach(source2, x => joinBlock.Target2.Post(x));

joinBlock.Target1.Complete();
joinBlock.Target2.Complete();

while (joinBlock.OutputAvailableAsync().Result)
{
    Console.WriteLine($"> Received: {joinBlock.Receive()}");
}

输出:

收到:((0, 0), (15, 1500))
收到:((0, 0), (16, 1600))
收到:((17, 1700), (17, 1700))
收到:((18, 1800), (18, 1800))
收到:((19, 1900), (19, 1900))
收到:((20, 2000), (20, 2000))
收到:((21, 2100), (21, 2100))
收到:((22, 2200), (0, 0))
收到:((0, 0), (24, 2400))
收到:((25, 2500), (25, 2500))
收到:((26, 2600), (26, 2600))
收到:((27, 2700), (0, 0))
收到:((28, 2800), (0, 0))
收到:((29, 2900), (0, 0))

假设传入的数据是有序的。

这个类与我前段时间在somewhat related question 中发布的JoinDependencyBlock 类具有相似的结构。

【讨论】:

  • Lists 切换到Queues 用于内部存储。
  • 这看起来很棒。您对输入数据进行排序的假设是正确的。如果可行,我会对其进行测试并接受它。
  • 这是个好消息,因为如果没有订购,除了 comparison lambda 之外,您还必须通过 T1T2 的比较器,并替换内部的 @987654337 @s 带有一些有序的数据结构。而且 .NET 不提供任何允许重复的有效有序集合。
  • 我终于明白了它是如何工作的。基本上,只要两个队列中至少有一个元素,就可以根据需要耗尽它们。一旦一个队列为空,就中断耗尽。填满一个队列的每个方法都会重新触发这种耗尽机制。
  • 嗯,是的...在我的情况下,我“仅”以 200 Hz 记录,每 5 毫秒添加一个元素应该有足够的时间从前面几英里的队列中耗尽大量数据其他。唯一可以担心的另一个问题是lock doesn't guarantee FIFO,但代码会自我修复,只需将太多帧与null 配对。
猜你喜欢
  • 2016-11-10
  • 2017-11-06
  • 2021-11-25
  • 1970-01-01
  • 2021-09-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多