【问题标题】:(Py)Spark combineByKey mergeCombiners output type != mergeCombinerVal type(Pyspark combineByKey 合并组合器输出类型!= mergeCombinerVal 类型
【发布时间】:2018-01-09 00:16:23
【问题描述】:

我正在尝试使用 Pandas DF 优化一个用 Python 编写的软件。该算法以 pandas DF 作为输入,不能分布,它为每个客户端输出一个指标。

也许这不是最好的解决方案,但我的省时方法是并行加载所有文件,然后为每个客户端构建一个 DF

这很好用,但很少有客户拥有真正大量的数据。所以我需要在创建他们的 DF 时节省内存。

为了做到这一点,我正在执行一个 groupBy()(实际上是一个 combineByKey,但从逻辑上讲它是一个 groupBy),然后为每个组(也就是现在 RDD 的单个行)我构建一个列表并从中,熊猫 DF。

但是,这会在单个任务/节点中生成多个数据副本(RDD 行、List 和 pandas DF...)并崩溃,我想在单个节点中删除那么多副本。

我正在考虑使用以下伪代码的“特殊” combineByKey:

def createCombiner(val):
    return [val]

def mergeCombinerVal(x,val):
    x.append(val);
    return x;

def mergeCombiners(x,y):
    #Not checking if y is a pandas DF already, but we can do it too
    if (x is a list):
       pandasDF= pd.Dataframe(data=x,columns=myCols);
       pandasDF.append(y);
       return pandasDF
    else:
       x.append(y);
       return x;

我的问题在这里,文档什么也没说,但有人知道假设这会起作用是否安全? (合并两个组合器的返回数据类型与组合器不同)。如果“坏”调用的数量很少,我也可以控制 mergeCombinerVal 上的数据类型,但是逐行附加到 pandas DF 会非常低效。

有什么更好的想法来执行我想做的事情吗?

谢谢!,

PS:现在我正在打包 Spark 行,从 Spark 行切换到没有列名的 python 列表是否有助于减少内存使用?

【问题讨论】:

    标签: python pandas apache-spark pyspark


    【解决方案1】:

    只是写我的评论作为答案

    最后我使用了常规的 combineByKey,它比 groupByKey 快(确切的原因,我想它有助于打包行,因为我的行很小,但是有 maaaany 行),并且还允许我将它们分组为一个“真正的”Python 列表(groupByKey 分组为 Pandas 不支持的某种 Iterable,并迫使我创建该结构的另一个副本,这会使内存使用量翻倍并导致崩溃),这有助于我在打包时进行内存管理将它们转换为 Pandas/C 数据类型。

    现在我可以使用这些列表直接构建数据框,无需任何额外的转换(我不知道 Spark 的 groupByKey“列表”是什么结构,但 pandas 不会在构造函数中接受它)。

    不过,我最初的想法应该减少内存使用量(最多 1x DF + 0.5x 列表,而现在我有 1x DF + 1x 列表),但正如 user8371915 所说,API/文档不能保证。 ..,最好不要把它投入生产:)

    目前,我最大的客户可以放入合理的内存量。我在一个非常并行的每个执行程序的低内存作业中处理我的大多数客户,而在一个不那么并行的每个执行程序的高内存作业中处理最大的客户。我根据我执行的预计数来决定

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-10-06
      • 1970-01-01
      • 1970-01-01
      • 2011-11-26
      • 2018-06-13
      • 2019-11-14
      • 1970-01-01
      相关资源
      最近更新 更多