【问题标题】:How to pass a list of paths to spark.read.load?如何将路径列表传递给 spark.read.load?
【发布时间】:2018-06-16 17:53:37
【问题描述】:

我可以通过将多个路径传递给load 方法一次加载多个文件,例如

spark.read
  .format("com.databricks.spark.avro")
  .load(
    "/data/src/entity1/2018-01-01",
    "/data/src/entity1/2018-01-12",
    "/data/src/entity1/2018-01-14")

我想先准备一个路径列表,然后将它们传递给load 方法,但出现以下编译错误:

val paths = Seq(
  "/data/src/entity1/2018-01-01",
  "/data/src/entity1/2018-01-12",
  "/data/src/entity1/2018-01-14")
spark.read.format("com.databricks.spark.avro").load(paths)

<console>:29: error: overloaded method value load with alternatives:
  (paths: String*)org.apache.spark.sql.DataFrame <and>
  (path: String)org.apache.spark.sql.DataFrame
 cannot be applied to (List[String])spark.read.format("com.databricks.spark.avro").load(paths)

为什么?如何将路径列表传递给load 方法?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您只需要一个 splat 运算符 (_*) 将paths 列为

    spark.read.format("com.databricks.spark.avro").load(paths: _*)
    

    【讨论】:

    • 为什么需要 splat 运算符 (_*)?
    【解决方案2】:

    load 方法支持 varargs 类型的参数,而不是列表类型。因此,您已将列表显式转换为可变参数,并在加载函数中添加 : _*

    spark.read.format("com.databricks.spark.avro").load(paths: _*)
    

    【讨论】:

      【解决方案3】:

      此外,您可以使用来自 Spark 代码源 (ResolvedDataSource.scala) 的 paths 选项:

      val paths = {
                  if (caseInsensitiveOptions.contains("paths") &&
                    caseInsensitiveOptions.contains("path")) {
                    throw new AnalysisException(s"Both path and paths options are present.")
                  }
                  caseInsensitiveOptions.get("paths")
                    .map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
                    .getOrElse(Array(caseInsensitiveOptions("path")))
                    .flatMap{ pathString =>
                      val hdfsPath = new Path(pathString)
                      val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
                      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
                      SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
                    }
                }
      

      这么简单:

      sqlContext.read.option("paths", paths.mkString(",")).load()
      

      会成功的。

      【讨论】:

        【解决方案4】:

        您无需创建列表。你可以像下面那样做

        val df=spark.read.format("com.databricks.spark.avro").option("header","true").load("/data/src/entity1/*")
        

        【讨论】:

        • 就我而言,我不希望每个文件都被加载。如果是,那当然可以。
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多