【发布时间】:2019-01-10 11:16:26
【问题描述】:
我在读取多个数据帧时遇到问题。我有这个功能
def readDF(hdfsPath:String, more arguments): DataFrame = {//function goes here}
它需要一个分区的 hdfs 路径并返回一个数据帧(它基本上使用spark.read.parquet,但我必须使用它)。我正在尝试以下列方式使用show partitions 阅读其中的几个:
val dfs = spark.sql("show partitions table")
.where(col("partition").contains(someFilterCriteria))
.map(partition => {
val hdfsPath = s"hdfs/path/to/table/$partition"
readDF(hdfsPath)
}).reduce(_.union(_))
但它给了我这个错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 3.0 failed 4 times, most recent failure: Lost task 12.3 in stage 3.0 (TID 44, csmlcsworki0021.unix.aacc.corp, executor 1): java.lang.NullPointerException
我认为这是因为我在 map 操作中对数据框执行 spark.read.parquet,因为如果我更改此代码的代码
val dfs = spark.sql("show partitions table")
.where(col("partition").contains(someFilterCriteria))
.map(row=> row.getString(0))
.collect
.toSeq
.map(partition => {
val hdfsPath = s"hdfs/path/to/table/$partition"
readDF(hdfsPath)
}).reduce(_.union(_))
它正确加载数据。但是,如果可能的话,我不想使用collect。怎样才能达到我的目的?
【问题讨论】:
标签: apache-spark apache-spark-dataset apache-spark-2.0