【发布时间】:2019-04-22 11:23:48
【问题描述】:
我想编写一个应用程序来评估来自两个传感器的传感器数据。两个传感器都在Package 对象中发送数据,这些对象被拆分为Frame 对象。 Package 本质上是 Tuple<Timestamp, Data[]>,Frame 是 Tuple<Timestamp, Data>。然后我需要始终使用两个来源中具有最早时间戳的Frame。
所以基本上我的对象流是
Package -(1:n)-> Frame \
}-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /
示例
假设每个 Package 包含 2 或 3 个值(现实:5-7)和以 1 为增量的整数时间戳(现实:~200Hz => ~5ms 增量)。为简单起见,“数据”只是timestamp * 100。
Packages (timestamp, values[])
Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
(29, [2700, 2800, 2900]), ...}
Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
(26, [2400, 2500, 2600]), ...}
经过(1:n) 步骤:
Frames (timestamp, value)
Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
(22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
(29, 2900), ...}
Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
(20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}
pair synchronized 步骤之后:
Merged tuples (timestamp, source1, source2)
{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
(19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
(24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}
请注意,时间戳 23 丢失了,因为两个来源都没有发送了一个值。这只是一个副作用。我可以放一个空元组,也可以不放,没关系。元组是(27, 2700, 2700) 还是((27, 2700), (27, 2700)) 也没关系,即。 e. Tuple<Timestamp, Data, Data> 或 Tuple<Frame, Frame>。
如果我的文档正确,我很确定 (1:n) 部分应该是 TransformManyBlock<Package, Frame>。
但是我在pair synchronized 部分使用哪个块?起初,我认为JoinBlock<Frame, Frame> 会是我要找的,但它似乎它只是将两个元素按索引配对。但是,由于既不能确保两个管道都以相同的时间戳开始,也不能确保两个管道总是产生稳定的连续时间戳流(因为有时具有几帧的包可能会在传输中丢失),这不是一个选择。所以我需要更多的是一个“MergeBlock”,它可以决定两个输入流的哪个元素接下来传播到输出(如果有的话)。
我想我必须自己写这样的东西。但是我无法编写正确处理两个 ISourceBlock 变量和一个 ITargetBlock 变量的代码。我基本上被卡住了:
private void MergeSynchronized(
ISourceBlock<Frame> source1,
ISourceBlock<Frame> source2,
ITargetBlock<Tuple<Frame, Frame>> target)
{
var frame1 = source1.Receive();
var frame2 = source2.Receive();
//Loop {
// Depending on the timestamp [mis]match,
// either pair frame1+frame2 or frame1+null or null+frame2, and
// replace whichever frame(s) was/were propagated already
// with the next frame from the respective pipeline
//}
}
我什至不确定这个草稿:方法应该是async,所以我可以使用var frame1 = await source1.ReceiveAsnyc();吗?循环的条件是什么?在哪里以及如何检查是否完成?如何解决我的代码意味着我必须等到流中的间隙结束才能意识到存在间隙的明显问题?
我想到的替代方案是在管道中添加一个额外的块,确保每个传感器有足够的“哨兵帧”放入管道中,以便始终对齐每个管道中的第一个将对齐正确的两个。我猜这将是一种TransformManyBlock,它读取一个帧,将“预期”时间戳与实际时间戳进行比较,然后为丢失的时间戳插入标记帧,直到帧的时间戳正确再次。
或者pair synchronized 部分是停止使用 TPL 数据流对象并启动已经使用 Data 部分的实际代码的地方吗?
【问题讨论】:
-
记录:我有一种强烈的感觉,首先尝试同步两个 DataFlow 管道有点破坏 DataFlow 的全部意义。 DataFlow 似乎是为了让您尽可能快地处理数据,而不必考虑线程或循环......所以也许这完全是垃圾,我应该尽可能快地将数据输出到结果流,然后将它们组合起来当第二个流也稍后有结果时?
-
始终使用两个来源中具有最早时间戳的帧您能否澄清一下,这似乎正是您在什么条件下使用
JoinBlock会得到的你想让你的合并加入一个空的Frame吗? -
@JSteward 从两个拆分管道中,我从队列中获取当前元素。这将是每个队列的最早时间戳。所以我有两个元素时间戳。要么,时间戳对齐,然后我需要将它们配对,或者它们不对齐,那么我只需要消耗较早的时间戳并等待该管道“赶上”可以这么说。但即使它们同步了,这并不意味着它们今后将保持同步。
-
@JSteward 我加了一个详细的例子。
标签: c# task-parallel-library tpl-dataflow