【问题标题】:Kafka consumer group and partitions with Spark structured streamingKafka 消费者组和具有 Spark 结构化流的分区
【发布时间】:2019-07-22 05:17:52
【问题描述】:

我有一个包含 3 个分区的 Kafka 主题,我正在使用 spark 结构化流处理这些数据。我有 3 个消费者(假设是消费者组 A)每个从单个分区读取,直到这里一切都是工作文件。

我有一个从同一个主题读取的新要求,我想通过再次创建 3 个消费者(比如消费者组 B)来并行化它,每个从单个分区读取。由于我使用的是结构化流媒体,因此我无法明确提及 group.id

指向单个/相同分区的不同组的消费者会读取所有数据吗?

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    使用可以使用 group.id 如下进行流式传输

    字符串 processingGroup = "processingGroupA";

    Dataset<Row> raw_df = sparkSession
                          .readStream()
                          .format("kafka")
                          .option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
                          .option("subscribe", topicName) 
                          .option("startingOffsets", "latest")
                          .option("group.id",  processingGroup)
                          .load();
    

    【讨论】:

      【解决方案2】:

      除非您使用的是 Spark 3.x 或更高版本,否则您将无法在 Kafka 输入流中设置 group.id。正如您所提到的,使用 Spark 3.x,您可以拥有两个不同的结构化流作业,提供两个不同的 group.id,以确保每个作业独立于其他作业读取主题的所有消息。

      对于 Spark 版本 code on GitHub 中查找:

      // Each running query should use its own group id. Otherwise, the query may be only 
      // assigned partial data since Kafka will assign partitions to multiple consumers having
      // the same group id. Hence, we should generate a unique id for each query.
      val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
      

      因此,同样在这种情况下,拥有两个不同的 Streaming Jobs 将确保您拥有两个不同的 ConsumerGroup,这允许两个作业独立于另一个作业读取来自该主题的所有消息。

      【讨论】:

        【解决方案3】:

        来自 Spark 3.0.1 documentation

        默认情况下,每个查询都会生成一个唯一的组 id 用于读取数据。 这确保了每个 Kafka 源都有自己的消费者组 不会面临来自任何其他消费者的干扰,因此可以 读取其订阅主题的所有分区。

        因此,如果您使用assign 选项并提及要使用哪个分区,它将从特定分区读取所有数据,因为默认情况下它将是一个不同的消费者组(group.id)。 assign 选项将 json 字符串作为值,也可以有来自不同主题的多个分区。例如,{"topicA":[0,1],"topicB":[2,4]}

        val df = spark
          .read
          .format("kafka")
          .option("kafka.bootstrap.servers", "host:port")
          .option("assign", "{"topic-name":[0]}")
          .load()
        

        【讨论】:

          猜你喜欢
          • 2019-09-24
          • 1970-01-01
          • 2017-01-04
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-06-01
          • 2014-10-25
          • 2020-11-21
          相关资源
          最近更新 更多