【Question Title】: Error when decoding the Proto Buf messages in Spark Streaming, using scalapb
【Posted】: 2017-03-31 15:41:04
【Question Description】:

This is a Spark Streaming application that consumes Kafka messages encoded with Proto Buf, using the scalapb library. I get the following error. Please help.

> com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
>   at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:82)
>   at com.google.protobuf.CodedInputStream.skipRawBytesSlowPath(CodedInputStream.java:1284)
>   at com.google.protobuf.CodedInputStream.skipRawBytes(CodedInputStream.java:1267)
>   at com.google.protobuf.CodedInputStream.skipField(CodedInputStream.java:198)
>   at com.example.protos.demo.Student.mergeFrom(Student.scala:59)
>   at com.example.protos.demo.Student.mergeFrom(Student.scala:11)
>   at com.trueaccord.scalapb.LiteParser$.parseFrom(LiteParser.scala:9)
>   at com.trueaccord.scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:103)
>   at com.example.protos.demo.Student$.parseFrom(Student.scala:88)
>   at com.trueaccord.scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:119)
>   at com.example.protos.demo.Student$.parseFrom(Student.scala:88)
>   at StudentConsumer$.StudentConsumer$$parseLine$1(StudentConsumer.scala:24)
>   at StudentConsumer$$anonfun$1.apply(StudentConsumer.scala:30)
>   at StudentConsumer$$anonfun$1.apply(StudentConsumer.scala:30)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)

Here is my code:

object StudentConsumer {
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{ SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    def parseLine(s: String): Student =
      Student.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","student").load()

    val ds2 = ds1.selectExpr("CAST(value AS String)").as[String].map(str => parseLine(str))

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()

  }
}

【Discussion】:

    Tags: scala apache-spark-sql protocol-buffers spark-streaming scalapb


    【Answer 1】:

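    Based on the error, the message you are trying to parse appears to be truncated or corrupted. Does the sender Base64-encode the protobuf before sending it to Kafka?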

    If so, it is worth adding println(s) to parseLine to check whether the string you receive is what you expect (perhaps CAST(value AS String) has some unexpected effect on your input).
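To see what a binary-to-String conversion can do to a protobuf payload, here is a stdlib-only sketch. It assumes that the producer writes raw protobuf bytes, and that turning the Kafka value into a Java `String` decodes those bytes as UTF-8, which is what `new String(bytes, UTF_8)` does. The sample message is hand-encoded for illustration (field 1 = "Bob", field 2 = 150) and is not the asker's actual `Student` data.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object StringRoundTrip {
  // Hand-encoded protobuf bytes (hypothetical message):
  //   0x0a = field 1, wire type 2 (length-delimited); 0x03 = length; 0x42 0x6f 0x62 = "Bob"
  //   0x10 = field 2, wire type 0 (varint); 0x96 0x01 = 150 as a varint
  val raw: Array[Byte] = Array[Byte](0x0a, 0x03, 0x42, 0x6f, 0x62, 0x10, 0x96.toByte, 0x01)

  // Bytes -> String -> bytes via UTF-8. A lone 0x96 is not valid UTF-8, so the decoder
  // substitutes U+FFFD, which re-encodes as three bytes (EF BF BD).
  val viaString: Array[Byte] = new String(raw, UTF_8).getBytes(UTF_8)

  // If the producer Base64-encodes first, the payload is plain ASCII text,
  // so the same String conversion is harmless and decoding restores the original bytes.
  val base64Payload: Array[Byte] = Base64.getEncoder.encode(raw)
  val viaBase64: Array[Byte] = Base64.getDecoder.decode(new String(base64Payload, UTF_8))

  def main(args: Array[String]): Unit = {
    println(s"raw: ${raw.length} bytes, after String round trip: ${viaString.length} bytes")
    println(s"String round trip lossless? ${viaString.sameElements(raw)}")
    println(s"Base64 round trip lossless? ${viaBase64.sameElements(raw)}")
  }
}
```

With raw bytes, the String step silently changes the payload. In that case `decodeBase64` is also handed text that was never Base64 to begin with, so whatever reaches `parseFrom` is garbage. With Base64 on both sides, the bytes survive, which is why the answer asks how the sender encodes the message.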

    Finally, the following Kafka/Spark Streaming/ScalaPB example may help. It assumes the messages are sent to Kafka as raw bytes:

    https://github.com/thesamet/sbtb2016-votes/blob/master/spark/src/main/scala/votes/Aggregator.scala
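The linked example passes `cr.value()` straight to `Vote.parseFrom`, so the parser sees exactly the bytes the producer wrote. The sketch below is a hand-rolled reader for a small subset of the protobuf wire format; it is not ScalaPB's or protobuf-java's code, and it handles only varint and length-delimited fields. It shows how damaged input leads to the error in the question. When the sample message is cut short, a length-delimited field claims more bytes than remain. That is roughly the condition the `skipField` / `skipRawBytes` frames in the stack trace report.

```scala
object WireFormatSketch {
  // Same hand-encoded sample as a hypothetical message: field 1 = "Bob", field 2 = 150.
  val sample: Array[Byte] = Array[Byte](0x0a, 0x03, 0x42, 0x6f, 0x62, 0x10, 0x96.toByte, 0x01)

  // Read a base-128 varint starting at `start`; returns (value, position after the varint).
  def readVarint(buf: Array[Byte], start: Int): (Long, Int) = {
    var result = 0L
    var shift = 0
    var pos = start
    var more = true
    while (more) {
      if (pos >= buf.length) throw new IllegalArgumentException("input ended inside a varint")
      val b = buf(pos)
      result |= (b & 0x7fL) << shift // low 7 bits carry data
      shift += 7
      pos += 1
      more = (b & 0x80) != 0 // high bit set means another byte follows
    }
    (result, pos)
  }

  // Walk the top-level fields, skipping each payload, and return how many fields were seen.
  // Only wire types 0 (varint) and 2 (length-delimited) are supported in this sketch.
  def countFields(buf: Array[Byte]): Int = {
    var pos = 0
    var count = 0
    while (pos < buf.length) {
      val (tag, afterTag) = readVarint(buf, pos)
      (tag & 0x7).toInt match {
        case 0 => pos = readVarint(buf, afterTag)._2
        case 2 =>
          val (len, afterLen) = readVarint(buf, afterTag)
          // The check that fails on truncated or corrupted input.
          if (afterLen + len > buf.length)
            throw new IllegalArgumentException(
              s"field declares $len bytes but only ${buf.length - afterLen} remain")
          pos = afterLen + len.toInt
        case other =>
          throw new IllegalArgumentException(s"wire type $other is not handled in this sketch")
      }
      count += 1
    }
    count
  }

  def main(args: Array[String]): Unit = {
    println(countFields(sample)) // the intact message parses
    try countFields(sample.take(4)) // cut off in the middle of the "Bob" string
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```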

    【Comments】:

    • Thanks, I used that as a reference, but I messed up while trying to make it work with Structured Streaming. I'm using spark.readStream. Could you help convert the following code snippet to Structured Streaming? val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, Array[Byte]](Array ("votes"), kafkaParams)) val votes: DStream[Vote] = votesAsBytes.map { (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value()) }

    【Answer 2】:

    Thanks to @thesamet for the feedback.

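    The following code works. It reads the Kafka value column as raw bytes and passes them directly to Student.parseFrom: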

      def main(args : Array[String]) {
    
        val spark = SparkSession.builder.
          master("local")
          .appName("spark session example")
          .getOrCreate()
    
        import spark.implicits._
    
        // Subscribe to the "student" topic; the Kafka source exposes "value" as a binary column.
        val ds1 = spark.readStream.format("kafka").
          option("kafka.bootstrap.servers","localhost:9092").
          option("subscribe","student").load()
    
        // Keep the payload as Array[Byte] (no CAST to String, no Base64 step)
        // and hand the raw protobuf bytes straight to the ScalaPB parser.
        val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
    
        val query = ds2.writeStream
          .outputMode("append")
          .format("console")
          .start()
    
        query.awaitTermination()
    
      }
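With the code above, a single record that still fails to decode will fail the task, just as in the question. If you would rather drop such records, one option is to wrap the parser in `scala.util.Try`. The sketch below is a minimal version under that assumption. `safeParse` and `toyParse` are hypothetical helpers; `toyParse` is only a stand-in so the example runs without generated ScalaPB classes.

```scala
import scala.util.{Failure, Success, Try}

object SafeParse {
  // Parse one payload; any non-fatal exception becomes None instead of failing the task.
  def safeParse[T](bytes: Array[Byte])(parse: Array[Byte] => T): Option[T] =
    Try(parse(bytes)) match {
      case Success(value) => Some(value)
      case Failure(err) =>
        // Hypothetical handling: report and drop the record.
        System.err.println(s"Skipping undecodable record (${bytes.length} bytes): ${err.getMessage}")
        None
    }

  // Stand-in parser for this demo only: expects exactly 4 bytes holding a big-endian Int.
  def toyParse(bytes: Array[Byte]): Int = {
    require(bytes.length == 4, s"expected 4 bytes, got ${bytes.length}")
    java.nio.ByteBuffer.wrap(bytes).getInt
  }

  def main(args: Array[String]): Unit = {
    println(safeParse(Array[Byte](0, 0, 0, 42))(toyParse)) // Some(42)
    println(safeParse(Array[Byte](0, 0))(toyParse))        // None, with a message on stderr
  }
}
```

In Answer 2's pipeline, the final `map` could become something like `.flatMap(bytes => SafeParse.safeParse(bytes)(Student.parseFrom(_)).toList)`. This is untested against Spark, and it still needs an encoder for `Student`, just as Answer 2's `map` does.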
    

    【讨论】:
