【问题标题】:How to asynchronously apply function via Spark to subsets of dataframe?如何通过 Spark 将函数异步应用于数据帧的子集?
【发布时间】:2015-12-23 14:43:41
【问题描述】:

我用 Python 和 pandas 编写了一个程序,它需要一个非常大的数据集(6 个月内每月约 400 万行),将其按 2 列(日期和标签)分组,然后应用一个函数到每组行。每个分组中有可变数量的行 - 从几行到数千行不等。每月有数千个组(标签-日期组合)。

我当前的程序使用多处理,所以它非常高效,我认为可以很好地映射到 Spark。我以前使用过 map-reduce,但是在 Spark 中实现它时遇到了麻烦。我确定我在流水线中遗漏了一些概念,但我所阅读的所有内容似乎都集中在键值处理或按任意分区拆分分布式数据集,而不是我正在尝试做的事情。是否有这样做的简单示例或范例?任何帮助将不胜感激。

编辑: 这是我目前正在做的一些伪代码:

reader = pd.read_csv()
pool = mp.Pool(processes=4)
labels = <list of unique labels>
for label in labels:
    dates = reader[(reader.label == label)]
    for date in dates:
        df = reader[(reader.label==label) && (reader.date==date)]
        pool.apply_async(process, df, callback=callbackFunc)
pool.close()
pool.join()

当我说异步时,我的意思是类似于 pool.apply_async()。

【问题讨论】:

  • process 是一个任意函数,或者您有什么特别的想法?它是可交换的和关联的吗?
  • @zero323 对于这个程序,process 是一个函数,它读取该日期的外部数据,然后遍历该日期标签的所有行并计算几个不同的值以与之关联每一行。

标签: python apache-spark pyspark


【解决方案1】:

目前(PySpark 1.5.0)只看到两个三个选项:

  1. 您可以尝试使用 SQL 操作和 UDF 来表达您的逻辑。不幸的是,Python API 不支持 UDAF(用户定义的聚合函数),但它仍然具有足够的表现力,尤其是窗口函数,可以涵盖广泛的场景。

    可以通过多种方式处理对外部数据源的访问,包括:

    • 使用可选的 memoization 访问 UDF 内部
    • 加载到数据框并使用join操作
    • 使用广播变量
  2. 将数据框转换为 PairRDD 并使用以下内容之一:

    • partitionBy + mapPartitions
    • reduceByKey / aggregateByKey

如果 Python 不是一个强要求,Scala API > 1.5.0 支持 UDAF,它可以实现如下功能:

df.groupBy(some_columns: _*).agg(some_udaf)
  1. 按键对数据进行分区并在每个分区中使用本地 Pandas 数据帧

【讨论】:

  • 这是我没有想到的——Python 不是一个强要求,我当然可以切换到 Scala 来尝试一下。对于您的 Scala 示例,您能否提供更多详细信息(或指向我可以开始的地方的链接)?谢谢!
  • 两个简单的例子:stackoverflow.com/a/32750733/1560062stackoverflow.com/a/32101530/1560062。您将在 Spark 源代码中找到其他内容。
猜你喜欢
  • 2020-02-17
  • 1970-01-01
  • 2020-09-17
  • 1970-01-01
  • 2021-11-17
  • 1970-01-01
  • 2020-05-28
  • 2020-08-11
  • 1970-01-01
相关资源
最近更新 更多