【问题标题】:How spark structured streaming consumers initiated and invoked while reading multi-partitioned kafka topics?在阅读多分区 kafka 主题时,如何启动和调用 spark 结构化流式消费者?
【发布时间】:2019-06-11 21:34:52
【问题描述】:

如果一个 kakfa 主题有多个分区,在 java 中,那些许多消费者实例/线程将在消费者端实例化。

如何在火花流消费者方面处理它?我没有找到很多关于相同的信息。相同的任何示例,即在主题的 spark-streaming-consumer 调用多个消费者。

非常感谢任何设计建议/示例。

问候, 夏姆

【问题讨论】:

  • 请停止使用 Spark 电子邮件用户列表宣传您的问题
  • @Kiwy 当然先生,但是应该发送什么样的电子邮件来触发电子邮件用户列表?很抱歉,我是针对火花相关的问题。
  • 直接向邮件列表提出您的问题,不要将其用作增加您在本网站上的浏览量的方式。
  • @Kiwy 有时在邮件中没有人回复,我需要在邮件和 SOF 中复制问题。先生,我无意增加现场浏览量。如果有人可以提供帮助,我只需要解决方案。

标签: apache-spark apache-kafka spark-streaming kafka-consumer-api


【解决方案1】:

如果 Kafka 有多个分区,这意味着消费者可以通过并行执行某项任务从中受益。特别是 spark-streaming 在内部可以通过增加 num-executors 参数来加速作业。这与 Kafka 拥有的分区数量有关,例如如果您的 Kafka 分区数量与 spark 中的 num-executors 数量相同,理论上所有 executor 都可以一次读取所有分区,这显然提高系统吞吐量。

【讨论】:

  • 感谢您的快速回复......“特别是内部的 spark-streaming 可以通过增加 num-executors 参数来加速工作。”这是否意味着我知道需要与 spark-consumer 相关的任何内容...即假设我在一个主题中有 4 个分区,执行器的数量为 10,然后在 spark 集群上运行时自动调用 4 个消费者 .... 我不需要做任何事情@ spark-consumer 方面。对吗?
  • 是的,但请记住,例如,10 个执行程序中的 6 个将在从 kafka 读取时处于空闲状态,但是如果您在读取后将数据集重新分区为 10,那么所有执行程序都将参与其中.通常,分区数或多或少会为您提供比例因子
【解决方案2】:

只要 spark 有足够的资源,Spark 流总是从 Kafka 中的所有可用分区并行读取数据。 Spark 开箱即用,我们不需要为此编写任何代码。

例如,如果您的 Kafka 主题有 4 个分区,那么如果您启动 你的火花作业有 2 个执行器,每个执行器有 2 个核心,然后是你的火花作业 将启动 4 个任务从 4 个 Kafka 并行读取数据 分区。

如果您需要更多信息,请随时发表评论。

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

import java.sql.Timestamp

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import java.time.{LocalDate, LocalDateTime}
import java.util.Calendar



object SparkKafka {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("test_app")
      .getOrCreate()
    val sparkContext = spark.sparkContext


    val ssc = new StreamingContext(sparkContext, Seconds(1)) // the polling frequency is 2 seconds, can be modified based on the BM requirements.
///val currentHour = now.get(Calendar.HOUR_OF_DAY)

    log.info("Before starting the Stream -->>")
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String]
      (Array.apply("Kafka_topic_name"), getKafkaParams()))

      .map(record => record.value)

    stream.foreachRDD { rdd =>

      try {
        if (!rdd.isEmpty()) {
          log.info("rdd is not empty and saving to -->>"+LocalDate.now.getYear+"/"+LocalDate.now.getMonth+"/"+LocalDate.now.getDayOfMonth+"/"+LocalDateTime.now().getHour)
          rdd.saveAsTextFile("hdfs:///<folder to save>") //TODO::: Externalize the HDFS location to Props




          LocalDate.now.getMonth


         if (null != args && null != args {
            0
          } && args {
            0
          }.equals("log")) {
            rdd.foreach(x => print("Message read and saved TO S3 bucket----*****--->>" + x))
          }
        }
      } catch {

        case t: Throwable =>
          t.printStackTrace() // TODO: handle error)
          log.error("Exception occured while processing the data exception is {}", t.getCause)
      }
    }

    ssc.start()
    log.info("started now-->> " + compat.Platform.currentTime)
    ssc.awaitTermination()

  }

  def getKafkaParams(): Map[String, Object] = {
    Map[String, Object]("bootstrap.servers" -> "host:port
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Group_Name",
      //      "sasl.kerberos.service.name" -> "kafka",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean))
  }

}

【讨论】:

  • 谢谢苏雷什。如何处理结构化流任何样本中的偏移量?
  • Spark 为您管理偏移量。 spark 将偏移量存储在一个名为 _consumer_offsets 的特殊主题中。如果您的 spark 作业停止并再次返回,那么您的作业将从上次离开的位置读取数据。
  • 很高兴分享更多细节,请告诉我
  • 如果你想自己管理偏移量,那么你必须使用 hive 或任何其他持久存储。
猜你喜欢
  • 2019-07-22
  • 2018-06-01
  • 2017-10-17
  • 2018-01-19
  • 2019-09-24
  • 2019-11-15
  • 2020-06-27
  • 2018-01-16
  • 2021-11-05
相关资源
最近更新 更多