【发布时间】:2019-05-26 14:03:36
【问题描述】:
关于 Spark 结构化流与 HIVE 表集成的一个查询。
我尝试做一些火花结构化流的例子。
这是我的例子
val spark =SparkSession.builder().appName("StatsAnalyzer")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
.getOrCreate()
// Register the dataframe as a Hive table
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
csvDF.createOrReplaceTempView("updates")
val query= spark.sql("insert into table_abcd select * from updates")
query.writeStream.start()
正如您在将数据帧写入 hdfs 位置的最后一步中看到的那样,数据没有插入到令人兴奋的目录中(我现有的目录有一些按“年龄”分区的旧数据)。
我得到了
spark.sql.AnalysisException : 必须使用 writeStream start() 执行带有流式源的查询
您能帮我解释一下为什么我无法将数据插入到 hdfs 位置的现有目录中吗?或者有没有其他方法可以在蜂巢表上进行“插入”操作?
寻找解决方案
【问题讨论】:
-
好吧,我的问题不是 readStream...如何将该数据插入现有的配置单元表?我需要插入操作
-
是的,我得到 spark.sql.AnalysisException :必须使用 writeStream start() 执行带有流式源的查询
-
我的问题是如何进行像 JOIN 这样的转换?
-
我想加入来自 kafka 或 csv 的流数据和来自 HIVE 的静态数据......在将所有内容写入 hive 后不起作用,因为我需要以流方式执行所有操作......
标签: apache-spark hive spark-structured-streaming