【发布时间】:2016-04-19 06:08:20
【问题描述】:
我有由风暴拓扑处理的实时数据。数据可以有四种类型,比如 A、B、C、D。这些数据中的每一个都由 Bolt 以随机顺序使用。我需要做的是比较两个相同数据类型的元组。例如,我想将 A 型元组与下一个 A 型元组进行比较,或者将当前 A 型元组与之前收到的 A 型元组进行比较。有没有办法在螺栓中做到这一点?或者我必须将先前的结果保存在数据库中的某个位置(比如说 hbase 或缓存)并查询它以与特定类型的当前元组进行比较。
编辑
假设 A、B、C、D 类型的数据流来自 spout
B4 A4 C7 D2 A3 A2 B3 C6 D1 B2 C5 C4 B1 C3 C2 C1 A1----->喷口-->螺栓
现在我想比较 A1 和 A2、A2 和 A3、A3 和 A4。类似地,B1 与 B2,B2 与 B3 等等。
【问题讨论】:
-
Storm 没有订购保证。那么,您将“类型 A 元组与下一个类型 A 元组”进行比较是什么意思?如果一个 bolt 有多个生产者任务(即单个生产者 bolt 的 dop > 1),则可以在每个通道上传递 A 型元组,并且没有排序。
-
我已经更新了这个例子。见编辑
-
但这只有在你的 spout 和 bolt 都具有一个并行度时才有效!这真的是你想要的吗?
-
是的,到目前为止,我希望它具有“一”的并行性。我认为很难有两个或更多的并行性。在那种情况下,我认为我必须将它存储在某个地方并查询它。我说的对吗?
-
如果您使用可靠的 spout 并启用容错,您可以在内部变量中缓冲第一个元组并“等待”第二个元组到达。比你可以比较两者,并删除第一个元组(并确认它 - 只要你需要它进行比较就不要确认它 - 否则你会破坏容错)。
标签: java hadoop distributed apache-storm