【问题标题】:How to save a partitioned parquet file in Spark 2.1?如何在 Spark 2.1 中保存分区拼花文件?
【发布时间】:2017-09-29 14:39:31
【问题描述】:

我正在尝试测试如何使用 Spark 2.1 在 HDFS 2.7 中写入数据。我的数据是一个简单的虚拟值序列,输出应按属性分区:idkey

 // Simple case class to cast the data
 case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)

 // Actual data to be stored
 val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1),
    SimpleTest("test", 12, 13.5.toFloat, 2),
    SimpleTest("test", 12, 13.5.toFloat, 3),
    SimpleTest("simple", 12, 13.5.toFloat, 1),
    SimpleTest("simple", 12, 13.5.toFloat, 2),
    SimpleTest("simple", 12, 13.5.toFloat, 3)
 )

 // Spark's workflow to distribute, partition and store
 // sc and sql are the SparkContext and SparkSession, respectively
 val testDataP = sc.parallelize(testData, 6)
 val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
 testDf.write.partitionBy("id", "key").parquet("/path/to/file")

我希望在 HDFS 中得到以下树形结构:

- /path/to/file
   |- /id=test/key=1/part-01.parquet
   |- /id=test/key=2/part-02.parquet
   |- /id=test/key=3/part-03.parquet
   |- /id=simple/key=1/part-04.parquet
   |- /id=simple/key=2/part-05.parquet
   |- /id=simple/key=3/part-06.parquet

但是当我运行之前的代码时,我得到以下输出:

/path/to/file/id=/key=24/
 |-/part-01.parquet
 |-/part-02.parquet
 |-/part-03.parquet
 |-/part-04.parquet
 |-/part-05.parquet
 |-/part-06.parquet

不知道是不是代码有问题,还是Spark在做其他事情。

我正在执行spark-submit,如下:

spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec=lzf --conf spark.akka.frameSize=1024 --conf spark.driver.maxResultSize=1g --conf spark.sql.orc.compression.codec=uncompressed --conf spark.sql.parquet.filterPushdown=true --class myClass我的FatJar.jar

【问题讨论】:

    标签: scala apache-spark apache-spark-sql parquet


    【解决方案1】:

    我找到了解决办法!根据 Cloudera,是 mapred-site.xml 配置问题(检查下面的链接)。另外,不要将数据框写为:testDf.write.partitionBy("id", "key").parquet("/path/to/file")

    我是这样做的:testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file")。您可以将<namenode><port> 分别替换为HDFS 的主节点名称和端口。

    特别感谢 @jacek-laskowski 的宝贵贡献。

    参考资料:

    https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

    Writing to HDFS in Spark/Scala

    【讨论】:

      【解决方案2】:

      很有趣,因为……嗯……“它对我有用”

      当您在 Spark 2.1 中使用 SimpleTest 案例类描述您的数据集时,您将在 import spark.implicits._ 之外有一个键入的 Dataset

      在我的例子中,sparksql

      换句话说,您不必创建testDataPtestDf(使用sql.createDataFrame)。

      import spark.implicits._
      ...
      val testDf = testData.toDS
      testDf.write.partitionBy("id", "key").parquet("/path/to/file")
      

      在另一个终端(保存到/tmp/testDf目录后):

      $ tree /tmp/testDf/
      /tmp/testDf/
      ├── _SUCCESS
      ├── id=simple
      │   ├── key=1
      │   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
      │   ├── key=2
      │   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
      │   └── key=3
      │       └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
      └── id=test
          ├── key=1
          │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
          ├── key=2
          │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
          └── key=3
              └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
      
      8 directories, 7 files
      

      【讨论】:

        猜你喜欢
        • 2021-12-11
        • 2021-07-23
        • 2021-06-26
        • 2023-03-18
        • 2022-07-31
        • 2016-02-12
        • 2019-10-28
        • 2022-07-29
        • 2017-04-29
        相关资源
        最近更新 更多