[Posted]: 2023-03-31 17:35:02
[Question]:
I want to use Spark to write the data in a directory out to dynamic partitions. Here is the sample code.
val input_DF = spark.read.parquet("input path")
input_DF.write.mode("overwrite").partitionBy("colname").parquet("output path...")
As shown below, the number of records per key varies widely, and the keys are skewed.
input_DF.groupBy($"colname").agg(count("colname")).show()
+-------+--------------+
|colname|count(colname)|
+-------+--------------+
|     NA|      14859816|  --> largest number of records
|      A|       2907930|
|      D|       1118504|
|      B|        485151|
|      C|        435305|
|      F|        370095|
|      G|        170060|
+-------+--------------+
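As a result, the job fails when each executor is given a reasonable amount of memory (8 GB). When each executor is given a large amount of memory (15 GB), the job completes successfully but takes too long.
I tried using repartition, hoping it would distribute the data evenly across partitions. However, because it uses the default HashPartitioner, all records for a given key go to a single partition.
input_DF.repartition(numPartitions, $"colname")  // hash-partitions on colname
This does create the number of part files requested in repartition, but it moves all records for a key into one partition (for example, every record whose column value is NA ends up in the same partition). The remaining part files contain no records (only Parquet metadata, 38634 bytes).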
-rw-r--r-- 2 hadoop hadoop 0 2017-11-20 14:29 /user/hadoop/table/_SUCCESS
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00000-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00001-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00002-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00003-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:07 /user/hadoop/table/part-r-00004-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00005-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00006-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 1038264502 2017-11-20 13:20 /user/hadoop/table/part-r-00007-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00008-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00009-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00010-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00011-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00012-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00013-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00014-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 128212247 2017-11-20 13:09 /user/hadoop/table/part-r-00015-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00016-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00017-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00018-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 117142244 2017-11-20 13:08 /user/hadoop/table/part-r-00019-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00020-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 347033731 2017-11-20 13:11 /user/hadoop/table/part-r-00021-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00022-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00023-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00024-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 100306686 2017-11-20 13:08 /user/hadoop/table/part-r-00025-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 36961707 2017-11-20 13:07 /user/hadoop/table/part-r-00026-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00027-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00028-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00029-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00030-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00031-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00032-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00033-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00034-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00035-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00036-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:07 /user/hadoop/table/part-r-00037-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00038-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00039-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00040-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00041-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 68859 2017-11-20 13:06 /user/hadoop/table/part-r-00042-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 4031720288 2017-11-20 14:29 /user/hadoop/table/part-r-00043-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00044-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00045-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00046-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00047-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00048-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00049-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00050-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
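To make the listing above easier to interpret, here is a minimal sketch in plain Scala (no Spark needed) of why hash partitioning on a skewed key produces mostly empty files. It is a simplified model: it uses `hashCode` with a non-negative modulo, as the RDD `HashPartitioner` does, whereas DataFrame `repartition` on a column uses a different hash function. The principle is the same in both cases: the target partition depends only on the key. The object name, the helper `partitionFor`, and the partition count of 51 (taken from the part files visible in the listing) are assumptions for illustration.

```scala
// Simplified model of hash partitioning: the partition is a function of the key alone.
object HashSkewDemo {
  // Non-negative modulo, so negative hash codes still map into [0, numPartitions).
  def partitionFor(key: String, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 51 // assumption: part files 00000 to 00050 in the listing
    val keys = Seq("NA", "A", "D", "B", "C", "F", "G")

    // Every record with the same key lands in the same partition,
    // however many records that key has.
    keys.foreach { k =>
      println(s"key=$k -> partition ${partitionFor(k, numPartitions)}")
    }

    // 7 distinct keys can occupy at most 7 partitions; the rest stay empty.
    val used = keys.map(partitionFor(_, numPartitions)).distinct.size
    println(s"non-empty partitions: $used of $numPartitions")
  }
}
```

Because the number of distinct keys bounds the number of non-empty partitions, raising the partition count alone cannot spread the NA records over more tasks.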
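I would like to know:
- Is there a way to write records with the same key to different partitions of a DataFrame/RDD? Perhaps a custom partitioner that writes every Nth record to the Nth partition:
(1st rec to partition 1) (2nd rec to partition 2) (3rd rec to partition 3) (4th rec to partition 4) (5th rec to partition 1) (6th rec to partition 2) (7th rec to partition 3) (8th rec to partition 4)
- If so, can it be controlled with a parameter such as the maximum number of bytes per DataFrame/RDD partition?
Since the expected result is simply data written into different subdirectories (Hive partitions) based on a key, I would like to spread the records of one key across multiple tasks, with each task writing one part file under that key's subdirectory.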
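The round-robin assignment described in the question can be modelled in plain Scala as follows. This is only a sketch of the idea, not a Spark partitioner; the object name `RoundRobinDemo` and the helper `assign` are hypothetical, and partitions are indexed from 0 internally and printed from 1 to match the example above.

```scala
// Round-robin model: the target partition depends on the record's position, not its key.
object RoundRobinDemo {
  // Record i (0-based) goes to partition i % numPartitions.
  def assign[T](records: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
    records.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, recs) => p -> recs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val records = (1 to 8).map(i => s"rec$i")
    val parts = assign(records, 4)
    // Print partitions in order, numbered from 1 as in the question.
    parts.toSeq.sortBy(_._1).foreach { case (p, recs) =>
      println(s"partition ${p + 1}: ${recs.mkString(", ")}")
    }
  }
}
```

In Spark itself, calling `repartition(n)` on a DataFrame without any column arguments distributes rows in a similar round-robin fashion, so a hand-written partitioner may not be necessary for this pattern.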
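[Comments]:
-
The problem was resolved by repartitioning on a unique key rather than on the key used in "partitionBy". If the DataFrame lacks a unique key for some reason, a pseudo column can be added with df.withColumn("Unique_ID", monotonically_increasing_id()) and the data repartitioned on "Unique_ID"; this distributes the data evenly across multiple partitions. To further improve performance, the data can be sorted within each DataFrame partition by the key used for joins/grouping/partitioning.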
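The approach in this comment could look roughly like the following. It is a sketch under assumptions, not a tested solution for the asker's data: the object and helper names, the partition count of 50, and dropping "Unique_ID" before writing are illustrative choices, and the paths are the placeholders from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

object SkewedPartitionWrite {
  // Spread rows evenly by hashing a per-row unique id instead of the skewed key.
  // "colname" is the partition column from the question.
  def spreadByUniqueId(df: DataFrame, numPartitions: Int): DataFrame =
    df.withColumn("Unique_ID", monotonically_increasing_id())
      .repartition(numPartitions, col("Unique_ID"))
      .sortWithinPartitions("colname") // optional: group each key's rows within a task
      .drop("Unique_ID")               // assumption: the helper column is not wanted in the output

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("skewed-partition-write").getOrCreate()

    val inputDF = spark.read.parquet("input path") // placeholder path from the question
    val numPartitions = 50                         // hypothetical value; tune to the data size

    spreadByUniqueId(inputDF, numPartitions)
      .write
      .mode("overwrite")
      .partitionBy("colname")
      .parquet("output path...")

    spark.stop()
  }
}
```

With this layout every task can hold rows for every key, so each subdirectory may receive up to `numPartitions` part files. That is the intended effect for the NA key, but it also means the small keys are split into many small files.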
Tags: apache-spark