【发布时间】:2021-04-26 15:32:47
【问题描述】:
在 Spark 流式传输中,我会在日志到达时获取它们。但我想一次获得至少 N 个日志。如何实现?
从this 的回答看来,Kafka 中有这样的实用程序,但doesn't seem to be present in Spark 使它成为可能。
【问题讨论】:
标签: apache-spark-sql spark-structured-streaming spark-kafka-integration
在 Spark 流式传输中,我会在日志到达时获取它们。但我想一次获得至少 N 个日志。如何实现?
从this 的回答看来,Kafka 中有这样的实用程序,但doesn't seem to be present in Spark 使它成为可能。
【问题讨论】:
标签: apache-spark-sql spark-structured-streaming spark-kafka-integration
没有选项可以让您为从 Kafka 接收的消息数量设置 最小 值。 maxOffsetsPerTrigger 选项允许您设置消息的最大值。
如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。
另外(参考您提供的链接),这也无法在 Kafka 本身中设置。您可以设置获取字节的最小数量,但不能设置消息编号的最小数量。
请注意,您可以通过前缀kafka. 在结构化流中通过 readStream 设置所有 Kafka 选项,如 Kafka Specific Configurations 部分所述:
"Kafka 自己的配置可以通过 DataStreamReader.option 和 kafka. 前缀进行设置,例如 stream.option("kafka.bootstrap.servers", "host:port")。"
这样,您还可以使用消费者配置kafka.fetch.min.bytes。但是,在 Kafka 2.5.0 安装上使用 Spark 3.0.1 进行测试并没有任何影响。添加配置 kafka.fetch.max.wait.ms 时,我的测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。
查看 Spark 的 KafkaDataConsumer 的源代码,与纯 KafkaConsumer 相比,fetch 似乎没有直接考虑任何最小/最大字节数。
【讨论】:
option("kafka.fetch.min.bytes","100000") 来检查消费者是否仍在获取 JSON 字符串,结果发现它仍在这样做。即,它没有实现给定的表达式。