【问题标题】:Processing records in order in Storm在 Storm 中按顺序处理记录
【发布时间】:2016-01-06 13:19:39
【问题描述】:

我是 Storm 的新手,在弄清楚如何按顺序处理记录时遇到了问题。

我有一个数据集,其中包含具有以下字段的记录:

user_id、location_id、time_of_checking

现在,我想识别已完成我指定路径的用户(例如,从位置 A 到位置 B 再到位置 C 的用户)。

我正在使用 Kafka 生产者并从文件中读取这些记录来模拟实时数据。数据按日期排序。

所以,要检查我的模式是否满足,我需要按顺序处理记录。问题是,由于并行化(螺栓复制),我没有按顺序签入用户。因为这种模式行不通。

如何克服这个问题?如何按顺序处理记录?

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    Storm 中没有对有序处理的一般系统支持。您要么使用支持有序流处理的不同系统,如 Apache Flink(免责声明,我是 Flink 的提交者),要么您需要自己在螺栓代码中处理它。

    Storm 提供的唯一支持是使用 Trident。您可以将某个时间段(例如一分钟)的元组放入一个批次中。因此,您可以在一分钟内一次处理所有元组。但是,这仅在您的用例允许时才有效,因为您无法将不同批次的元组相互关联。在您的情况下,只有在您知道所有用户都已到达目的地的时间点(并且没有其他用户开始新的交互)的情况下,才会出现这种情况;即,您需要不发生任何两个用户重叠的时间点。 (在我看来,您的用例无法满足此要求)。

    对于非系统,即定制的基于用户代码的解决方案,将有两种方法:

    例如,您可以在处理之前缓冲元组并在螺栓内按时间戳排序。为了使其正常工作,您需要注入标点符号/水印,以确保没有比标点符号更大的时间戳的元组出现在标点符号之后。如果您从每个并行输入子流中收到标点符号,则可以安全地触发排序和处理。

    另一种方法是在区域缓冲区中缓冲每个传入子流的元组(保留子流顺序)并按顺序合并来自缓冲区的元组。这具有避免排序的优点。但是,您需要确保每个运算符发出有序的元组。此外,为了避免阻塞(即,如果没有输入可用于子流),可能还需要标点符号。 (我实现了这种方法。随意使用代码或根据您的需要对其进行调整:https://github.com/mjsax/aeolus/blob/master/queries/utils/src/main/java/de/hub/cs/dbis/aeolus/utils/TimestampMerger.java

    【讨论】:

    • 非常感谢。我不高兴听到这个消息,但是,您的回答非常有帮助。我会看看你的源代码,也谢谢你。
    • 还有一个问题,如果可以的话,我是否可以期望记录是按顺序处理的,但只是以分布式的方式,所以输出不是按顺序的?也就是说,如果我将窗口长度设置为 10 秒,我可以期望聚合在那几秒内产生的所有记录吗?如果我正在从一个文件(当前)生成记录,我可以预期,在一个分区的情况下,KafkaSpout 将按顺序发出记录,通过“shuffle”从 spout 获取记录的螺栓只会分配它的处理,并且螺栓具有10秒的窗口就可以累积记录了吧?
    • 对不起,我听不懂...一般:对于每个子流,记录都是FIFO传输的,即如果按顺序发出,则元组将按顺序接收。
    【解决方案2】:

    Storm 支持此用例。为此,您只需确保在整个流程中保持所有相关组件的顺序。因此,作为第一步,在 Kafka 生产者中,特定用户 ID 的所有消息都应该发送到 Kafka 中的同一个分区。为此,您可以在 KafkaProducer 中实现自定义分区器。具体实现请参考链接here

    由于 Kafka 中的一个分区可以被 Storm 中的一个且只有一个 kafkaSpout 实例读取,因此该分区中的消息在 spout 实例中是按顺序排列的。从而保证同一个用户id的所有消息都到达同一个spout。

    现在是棘手的部分 - 为了维护 bolt 中的顺序,您需要确保基于 Kafka spout 发出的“user_id”字段在 bolt 上使用字段分组。提供的 kafkaSpout 不会破坏要发出的消息字段,您必须覆盖 kafkaSpout 才能读取消息并从 spout 发出“user_id”字段。这样做的一种方法是使用一个中间螺栓,它从 Kafkaspout 读取消息并发出带有“user_id”字段的流。

    当您最终在“user_id”上指定具有字段分组的螺栓时,特定 user_id 值的所有消息将转到螺栓的同一实例,无论螺栓的并行度如何。

    适用于您的案例的示例拓扑如下 -

    builder.setSpout("KafkaSpout", Kafkaspout);

    builder.setBolt("FieldsEmitterBolt", FieldsEmitterBolt).shuffleGrouping("KafkaSpout");

    builder.setBolt("CalculatorBolt", CalculatorBolt).fieldsGrouping("FieldsEmitterBolt", new Fields("user_id")); //Bolt2发出的user_id字段

    --请注意,如果您的 user_id 数量有限,可能会出现所有 user_id 值都来自同一个 CalculatorBolt 实例的情况。这反过来会降低有效的“并行性”!

    【讨论】:

    • FieldsEmitterBolt 真的需要吗?您可以直接从 spout 中使用 fieldsGrouping,不是吗?
    • 在这种特殊情况下,将需要它。 KafkaSpout 不会发出“user_id”作为能够分组的字段。所以下游 Bolt 将无法按用户 ID 对消息进行分组。您有两个选择,要么扩展基本 KafkaSpout 并添加将“user_id”作为字段发出的逻辑,以便能够直接使用 CalculatorBolt 添加字段分组,或者有一个从源元组读取“user_id”的场发射器螺栓,提取“user_id”并作为字段发出,以便下游螺栓并在其上添加其字段分组。
    猜你喜欢
    • 2016-09-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多