【发布时间】:2018-01-09 00:16:23
【问题描述】:
我正在尝试使用 Pandas DF 优化一个用 Python 编写的软件。该算法以 pandas DF 作为输入,不能分布,它为每个客户端输出一个指标。
也许这不是最好的解决方案,但我的省时方法是并行加载所有文件,然后为每个客户端构建一个 DF
这很好用,但很少有客户拥有真正大量的数据。所以我需要在创建他们的 DF 时节省内存。
为了做到这一点,我正在执行一个 groupBy()(实际上是一个 combineByKey,但从逻辑上讲它是一个 groupBy),然后为每个组(也就是现在 RDD 的单个行)我构建一个列表并从中,熊猫 DF。
但是,这会在单个任务/节点中生成多个数据副本(RDD 行、List 和 pandas DF...)并崩溃,我想在单个节点中删除那么多副本。
我正在考虑使用以下伪代码的“特殊” combineByKey:
def createCombiner(val):
return [val]
def mergeCombinerVal(x,val):
x.append(val);
return x;
def mergeCombiners(x,y):
#Not checking if y is a pandas DF already, but we can do it too
if (x is a list):
pandasDF= pd.Dataframe(data=x,columns=myCols);
pandasDF.append(y);
return pandasDF
else:
x.append(y);
return x;
我的问题在这里,文档什么也没说,但有人知道假设这会起作用是否安全? (合并两个组合器的返回数据类型与组合器不同)。如果“坏”调用的数量很少,我也可以控制 mergeCombinerVal 上的数据类型,但是逐行附加到 pandas DF 会非常低效。
有什么更好的想法来执行我想做的事情吗?
谢谢!,
PS:现在我正在打包 Spark 行,从 Spark 行切换到没有列名的 python 列表是否有助于减少内存使用?
【问题讨论】:
标签: python pandas apache-spark pyspark