【问题标题】:Comparing previous and next tuple in storm bolt比较风暴螺栓中的上一个和下一个元组
【发布时间】:2016-04-19 06:08:20
【问题描述】:

我有由风暴拓扑处理的实时数据。数据可以有四种类型,比如 A、B、C、D。这些数据中的每一个都由 Bolt 以随机顺序使用。我需要做的是比较两个相同数据类型的元组。例如,我想将 A 型元组与下一个 A 型元组进行比较,或者将当前 A 型元组与之前收到的 A 型元组进行比较。有没有办法在螺栓中做到这一点?或者我必须将先前的结果保存在数据库中的某个位置(比如说 hbase 或缓存)并查询它以与特定类型的当前元组进行比较。

编辑

假设 A、B、C、D 类型的数据流来自 spout

B4 A4 C7 D2 A3 A2 B3 C6 D1 B2 C5 C4 B1 C3 C2 C1 A1----->喷口-->螺栓

现在我想比较 A1 和 A2、A2 和 A3、A3 和 A4。类似地,B1 与 B2,B2 与 B3 等等。

【问题讨论】:

  • Storm 没有订购保证。那么,您将“类型 A 元组与下一个类型 A 元组”进行比较是什么意思?如果一个 bolt 有多个生产者任务(即单个生产者 bolt 的 dop > 1),则可以在每个通道上传递 A 型元组,并且没有排序。
  • 我已经更新了这个例子。见编辑
  • 但这只有在你的 spout 和 bolt 都具有一个并行度时才有效!这真的是你想要的吗?
  • 是的,到目前为止,我希望它具有“一”的并行性。我认为很难有两个或更多的并行性。在那种情况下,我认为我必须将它存储在某个地方并查询它。我说的对吗?
  • 如果您使用可靠的 spout 并启用容错,您可以在内部变量中缓冲第一个元组并“等待”第二个元组到达。比你可以比较两者,并删除第一个元组(并确认它 - 只要你需要它进行比较就不要确认它 - 否则你会破坏容错)。

标签: java hadoop distributed apache-storm


【解决方案1】:

您可以在 spout 中发出元组时指定数据的类型。 然后你可以使用字段分组,这样每个类型 A 都会去同一个线程。这样,您最多可以有 4 个不同的线程执行您的螺栓代码。保证每个线程中的顺序。

builder.setBolt(BOLT_NAME, new BoltClass(),4)
.fieldsGrouping(SPOUT_NAME,new Fields("type"));

Storm 文档中的字段分组定义:

字段分组:流按分组中指定的字段进行分区。例如,如果流按“user-id”字段分组,具有相同“user-id”的元组将始终执行相同的任务,但具有不同“user-id”的元组可能会执行不同的任务.

http://storm.apache.org/documentation/Concepts.html

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-29
    • 2014-09-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多