【问题标题】:How to manually set group.id and commit kafka offsets in spark structured streaming?如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?
【发布时间】:2018-11-23 11:04:58
【问题描述】:

我正在阅读 Spark 结构化流式传输 - Kafka 集成指南here

在这个链接上被告知

enable.auto.commit:Kafka 源不提交任何偏移量。

那么,一旦我的 spark 应用程序成功处理了每条记录,我该如何手动提交偏移量?

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming spark-kafka-integration


    【解决方案1】:

    tl;博士

    无法向 Kafka 提交任何消息。从 Spark 版本 3.x 开始,您可以定义 Kafka 消费者组的名称,但是,这仍然不允许您提交任何消息。


    自 Spark 3.0.0

    根据Structured Kafka Integration Guide,您可以提供ConsumerGroup 作为选项kafka.group.id

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .option("kafka.group.id", "myConsumerGroup")
      .load()
    

    但是,Spark 仍然不会提交任何偏移量,因此您将无法“手动”向 Kafka 提交偏移量。此功能旨在处理 Kafka 的最新功能 Authorization using Role-Based Access Control,您的 ConsumerGroup 通常需要遵循命名约定。

    讨论并解决了 Spark 3.x 应用程序的完整示例here

    直到 Spark 2.4.x

    Spark Structured Streaming + Kafka integration Guide 清楚地说明了它如何管理 Kafka 偏移量。 Spark 将不会将任何消息提交回 Kafka,因为它依赖于内部偏移管理来实现容错。

    用于管理偏移量的最重要的 Kafka 配置是:

    • group.id: Kafka 源将为每个查询自动创建一个唯一的组 ID。根据code group.id 将设置为
    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    
    • auto.offset.reset: 设置源选项startingOffsets 来指定从哪里开始。 Structured Streaming 管理内部使用的偏移量,而不是依赖 kafka Consumer 来完成。
    • enable.auto.commit: Kafka 源不提交任何偏移量。

    因此,在结构化流中,目前无法为 Kafka 消费者定义您的自定义 group.id,并且结构化流在内部管理偏移量,而不是提交回 Kafka(也不是自动)。

    2.4.x 在行动

    假设您有一个简单的 Spark Structured Streaming 应用程序,它可以读取和写入 Kafka,如下所示:

    // create SparkSession
    val spark = SparkSession.builder()
      .appName("ListenerTester")
      .master("local[*]")
      .getOrCreate()
    
    // read from Kafka topic
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "testingKafkaProducer")
      .option("failOnDataLoss", "false")
      .load()
    
    // write to Kafka topic and set checkpoint directory for this stream
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "testingKafkaProducerOut")
      .option("checkpointLocation", "/home/.../sparkCheckpoint/")
      .start()
    

    Spark 的偏移管理

    一旦提交了这个应用程序并正在处理数据,可以在检查点目录中找到相应的偏移量:

    myCheckpointDir/offsets/

    {"testingKafkaProducer":{"0":1}}
    

    这里检查点文件中的条目确认下一个要消耗的分区0的偏移量是1。这意味着应用程序已经处理了来自名为testingKafkaProducer 的主题的分区0 的偏移量0

    有关容错语义的更多信息请参见 Spark Documentation

    Kafka 的偏移管理

    但是,如文档中所述,偏移量提交回 Kafka。 这可以通过执行Kafka安装的kafka-consumer-groups.sh来检查。

    ./kafka/current/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group "spark-kafka-source-92ea6f85-[...]-driver-0 "

    TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST         CLIENT-ID
    testingKafkaProducer 0          -               1               -    consumer-1-[...] /127.0.0.1   consumer-1
    
    Kafka 不知道此应用程序的当前偏移量,因为它从未被提交。

    可能的解决方法

    请仔细阅读以下来自 Spark 提交者 @JungtaekLim 关于解决方法的 cmets:“Spark 的容错保证基于 Spark 完全控制偏移管理这一事实,如果他们试图修改它。(例如,如果他们更改为向 Kafka 提交偏移量,则没有批处理信息,并且如果 Spark 需要移回特定批处理,则“后面”保证不再有效。)”

    我在网上看到的一些研究是,您可以在 Spark 的自定义 StreamingQueryListener 中的 onQueryProgress 方法的回调函数中提交偏移量。这样,您可以拥有一个跟踪当前进度的消费者组。但是,它的进展不一定与实际的消费者群体保持一致。

    以下是一些您可能会发现有用的链接:

    【讨论】:

    • 感谢您提供我的 PR 和存储库,但我必须更正一件事 - 无法手动提交偏移量。这是为了让 Spark 完全控制偏移管理,而不是依赖 Kafka。我的 PR 和存储库是在“不同”组 ID 上提交偏移量,以便最终用户能够利用这些信息与 Kafka 生态系统 UI/管理工具集成。
    • 嗨@JungtaekLim,谢谢你说清楚。我重新阅读了我的答案,您的陈述似乎已经反映在我的句子“结构化流式处理在内部管理偏移量而不是提交回 Kafka(也不是自动)”中。如果您仍然认为这令人困惑或具有误导性,请告诉我。
    • 很抱歉,但我不得不再说一遍,没有可能的方法,最终用户不应该尝试这样做。答案应该是NO。 Spark 的容错保证是基于 Spark 可以完全控制偏移管理的事实,如果他们试图修改它,他们就会取消保证。 (例如,如果他们更改为向 Kafka 提交偏移量,则没有批处理信息,并且如果 Spark 需要移回特定的批处理“后面”保证不再有效。)
    • 再次感谢@JungtaekLim 花时间阅读“旧”答案并提供一些非常有用的见解。我完全同意你的论点,并试图在我的回答中指出这一点。
    • 我的荣幸。实际上,我以某种方式访问​​了这个,因为有人被这个答案误导了,并将我的项目视为实际问题的解决方案(关于 Kafka 数据源上的偏移量问题),而事实并非如此,也不可能。
    猜你喜欢
    • 2015-04-09
    • 2020-02-15
    • 2021-04-06
    • 1970-01-01
    • 2018-09-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多