【发布时间】:2017-03-16 07:48:36
【问题描述】:
假设我有两个 PySpark DataFrames df1 和 df2。
df1= 'a'
1
2
5
df2= 'b'
3
6
我想为每个df1['a'] 找到最接近的df2['b'] 值,并将最接近的值作为新列添加到df1 中。
换句话说,对于df1['a']中的每个值x,我想为所有y in df2['b']找到一个达到min(abx(x-y))的y(注意:可以假设只有一个y可以达到最小距离),结果是
'a' 'b'
1 3
2 3
5 6
我尝试使用以下代码首先创建一个距离矩阵(在找到达到最小距离的值之前):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
这给了
Column<PythonUDF#dist(a,b)>
然后我尝试了
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
它永远运行而不给出错误/输出。
我的问题是:
- 由于我是 Spark 新手,我构建输出 DataFrame 的方法是否有效? (我的方法是先为所有
a和b值创建一个距离矩阵,然后找到min一个) - 我的代码的最后一行有什么问题以及如何修复它?
【问题讨论】:
标签: pyspark spark-dataframe pyspark-sql