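【Posted】: 2017-03-21 07:28:11
【Question】:
With Spark SQL, how can we read data from a folder in HDFS, make some modifications, and save the updated data back to the same folder in HDFS using the Overwrite save mode, without getting a FileNotFoundException?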
import org.apache.spark.sql.{SparkSession,SaveMode}
import org.apache.spark.SparkConf
val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works
The FileNotFoundException occurs when we read data from the HDFS directory "d=2017-03-20" and save the updated data (SaveMode.Overwrite) to the same HDFS directory "d=2017-03-20":
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
The following attempts still produce the same error. How should I solve this problem with Spark SQL? Thanks!
val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val df = sparkSession.read.parquet(hdfsDirPath)
val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
// or
val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")
sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
// or
val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")
sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
【Comments】:
-
I don't think that works. You need to save to another directory first and then copy the result to the original directory.
-
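We cannot write a Spark DataFrame to the files we are reading it from. If you still want to do this, first write the DF to a temporary directory, then read it back from there and write it to the original directory with SaveMode.Overwrite.
-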
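Thanks Akash and himanshuIIITian. Writing the DF to a temporary directory on HDFS is a good solution, but I wonder whether there is a way to solve this with Spark SQL alone, since writing data to HDFS and reading it back is less time- and space-efficient than working in memory with Spark SQL. Could we solve the problem with REFRESH, TRUNCATE, or DROP TABLE?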
-
I have the same problem, even though I am writing the df to a temporary table in Java.
-
@faustineinsun How did you solve this problem? Did you find any approach other than the temporary directory?
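The temporary-directory workaround suggested in the comments could be sketched roughly as follows. This is only an illustration under stated assumptions, not a confirmed fix from the thread: the staging path `tmpPath` and the object name are hypothetical, and the cleanup step uses the Hadoop FileSystem API. The idea is that Spark evaluates lazily, so an Overwrite to the source path deletes the input files before they have been fully read; staging the result elsewhere and re-reading it means the final write no longer depends on the original files.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, SparkSession}

object OverwriteViaTempDir {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("overwrite-via-temp-dir").getOrCreate()

    // Target path taken from the question; the staging path is a hypothetical example.
    val targetPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
    val tmpPath    = "hdfs://xxx.xxx.xxx.xxx:xx/test_tmp/d=2017-03-20" // assumption

    // 1. Read the original data and apply the modification (lazy at this point).
    val newDF = spark.read.parquet(targetPath).select("a", "b", "c")

    // 2. Materialize the result in the staging directory.
    newDF.write.mode(SaveMode.Overwrite).parquet(tmpPath)

    // 3. Re-read from staging so the plan no longer references targetPath,
    //    then overwrite the original directory.
    spark.read.parquet(tmpPath)
      .write.mode(SaveMode.Overwrite).parquet(targetPath)

    // 4. Remove the staging directory (recursive delete).
    val staging = new Path(tmpPath)
    staging.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(staging, true)

    spark.stop()
  }
}
```

This costs one extra write and read of the data, which is the overhead the asker hoped to avoid. Note also that if the job fails during step 3, the original directory may already be partially overwritten, so the staging copy should be kept until the final write has succeeded.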
Tags: apache-spark apache-spark-sql