【问题标题】:How to get at least N number of logs from Kafka through Spark?如何通过 Spark 从 Kafka 获取至少 N 条日志?
【发布时间】:2021-04-26 15:32:47
【问题描述】:

在 Spark 流式传输中,我会在日志到达时获取它们。但我想一次获得至少 N 个日志。如何实现?

this 的回答看来,Kafka 中有这样的实用程序,但doesn't seem to be present in Spark 使它成为可能。

【问题讨论】:

    标签: apache-spark-sql spark-structured-streaming spark-kafka-integration


    【解决方案1】:

    没有选项可以让您为从 Kafka 接收的消息数量设置 最小 值。 maxOffsetsPerTrigger 选项允许您设置消息的最大值

    如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。

    另外(参考您提供的链接),这也无法在 Kafka 本身中设置。您可以设置获取字节的最小数量,但不能设置消息编号的最小数量。

    请注意,您可以通过前缀kafka. 在结构化流中通过 readStream 设置所有 Kafka 选项,如 Kafka Specific Configurations 部分所述:

    "Kafka 自己的配置可以通过 DataStreamReader.option 和 kafka. 前缀进行设置,例如 stream.option("kafka.bootstrap.servers", "host:port")。"

    这样,您还可以使用消费者配置kafka.fetch.min.bytes。但是,在 Kafka 2.5.0 安装上使用 Spark 3.0.1 进行测试并没有任何影响。添加配置 kafka.fetch.max.wait.ms 时,我的测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。

    查看 Spark 的 KafkaDataConsumer 的源代码,与纯 KafkaConsumer 相比,fetch 似乎没有直接考虑任何最小/最大字节数。

    【讨论】:

    • 谢谢。我试图解决选项(“kafka.fetch.min.bytes”,“1000”)但它不工作,虽然也没有错误。
    • 生产者推送了多个 JSON 字符串。我只想一次获取 5-6 个 JSON 字符串。所以,我保留option("kafka.fetch.min.bytes","100000") 来检查消费者是否仍在获取 JSON 字符串,结果发现它仍在这样做。即,它没有实现给定的表达式。
    • @Mr.Sigma。抱歉,我今天正在测试这个,也意识到这个配置没有任何影响。看起来只有 Trigger time 和 maxOffsetsPerTrigger 对 Kafka fetcher 有任何直接影响。对不起,由于混乱,我已经相应地更新了我的答案。
    猜你喜欢
    • 2018-05-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-17
    • 2023-03-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多