【发布时间】:2020-05-28 18:30:35
【问题描述】:
我有数百万行的 Spark DataFrame DF1。每行最多有 100 列。
col1 | col2 | col3 | ... | colN
--------------------------------
v11 | v12 | v13 | ... | v1N
v21 | v22 | v23 | ... | v2N
... | ... | ... | ... | ...
此外,我还有另一个 DataFrame DF2,其中有数百行包含名称和正文列。 Name 包含函数名称,body 包含纯 Python 代码,返回 true 或 false 的布尔函数。这些函数在它们的逻辑中,可以引用来自 DF1 的单行中的任何列。
func_name | func_body
-----------------------------------------------
func1 | col2 < col45
func2 | col11.contains("London") and col32*col15 < col21
funcN | ....
我需要将这两个 DataFrames - DF1 与 DF2 连接起来,并将 Df2 中的每个函数应用到 DF1 中的每一行。每个函数都必须能够接受来自 DF1 的参数,比方说具有键/值对的字典数组,这些键/值对代表 DF1 中相应行的所有列的名称/值。
我知道如何加入 DF1 和 DF2,此外,我知道 Python 函数的执行不会以分布式方式工作。现在没关系。这是一个临时解决方案。我只需要将 DF1 中的所有行分布到工作节点上,然后在 Apache Spark 应用程序的不同任务中将每个 Python 函数应用于 DF1 的每一行。评估 eval() 它们并传递字典数组,其中包含键/值对,如上所述。
一般来说,每个 Python 函数都是一个标签,我想将其分配给 DF1 中的行,以防某些函数返回 true。例如,这是生成的 DataFrame DF3:
col1 | col2 | col3 | ... | colN | tags
--------------------------------------
v11 | v12 | v13 | ... | v1N | [func1, func76, funcN]
v21 | v22 | v23 | ... | v2N | [func32]
... | ... | ... | ... | ... | [..., ..., ..., ..., ...]
PySpark 是否可行,如果可以,您能否举例说明如何实现?以DF.columns 中的Map 作为输入参数的UDF 函数是正确的方法还是可以以更简单的方式完成? Spark 对一个时间点可以注册多少 UDF 函数(数量)有任何限制吗?
【问题讨论】:
标签: python dataframe apache-spark pyspark apache-spark-sql