【问题标题】:Spark Read multiple paths with automatic partitions discoverySpark 通过自动分区发现读取多个路径
【发布时间】:2018-12-03 08:17:18
【问题描述】:

我正在尝试从多个路径将一些 avro 文件读取到 DataFrame。 假设我的路径是"s3a://bucket_name/path/to/file/year=18/month=11/day=01" 在这条路径下,我还有两个分区,比如说country=XX/region=XX

我想一次读取多个日期而不明确命名国家和地区分区。此外,我希望国家和地区成为此 DataFrame 中的列。

sqlContext.read.format("com.databricks.spark.avro").load("s3a://bucket_name/path/to/file/year=18/month=11/day=01")

这条线运行良好,因为我只读取了一条路径。它检测国家和地区分区并推断其架构。

当我尝试读取多个日期时,比如说

val paths = Seq("s3a://bucket_name/path/to/file/year=18/month=11/day=01", "s3a://bucket_name/path/to/file/year=18/month=11/day=02")

sqlContext.read.format("com.databricks.spark.avro").load(paths:_*)

我收到此错误:

    18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result insub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:?
 s3a://bucket_name/path/to/file/year=18/month=11/day=02
s3a://bucket_name/path/to/file/year=18/month=11/day=01
    
If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:106)
        at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$discoverPartitions(interfaces.scala:621)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:526)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:525)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionSpec(interfaces.scala:524)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionColumns(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:637)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
        at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC.<init>(<console>:40)
        at $iwC.<init>(<console>:42)
        at <init>(<console>:44)
        at .<init>(<console>:48)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
        at org.apache.spark.repl.Main$.main(Main.scala:35)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

显然我不能使用 basePath 因为路径不共享一个。我还尝试在每条路径的末尾使用 /*,这确实有效,但完全忽略了国家和地区分区。

我可以一一读取路径并将其合并,但我觉得我错过了一些东西。

知道为什么它只适用于单条路径以及如何使其适用于多条路径吗?

【问题讨论】:

    标签: scala apache-spark spark-avro


    【解决方案1】:

    真的希望所有错误消息都一样清晰 - If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.

    相对路径year=18/month=11/day=01 是由于分区引起的,还是您只是使用了相同的约定?

    如果前者是正确的,那么您应该只阅读s3a://bucket_name/path/to/file/,并使用谓词来过滤所需的日期。或者也许正如错误所建议的那样,您可以尝试sqlContext.read.option("basePath","s3a://bucket_name/path/to/file/").format("com.databricks.spark.avro").load(paths:_*),其中路径是相对的

    如果后者为真,那么您应该分别查询每个并在数据帧上应用unionAll(如错误消息所示)。在这种情况下,也许将年/月/日视为分区列也可以,即使您在写入数据时没有使用 partitionBy...

    【讨论】:

    • 因为我想在 Windows 中阅读并且每次都使用不同的日期集,所以我不想只使用 basePath。我可以阅读它以某种格式发送日期列表并进行过滤我也可以逐个阅读路径和 unionAll 我在最初的问题中特别提到了它。我不明白为什么它适用于单条路径而不是多条路径。我不明白有什么区别。如果不可能,我不能做这些选项之一。
    • 断言与以下有关分区发现的票证有关 - issues.apache.org/jira/browse/SPARK-11678issues.apache.org/jira/browse/SPARK-10304。我猜如果你没有使用与 spark 用于分区发现相同的约定,它就不会抛出这个异常......
    • “我猜如果你不使用 spark 用于分区发现的相同约定,它不会抛出这个异常”是的,我知道,但我仍然不明白为什么分区发现会在一条路径上工作,而不是在多条路径上工作。我确实想保留分区的火花约定。如果没有人能回答,我不明白为什么人们会否决一个合理的问题。
    【解决方案2】:

    老问题,但这是我最终在类似情况下所做的事情

    spark.read.parquet(paths:_*)
      .withColumn("year", regexp_extract(input_file_name, "year=(.+?)/", 1))
      .withColumn("month", regexp_extract(input_file_name, "month=(.+?)/", 1))
      .withColumn("day", regexp_extract(input_file_name, "day=(.+?)/", 1))
    

    当您具有静态分区结构时有效。谁来挑战将其扩展到动态(即解析出 'x=y/z=c' 形式的任意分区结构并将其转换为列)?

    【讨论】:

      猜你喜欢
      • 2021-08-15
      • 1970-01-01
      • 2015-11-28
      • 2022-12-18
      • 2017-08-26
      • 1970-01-01
      • 2017-03-28
      • 1970-01-01
      • 2019-04-03
      相关资源
      最近更新 更多