【问题标题】:How do I use the pandas split-apply-combine style strategy with scala api in spark?如何在 spark 中使用带有 scala api 的 pandas split-apply-combine 风格策略?
【发布时间】:2020-04-21 23:22:23
【问题描述】:

我有一个 scala 函数,它接受一个 spark 数据帧并返回一个值,即双精度值。函数复杂,使用DataFrame类中定义的聚合,调用其他java库,不能用SQL表达。它需要数据帧的全部内容来进行计算,它不能一次添加一行并建立一个结果。

我有一个大数据框,其中包含我想用来将数据框拆分为小块并对每个小块执行上述计算的列。然后,我想返回一个新的数据框,其中每个组的一行包含两列,一列包含 groupby 值,另一列包含结果。

这将是使用 PandasUDFs 的一项相对简单的任务,但我无法弄清楚如何在 Scala 中做到这一点。

我尝试使用按列分组重新分区数据帧,然后调用 mapPartitions,但是传递给 mapPartitions 的函数必须具有签名 Iterator[Row] -> Iterator[X]。我可以使用 Iterator[Row] 并轻松地创建 Seq[Row] 或 List[Row],但似乎无法从此 Seq 创建数据帧,因为正在工作节点上进行计算并创建数据帧可以只能从驱动程序完成。重写原始函数以采用 Seq[Row] 需要进行大量重新设计,因为它使用了 DataFrame 中的一些高级聚合函数(例如 approxQuantile)。

问题的症结似乎在于没有“本地(仅/worker only/非分布式)数据帧”的概念,而 Pandas 的数据帧显然被限制在本地。

我错过了什么明显的东西吗?

【问题讨论】:

    标签: pandas scala apache-spark split-apply-combine


    【解决方案1】:

    我有一个大数据框,其中包含我想用来将数据框拆分为小块并对每个小块执行上述计算的列。

    是否事先知道该列中的值?如果不是,它们至少可以收藏吗?假设您可以收集它们,如下所示:

    val chunkValues: Array[Any] = df.select("chunk")
      .collect()
      .map(r => r.getAs[Any](0))
    

    遍历这些值以多次过滤 inputDF 并执行繁重的逻辑:

    val chunkDFs: Array[DataFrame] = chunkValues.map(value => {
      val chunkBeforeDF = inputDF.filter(col("chunk") === value)
      val chunkAfterDF = yourLogic(chunkBefore)
    })
    

    再次合并它们。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-12-27
      • 2017-12-14
      • 2017-06-24
      • 1970-01-01
      相关资源
      最近更新 更多