【问题标题】:Use schema to convert AVRO messages with Spark to DataFrame使用模式将带有 Spark 的 AVRO 消息转换为 DataFrame
【发布时间】:2016-12-27 05:46:53
【问题描述】:

有没有办法使用架构将 消息从 转换为?用户记录的架构文件:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}

并从SqlNetworkWordCount exampleKafka, Spark and Avro - Part 3, Producing and consuming Avro messages 编码sn-ps 以读取消息。

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

不知何故,除了使用案例类将 AVRO 消息转换为 DataFrame 之外,我找不到其他方法。是否有可能改用模式?我正在使用Spark 1.6.2Kafka 0.10

完整的代码,如果你感兴趣的话。

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

【问题讨论】:

    标签: avro kafka spark dataframe scala apache-spark apache-kafka spark-streaming avro


    【解决方案1】:

    OP 可能解决了这个问题,但为了将来参考,我很一般地解决了这个问题,所以认为在这里发帖可能会有所帮助。

    所以一般来说,您应该将 Avro 模式转换为 spark StructType,并将 RDD 中的对象转换为 Row[Any],然后使用:

    spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType>
    

    为了转换 Avro 架构,我使用了 spark-avro,如下所示:

    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
    

    RDD 的转换更加棘手.. 如果您的架构很简单,您可能只需做一个简单的映射.. 像这样:

    rdd.map(obj=>{
        val seq = (obj.getName(),obj.getAge()
        Row.fromSeq(seq))
        })
    

    在本例中,对象有 2 个字段名称和年龄。

    重要的是要确保 Row 中的元素与之前的 StructType 中的字段的顺序和类型相匹配。

    在我的特殊情况下,我有一个更复杂的对象,我想对其进行通用处理以支持未来的架构更改,因此我的代码要复杂得多。

    OP 建议的方法也应该适用于某些情况,但很难暗示复杂对象(不是原始对象或案例类)

    另一个提示是,如果您在一个类中有一个类,您应该将该类转换为一个 Row,以便包装类将转换为类似:

    Row(Any,Any,Any,Row,...)
    

    您还可以查看我之前提到的 spark-avro 项目,了解如何将对象转换为行。我自己使用了那里的一些逻辑

    如果有人阅读本文需要进一步帮助,请在 cmets 中问我,我会尽力提供帮助

    类似的问题也解决了here

    【讨论】:

      【解决方案2】:

      请看这个 https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala

      所以不是

       val df = rdd.map(message => Injection.injection.invert(message._2).get)
      .map(record => User(record.get("firstName").toString,records.get("lastName").toString)).toDF()
      

      你可以试试这个

       val df = spark.read.avro(message._2.get)
      

      【讨论】:

      • spark-avro 2.0.1 需要路径作为输入并且无法处理 Array[Byte]。因此spark.read.avro(message._2) 会引发类型不匹配。
      • 如何对多条消息进行微批处理并将其写入 /tmp/ 目录并从中读取?如果您使用的是 Spark 2.0,这将起作用: spark.read.format("com.databricks.spark.avro").schema(DataType.fromJson("path/to/schema.json").asInstanceOf[StructType]) .load("/tmp/").show()
      【解决方案3】:

      我曾处理过类似的问题,但使用的是 Java。所以不确定 Scala,但请查看库 com.databricks.spark.avro

      【讨论】:

        【解决方案4】:

        对于有兴趣以无需停止和重新部署 Spark 应用程序(假设您的应用程序逻辑可以处理此问题)的方式处理架构更改的方式处理此问题的任何人,请参阅此question/answer

        【讨论】:

          猜你喜欢
          • 2018-02-22
          • 1970-01-01
          • 2017-03-17
          • 1970-01-01
          • 2016-11-03
          • 2019-11-22
          • 2021-09-14
          • 1970-01-01
          • 2021-05-01
          相关资源
          最近更新 更多