【问题标题】:PySpark - Add map function as columnPySpark - 添加地图功能作为列
【发布时间】:2018-09-27 12:04:48
【问题描述】:

我有一个 pyspark 数据框

a = [
    ('Bob', 562),
    ('Bob',880),
    ('Bob',380),
    ('Sue',85),
    ('Sue',963)
] 
df = spark.createDataFrame(a, ["Person", "Amount"])

我需要创建一个散列 Amount 并返回金额的列。问题是我不能使用UDF,所以我使用了映射函数。

df.rdd.map(lambda x: hash(x["Amount"]))

【问题讨论】:

    标签: pyspark apache-spark-sql rdd


    【解决方案1】:

    如果您不能使用udf,您可以使用map 函数,但正如您当前编写的那样,只有一列。要保留所有列,请执行以下操作:

    df = df.rdd\
        .map(lambda x: (x["Person"], x["Amount"], hash(str(x["Amount"]))))\
        .toDF(["Person", "Amount", "Hash"])
    
    df.show()
    #+------+------+--------------------+
    #|Person|Amount|                Hash|
    #+------+------+--------------------+
    #|   Bob|   562|-4340709941618811062|
    #|   Bob|   880|-7718876479167384701|
    #|   Bob|   380|-2088598916611095344|
    #|   Sue|    85|    7168043064064671|
    #|   Sue|   963|-8844931991662242457|
    #+------+------+--------------------+
    

    注意:在这种情况下,hash(x["Amount"]) 不是很有趣,所以我将其更改为将 Amount 哈希转换为字符串。

    基本上,您必须将该行映射到包含所有现有列的元组并添加新列。

    如果您的列太多而无法枚举,您也可以在现有行中添加一个元组。

    df = df.rdd\
        .map(lambda x: x + (hash(str(x["Amount"])),))\
        .toDF(df.columns + ["Hash"])\
    

    我还应该指出,如果散列值是您的最终目标,还有一个 pyspark 函数 pyspark.sql.functions.hash 可用于避免序列化为 rdd

    import pyspark.sql.functions as f
    df.withColumn("Hash", f.hash("Amount")).show()
    #+------+------+----------+
    #|Person|Amount|      Hash|
    #+------+------+----------+
    #|   Bob|   562|  51343841|
    #|   Bob|   880|1241753636|
    #|   Bob|   380| 514174926|
    #|   Sue|    85|1944150283|
    #|   Sue|   963|1665082423|
    #+------+------+----------+
    

    这似乎使用了不同于 python 内置的散列算法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-26
      • 1970-01-01
      相关资源
      最近更新 更多