【发布时间】:2018-07-31 17:38:22
【问题描述】:
我有一个存储为 orc 的 spark 数据框,其中包含大约 10000 行和以下架构:
>>> df.printSchema()
root
|-- contig: string (nullable = true)
|-- start: integer (nullable = true)
|-- ref: string (nullable = true)
|-- alt: string (nullable = true)
|-- gt: array (nullable = true)
| |-- element: integer (containsNull = true)
其中 arrayField 是 200000 个整数的列表。我想将其转换为具有扁平结构的数据框:
>>> from pyspark.sql.functions import posexplode
>>> flat = df.select('contig', 'start', 'ref', 'alt', posexplode(df.gt))
>>> flat.explain()
== Physical Plan ==
*Project [contig#0, start#1, ref#2, alt#3, pos#11, col#12]
+- Generate posexplode(gt#4), true, false, [pos#11, col#12]
+- *FileScan orc [contig#0,start#1,ref#2,alt#3,gt#4] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/path/to/data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<contig:string,start:int,ref:string,alt:string,gt:array<int>>
>>> flat.write.orc('/path/to/output/file')
在具有 24 个 CPU 内核和超过 100GB 内存的机器上,将扁平化的数据帧写入文件需要五个多小时。这只是poseexplode函数的一个特性还是有其他问题?
【问题讨论】:
-
如果你分解 200000 个整数,那么每行会生成 200000 行。所以很明显变慢了。
-
确实,我们在这里谈论的是 20 亿个输出行,但 24 个工作线程真的有那么多工作吗?输出文件的大小仅为 1.3GB。
-
谢谢,我不确定驱动程序的 72g,但您可以尝试调整执行程序内存吗?更具体地说,显着增加它(假设您有资源,否则,请尝试减少驱动程序内存以进行补偿)并查看它是否加快了执行速度。
-
另外,从您的评论来看,似乎存在某种形式的偏差,使您的工作并行性不太理想。原始数据是否以某种方式分区或分桶?
-
我个人建议你重新分区数据帧,这样每个 24 执行器在你做poseexplode之前都会得到相等的分区。然后对posexplode使用withColumn函数,然后只使用select函数。试试看。
标签: python apache-spark pyspark spark-dataframe orc