【问题标题】:How to insert spark structured streaming DataFrame to Hive external table/location?如何将火花结构化流数据帧插入 Hive 外部表/位置?
【发布时间】:2019-05-26 14:03:36
【问题描述】:

关于 Spark 结构化流与 HIVE 表集成的一个查询。

我尝试做一些火花结构化流的例子。

这是我的例子

 val spark =SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query= spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

正如您在将数据帧写入 hdfs 位置的最后一步中看到的那样,数据没有插入到令人兴奋的目录中(我现有的目录有一些按“年龄”分区的旧数据)。

我得到了

spark.sql.AnalysisException : 必须使用 writeStream start() 执行带有流式源的查询

您能帮我解释一下为什么我无法将数据插入到 hdfs 位置的现有目录中吗?或者有没有其他方法可以在蜂巢表上进行“插入”操作?

寻找解决方案

【问题讨论】:

  • 好吧,我的问题不是 readStream...如何将该数据插入现有的配置单元表?我需要插入操作
  • 是的,我得到 spark.sql.AnalysisException :必须使用 writeStream start() 执行带有流式源的查询
  • 我的问题是如何进行像 JOIN 这样的转换?
  • 我想加入来自 kafka 或 csv 的流数据和来自 HIVE 的静态数据......在将所有内容写入 hive 后不起作用,因为我需要以流方式执行所有操作......

标签: apache-spark hive spark-structured-streaming


【解决方案1】:

Spark 结构化流不支持将流查询的结果写入 Hive 表。

scala> println(spark.version)
2.4.0

val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame

scala> assert(sq.isStreaming)

scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided

如果不支持目标系统(又名 sink),您可以使用 foreach and foreachBatch operations(突出显示我的):

foreachforeachBatch 操作允许您在流式查询的输出上应用任意操作和编写逻辑。它们的用例略有不同——foreach 允许在每一行上自定义写入逻辑,foreachBatch 允许在每个微批处理的输出上进行任意操作和自定义逻辑。

我认为foreachBatch 是你最好的选择。

import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start

还有Apache Hive Warehouse Connector,我从未与之合作过,但似乎它可能会有所帮助。

【讨论】:

  • 您好.. 还有一个查询... 是否可以流式传输文件,比如说 csv 文件来触发流式传输?如果我将任何行附加到该文件,那么应该通过火花流捕获..这可能吗?可以将文件添加到目录和流式传输...在 spark 中可以进行文件流式传输?
  • @BigD 不,不可能将行附加到 CSV 文件并“捕获”更新。只会处理新文件。请参阅spark.apache.org/docs/latest/… 上的文档
  • 您好,我已经尝试过但抛出错误.. 请您帮忙... val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv( "file:///home/sas/testdelta") csvDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.show }.start() :56: error: value foreachBatch is not org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] csvDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => 的成员
  • @BigD 你使用什么 Spark 版本? foreachBatch 自 2.4.0 起可用。请改用foreach
  • 我正在使用 spark 2.3.0 .. 我都尝试了.. 在我的示例中 csvDF 是数据框.. 我如何使用 foreach 将其插入蜂巢.. 这是我的问题
【解决方案2】:

以防万一有人真正尝试了 Jacek Laskowski 的代码,他知道它并不能真正在 Spark 2.4.0 中编译(查看我在 AWS EMR 5.20.0 和 vanilla Spark 上测试的 gist)。所以我想这就是他关于它应该如何在未来的 Spark 版本中工作的想法。 真正的代码是:

scala> import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset

scala> sq.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => batchDs.show).start
res0: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5ebc0bf5

【讨论】:

  • 但是我使用的是 spark 2.3.0.. 你能提供如何在 2.3.0 中使用 foreach
【解决方案3】:

在 HDP 3.1 与 Spark 2.3.2 和 Hive 3.1.0 上,我们使用 Hortonwork 的 spark-llap 库将结构化流数据帧从 Spark 写入 Hive。在GitHub 上,您会找到一些关于其用法的文档。

所需的库 hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar 在Maven 上可用,需要在spark-submit 命令中传递。该库有许多更新的版本,虽然我还没有机会测试它们。

手动创建 Hive 表之后(例如通过beeline/Hive shell),您可以应用以下代码:

import com.hortonworks.hwc.HiveWarehouseSession

val csvDF = spark.readStream.[...].load()

val query = csvDF.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "database_name")
  .option("table", "table_name")
  .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()

query.awaitTermination()

【讨论】:

    猜你喜欢
    • 2018-11-08
    • 1970-01-01
    • 2022-01-23
    • 2019-01-19
    • 2021-05-31
    • 2021-07-08
    • 1970-01-01
    • 2019-11-07
    • 2019-10-08
    相关资源
    最近更新 更多