【发布时间】:2019-08-22 19:56:38
【问题描述】:
我有一个大(> 500m 行)CSV 文件。此 CSV 文件中的每一行都包含位于 HDFS 上的二进制文件的路径。我想使用 Spark 读取每个文件,对其进行处理,然后将结果写入另一个 CSV 文件或表格。
在驱动程序中这样做很简单,下面的代码可以完成这项工作
val hdfsFilePathList = // read paths from CSV, collect into list
hdfsFilePathList.map( pathToHdfsFile => {
sqlContext.sparkContext.binaryFiles(pathToHdfsFile).mapPartitions {
functionToProcessBinaryFiles(_)
}
})
这样做的主要问题是驱动程序做了太多的工作。我想将binaryFiles 所做的工作外包给执行者。我发现了一些有希望的示例,我认为这些示例可以让我从执行程序访问 sparkContext:
Use SparkContext hadoop configuration within RDD methods/closures, like foreachPartition
但它们似乎不像我想象的那样工作。我希望以下工作:
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
class ConfigSerDeser(var conf: Configuration) extends Serializable {
def this() {
this(new Configuration())
}
def get(): Configuration = conf
private def writeObject (out: java.io.ObjectOutputStream): Unit = {
conf.write(out)
}
private def readObject (in: java.io.ObjectInputStream): Unit = {
conf = new Configuration()
conf.readFields(in)
}
private def readObjectNoData(): Unit = {
conf = new Configuration()
}
}
val serConf = new ConfigSerDeser(sc.hadoopConfiguration)
val mappedIn = inputDf.map( row => {
serConf.get()
})
但它失败了KryoException: java.util.ConcurrentModificationException
是否可以让执行程序直接访问 HDFS 文件或 HDFS 文件系统?或者,有没有一种有效的方法来读取 HDFS/S3 上的数百万个二进制文件并使用 Spark 处理它们?
【问题讨论】:
-
尝试 rdd.foreachPartitionAsync() 其中 rdd 是输入 rdd,其中包含路径详细信息。 (我不确定它是否能解决你的问题)
标签: scala apache-spark hadoop amazon-s3 hdfs