【问题标题】:Pause and resume KafkaConsumer in SparkStreaming在 Spark Streaming 中暂停和恢复 Kafka Consumer
【发布时间】:2020-10-07 13:49:48
【问题描述】:

:)

我陷入了一种(奇怪的)境地,简而言之,我不想使用来自 Kafka 的任何新记录,因此暂停 sparkStreaming 消费(InputDStream[ConsumerRecord ]) 为主题中的所有分区,做一些操作,最后,恢复消费记录。

首先... 这可能吗?

我一直在尝试这样的事情:

var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

但我明白了:

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

欢迎任何帮助了解我缺少什么以及为什么在很明显消费者分配了分区时我得到空结果!

版本: 卡夫卡:0.10 火花:2.3.0 斯卡拉:2.11.8

【问题讨论】:

    标签: apache-kafka spark-streaming


    【解决方案1】:

    是的,这是可能的 在代码中添加检查点并传递持久存储(本地磁盘、S3、HDFS)路径

    每当您开始/恢复您的工作时,它都会从检查点获取带有消费者偏移量的 Kafka 消费者组信息,并从停止的位置开始处理。

    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    

    Spark Check-=pointing 是一种机制,不仅可以保存偏移量,还可以保存 Stages 和 Jobs 的 DAG 的序列化状态。因此,每当您使用新代码重新开始工作时,它都会

    1. 读取并处理序列化数据
    2. 如果您的 Spark 应用中有任何代码更改,请清理缓存的 DAG 阶段
    3. 使用最新代码从新数据恢复处理。

    现在从磁盘读取只是 Spark 加载 Kafka 偏移量、DAG 和旧的未完成处理数据所需的一次性操作

    一旦完成,它将始终按默认或指定的检查点间隔将数据保存到磁盘。

    Spark 流提供了指定 Kafka 组 ID 的选项,但 Spark 结构化流不提供。

    【讨论】:

    • 首先感谢您的回复。现在,请帮助我更好地理解它。我不明白为什么用于“使用状态转换”或“从驱动程序故障中恢复”的机制应该实现我的应用程序不会得到空的“consumer.assignment()”。如果应用程序崩溃,我可以理解它,但如果到目前为止执行一直在正确运行..为什么从磁盘读取信息会解决它?特别是因为:issues.apache.org/jira/browse/SPARK-13316 让我改变了我的整个代码
    • 添加了几行解释@Borja
    猜你喜欢
    • 2016-11-03
    • 2017-03-27
    • 2020-11-11
    • 2023-03-02
    • 2018-08-26
    • 1970-01-01
    • 2020-02-08
    • 1970-01-01
    相关资源
    最近更新 更多