【问题标题】:Kafka consumer group and partitions with Spark structured streamingKafka 消费者组和具有 Spark 结构化流的分区
【发布时间】:2019-07-22 05:17:52
【问题描述】:
我有一个包含 3 个分区的 Kafka 主题,我正在使用 spark 结构化流处理这些数据。我有 3 个消费者(假设是消费者组 A)每个从单个分区读取,直到这里一切都是工作文件。
我有一个从同一个主题读取的新要求,我想通过再次创建 3 个消费者(比如消费者组 B)来并行化它,每个从单个分区读取。由于我使用的是结构化流媒体,因此我无法明确提及 group.id。
指向单个/相同分区的不同组的消费者会读取所有数据吗?
【问题讨论】:
标签:
apache-spark
apache-kafka
spark-structured-streaming
【解决方案1】:
使用可以使用 group.id 如下进行流式传输
字符串 processingGroup = "processingGroupA";
Dataset<Row> raw_df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
.option("subscribe", topicName)
.option("startingOffsets", "latest")
.option("group.id", processingGroup)
.load();
【解决方案2】:
除非您使用的是 Spark 3.x 或更高版本,否则您将无法在 Kafka 输入流中设置 group.id。正如您所提到的,使用 Spark 3.x,您可以拥有两个不同的结构化流作业,提供两个不同的 group.id,以确保每个作业独立于其他作业读取主题的所有消息。
对于 Spark 版本 code on GitHub 中查找:
// Each running query should use its own group id. Otherwise, the query may be only
// assigned partial data since Kafka will assign partitions to multiple consumers having
// the same group id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
因此,同样在这种情况下,拥有两个不同的 Streaming Jobs 将确保您拥有两个不同的 ConsumerGroup,这允许两个作业独立于另一个作业读取来自该主题的所有消息。
【解决方案3】:
来自 Spark 3.0.1 documentation:
默认情况下,每个查询都会生成一个唯一的组 id 用于读取数据。
这确保了每个 Kafka 源都有自己的消费者组
不会面临来自任何其他消费者的干扰,因此可以
读取其订阅主题的所有分区。
因此,如果您使用assign 选项并提及要使用哪个分区,它将从特定分区读取所有数据,因为默认情况下它将是一个不同的消费者组(group.id)。 assign 选项将 json 字符串作为值,也可以有来自不同主题的多个分区。例如,{"topicA":[0,1],"topicB":[2,4]}。
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("assign", "{"topic-name":[0]}")
.load()