【问题标题】:Spark Streaming - Join on multiple kafka stream operation is slowSpark Streaming - 加入多个kafka流操作很慢
【发布时间】:2020-03-16 12:11:28
【问题描述】:

我有 3 个 kafka 流,每个流有 60 万多条记录,火花流需要 10 多分钟来处理流之间的简单连接。

Spark 集群配置:

这就是我在 spark(scala) 中读取 kafka 流到 tempviews 的方式

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")

我使用 spark spark sql 加入 3 个 TABLES

select COL1, COL2 from TABLE1   
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK

作业的执行:

我是否错过了一些我必须研究的 spark 配置?

【问题讨论】:

  • 这可能是由于 Kafka 主题的分区非常低导致 Spark 中的并行度低。你的 Kafka 主题有多少个分区?你使用了多少个执行器?

标签: scala apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

我发现了同样的问题。而且我发现流和流之间的连接需要更多的内存,就像我想象的那样。当我增加每个执行程序的核心时,问题就消失了。

【讨论】:

    【解决方案2】:

    很遗憾,没有任何测试数据,也没有您期望的结果数据,所以我可以玩,所以我无法给出确切的正确答案。

    @Asteroid 注释是有效的,因为我们看到每个阶段的任务数是 1。通常 Kafka 流使用接收器来消费主题;每个接收者只创建一个任务。一种方法是使用多个接收器/拆分分区/增加资源(核心数量)以增加并行度。

    如果这仍然不起作用,另一种方法是使用 Kafka API 创建DirectStream。根据https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html 的文档,这个创建了一个输入流,它直接从 Kafka Brokers 拉取消息,而不使用任何接收器。

    我初步制作了一个示例代码,用于在下面创建直接流。您可能想了解这一点以根据自己的喜好进行定制。

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "KAFKASERVER",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "startingOffsets" -> "earliest",
      "endingOffsets" -> "latest"
    )
    
    val topics = Array(TOPIC1)
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val schema = StructType(StructField('data', StringType, True))
    val df = spark.createDataFrame([], schema)
    val dstream = stream.map(_.value())
    dstream.forEachRDD(){rdd:RDD[String], time:Time} => {
        val tdf = spark.read.schema(schema).json(rdd)
        df = df.union(tdf)
        df.createOrReplaceTempView("TABLE1")
    }
    

    一些相关资料:

    https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-2/(向下滚动到 Kafka 消费者代码部分。其他部分无关)

    https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html(用于创建直接流的 Spark Doc)

    祝你好运!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-03-12
      • 1970-01-01
      • 2017-03-29
      • 1970-01-01
      • 1970-01-01
      • 2019-08-08
      • 1970-01-01
      • 2019-07-12
      相关资源
      最近更新 更多