【问题标题】:Create Spark DataFrame in Spark Streaming from JSON Message on Kafka从 Kafka 上的 JSON 消息在 Spark Streaming 中创建 Spark DataFrame
【发布时间】:2015-09-13 14:05:50
【问题描述】:

我正在 Scala 中实现 Spark Streaming,我从 Kafka 主题中提取 JSON 字符串,并希望将它们加载到数据帧中。有没有办法让 Spark 从 RDD[String] 自行推断架构?

【问题讨论】:

    标签: scala apache-spark dataframe apache-kafka


    【解决方案1】:

    流式传输没有架构推断。您始终可以读取文件并从中提取架构。您还可以将文件提交到版本控制并将其放入 s3 存储桶中。

    【讨论】:

      【解决方案2】:

      您可以使用以下代码从 Kafka 读取消息流,提取 JSON 值并将其转换为 DataFrame:

      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      
      messages.foreachRDD { rdd =>
      //extracting the values only
        val df = sqlContext.read.json(rdd.map(x => x._2))
        df.show()
      }
      

      【讨论】:

        【解决方案3】:

        是的,您可以使用以下内容:

        sqlContext.read
        //.schema(schema) //optional, makes it a bit faster, if you've processed it before you can get the schema using df.schema
        .json(jsonRDD)  //RDD[String]
        

        我现在正在尝试做同样的事情。我很好奇你是如何从 Kafka 中得到 RDD[String] 的,但我仍然觉得 Spark+Kafka 只做流媒体而不是一次性“取出里面的东西”。 :)

        【讨论】:

        • 您可以使用 KafkaUtils.createRDD 从 Kafka 获取非流式 RDD
        【解决方案4】:

        在 spark 1.4 中,您可以尝试以下方法从 rdd 生成 Dataframe:

          val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
          val yourDataFrame = hiveContext.createDataFrame(yourRDD)
        

        【讨论】:

        猜你喜欢
        • 2017-10-30
        • 1970-01-01
        • 1970-01-01
        • 2018-06-25
        • 2015-02-04
        • 2017-02-23
        • 2014-12-30
        • 2017-08-30
        • 2021-05-05
        相关资源
        最近更新 更多