【问题标题】:Spark Kafka Streaming CommitAsync Error [duplicate]Spark Kafka Streaming CommitAsync错误[重复]
【发布时间】:2018-08-26 22:05:59
【问题描述】:

我是 Scala 和 RDD 概念的新手。在 Spark 中使用 Kafka 流 api 从 kafka 读取消息并尝试在业务工作后提交。但我遇到了错误。

注意:使用重新分区进行并行工作

如何从流 APi 中读取偏移量并将其提交给 Kafka ?

scalaVersion := "2.11.8" val sparkVersion = "2.2.0" val connectorVersion = "2.0.7" val kafka_stream_version = "1.6.3"

代码

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
    ssc.checkpoint("C:/Gnana/cp")

    val kafkaStream = {

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "ignite3",

        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("test")
      val numPartitionsOfInputTopic = 2
      val streams = (1 to numPartitionsOfInputTopic) map {
        _ => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ).map(_.value())
      }

      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1
      unifiedStream.repartition(sparkProcessingParallelism)
    }
//Finding offsetRanges
kafkaStream
  .transform {
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
  }
//do business operation and persist offset to kafka
kafkaStream.foreachRDD(rdd=> {
  println("offsetRanges:"+offsetRanges)
  rdd.foreach(conRec=> {
    println(conRec)
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
})

    println(" Spark parallel reader is ready !!!")

   ssc.start()
    ssc.awaitTermination()
  }

错误

java.io.NotSerializableException: org.apache.spark.streaming.dstream.TransformedDStream 的对象可能作为 RDD 操作闭包的一部分被序列化。这是因为 DStream 对象是从闭包中引用的。请重写此 DStream 中的 RDD 操作以避免这种情况。这是为了避免 Spark 任务因不必要的对象而膨胀。 在 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512) 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) 在 org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:512) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)

【问题讨论】:

  • @yuval ,我正在使用重新分区的概念。它与那个问题不同。有点不同。请帮我解决这个问题
  • Gnana,你真的需要编写并行阅读器程序吗?参数(--num-executors)是否不考虑多个主题分区的并行性?
  • @kiranM 我想处理 parallel 。如果不可能。给我示例代码,它将在业务逻辑写入 Kafka 后读取消息
  • 我的意思是说你不需要手动编写并行代码,DirectStream(createDirectStream)会根据“--num-executors”参数自动处理。
  • @kiranM 我需要从一个具有多个分区的主题中读取消息,并在 Kafka 中处理数据并提交。我不想重新分区。

标签: scala apache-spark spark-streaming rdd scala-streams


【解决方案1】:

在计算偏移范围之前不要重新分区。如果你这样做,那么就会遇到这个问题。要测试您只需删除重新分区,然后尝试运行此应用程序。

【讨论】:

    猜你喜欢
    • 2019-08-08
    • 1970-01-01
    • 2016-03-12
    • 2018-05-17
    • 1970-01-01
    • 1970-01-01
    • 2017-12-28
    • 2019-07-30
    • 2020-10-29
    相关资源
    最近更新 更多