【问题标题】:How to convert spark streaming output into dataframe or storing in table如何将火花流输出转换为数据帧或存储在表中
【发布时间】:2018-02-26 15:23:31
【问题描述】:

我的代码是:

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()

我的输出有 50 个不同的值,格式如下

{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}

谁能帮我将这些数据以表格形式存储为

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  

我会很感激的。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-streaming


    【解决方案1】:

    您可以使用 foreachRDD 函数,以及普通的 Dataset API:

    data.foreachRDD(rdd => {
        // rdd is RDD[String]
        // foreachRDD is executed on the  driver, so you can use SparkSession here; spark is SparkSession, for Spark 1.x use SQLContext
        val df = spark.read.json(rdd); // or sqlContext.read.json(rdd)
        df.show(); 
        df.write.saveAsTable("here some unique table ID");
    });
    

    但是,如果您使用 Spark 2.x,我建议您使用结构化流:

    val stream = spark.readStream.format("kafka").load()
    val data = stream
                .selectExpr("cast(value as string) as value")
                .select(from_json(col("value"), schema))
    data.writeStream.format("console").start();
    

    您必须手动指定架构,但这很简单:) 还要在任何处理之前导入org.apache.spark.sql.functions._

    【讨论】:

    • 我不能使用 spark.load.json。即使使用 SQLContext 也会给出错误。我试过这个 val sqlcontext=new SQLContext(sc) import sqlcontext.implicits._ data.foreachRDD{ rdd=> val dfi=sqlcontext.load.json(rdd) 但它对 sqlcontext 上的重载定义给出了模棱两可的参考。
    • @huny 您使用哪个 Spark 版本?错误是什么?一般来说,它应该工作。也许你有一些极端情况:)
    • 我使用的是 spark 2.10 版本 1.5.1
    • @ T. Gawęda 我试过了,val res=rdd.toDF().registerTempTable("sensor") System.ln.print(res).. 没有错误,但显示为空()
    • 非常感谢您的耐心和解答。终于解决了:)
    猜你喜欢
    • 2022-01-23
    • 2023-03-05
    • 1970-01-01
    • 2018-02-22
    • 2023-03-16
    • 2018-02-18
    • 2017-05-01
    • 2016-09-23
    • 2019-04-24
    相关资源
    最近更新 更多