【发布时间】:2020-10-07 13:49:48
【问题描述】:
:)
我陷入了一种(奇怪的)境地,简而言之,我不想使用来自 Kafka 的任何新记录,因此暂停 sparkStreaming 消费(InputDStream[ConsumerRecord ]) 为主题中的所有分区,做一些操作,最后,恢复消费记录。
首先... 这可能吗?
我一直在尝试这样的事情:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
但我明白了:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
欢迎任何帮助了解我缺少什么以及为什么在很明显消费者分配了分区时我得到空结果!
版本: 卡夫卡:0.10 火花:2.3.0 斯卡拉:2.11.8
【问题讨论】:
标签: apache-kafka spark-streaming