【发布时间】:2019-12-28 10:49:01
【问题描述】:
如何在 kafka spout 中微批处理事件以减少后续螺栓中的 IO 调用? 期望是:使用 kafka 中的事件发出最大大小为 100 的批次,但最多等待 1 秒以形成该批次。如果 1 秒内没有足够的事件,则发出可用事件。
我可以通过“source.groupedWithin”方法在 Akka 中实现相同的目的。我如何对 kafka spout 做同样的事情?
【问题讨论】:
如何在 kafka spout 中微批处理事件以减少后续螺栓中的 IO 调用? 期望是:使用 kafka 中的事件发出最大大小为 100 的批次,但最多等待 1 秒以形成该批次。如果 1 秒内没有足够的事件,则发出可用事件。
我可以通过“source.groupedWithin”方法在 Akka 中实现相同的目的。我如何对 kafka spout 做同样的事情?
【问题讨论】:
查看 Storm 的 tick tuples,它提供了一种将预定的元组(ticks)发送到 bolts 的方法。对于您的情况,您可以每秒配置一个刻度。与此同时,bolt 将简单地处理来自 Kafka spout 的元组并对它们进行批处理,当它达到 100 条消息(在你的情况下)或当你得到一个 tick 元组时发送一个批处理。请注意,您确实需要检查每个输入元组以查看它是滴答声还是 Kafka 消息。
【讨论】:
除了Chris的回答,你还可以使用Storm的窗口化功能https://storm.apache.org/releases/2.0.0/Windowing.html。您可以在 https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
找到一个这样的例子如果您愿意,也可以使用 Trident。设置 KafkaTridentSpoutOpaque 后,您可以使用 Kafka 客户端设置来控制每批中有多少消息。您可以使用KafkaSpoutConfigpollTimeoutMs 来设置要等待批次填充的时间,并通过KafkaSpoutConfig.Builder.setProp 设置max.poll.records Kafka 客户端配置来控制批次中的最大记录数。
有关使用 Kafka Trident spout 的完整示例,请参阅https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
【讨论】: