如果您不能使用udf,您可以使用map 函数,但正如您当前编写的那样,只有一列。要保留所有列,请执行以下操作:
df = df.rdd\
.map(lambda x: (x["Person"], x["Amount"], hash(str(x["Amount"]))))\
.toDF(["Person", "Amount", "Hash"])
df.show()
#+------+------+--------------------+
#|Person|Amount| Hash|
#+------+------+--------------------+
#| Bob| 562|-4340709941618811062|
#| Bob| 880|-7718876479167384701|
#| Bob| 380|-2088598916611095344|
#| Sue| 85| 7168043064064671|
#| Sue| 963|-8844931991662242457|
#+------+------+--------------------+
注意:在这种情况下,hash(x["Amount"]) 不是很有趣,所以我将其更改为将 Amount 哈希转换为字符串。
基本上,您必须将该行映射到包含所有现有列的元组并添加新列。
如果您的列太多而无法枚举,您也可以在现有行中添加一个元组。
df = df.rdd\
.map(lambda x: x + (hash(str(x["Amount"])),))\
.toDF(df.columns + ["Hash"])\
我还应该指出,如果散列值是您的最终目标,还有一个 pyspark 函数 pyspark.sql.functions.hash 可用于避免序列化为 rdd:
import pyspark.sql.functions as f
df.withColumn("Hash", f.hash("Amount")).show()
#+------+------+----------+
#|Person|Amount| Hash|
#+------+------+----------+
#| Bob| 562| 51343841|
#| Bob| 880|1241753636|
#| Bob| 380| 514174926|
#| Sue| 85|1944150283|
#| Sue| 963|1665082423|
#+------+------+----------+
这似乎使用了不同于 python 内置的散列算法。