【问题标题】:Multiple iterations of sha2 on spark dataframe columnspark数据框列上sha2的多次迭代
【发布时间】:2020-03-12 01:04:42
【问题描述】:

我有一个用例,我需要在字符串列上计算 5000 轮 sha512。到目前为止,我尝试使用 pyspark 函数 sha2、python“旧”UDF 和 python pandas udf。我正在寻找一种加快计算速度的方法。

对于 pyspark 我无法定义使用 sha2 5000 次的列(即使在显示列定义时堆栈溢出) - 我使用循环定义它:

for _ in range(5000):
     column = sha2(column,512)

对于 python 我使用 hashlib 定义了类似的函数:

def sha(text):
    for _ in range(5000):
        text = hashlib.sha512(text.encode('utf-8')).hexdigest()

    return text

但它引入了序列化/反序列化和数据传输的开销。

我尝试为 pandas_udf 重写此函数,但不幸的是我的集群上的节点没有安装 pyarrow,在我需要工作原型之前它不会改变。

所以我正在寻找一种方法来加快速度。

  • 我不知道 scala 或 java,但我愿意尝试使用 scala/java udf 来加快速度 - 我是否正确假设在这种情况下切换到 scala/java udf 应该会加快速度?
  • pyspark 是否缺少我定义此类函数的方法?

编辑:我在 python 3.7 中使用 Spark 2.3。所以我无法访问 2.4 中引入的高阶函数

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    当然,您只能使用 SQL 函数来做到这一点。有

    df = spark.createDataFrame(["Hello World"], "string")
    

    Spark 2.4 或更高版本(使用某些专有平台时更早)您可以

    df.selectExpr("""aggregate(
        sequence(1, 5000),            -- Dummy sequence
        value,                        -- Init
        (acc, x) -> sha2(acc, 512)    -- Aggregation function
    ) AS hash""")
    

    Spark 3.1 或更高版本中,您可以

    from pyspark.sql.functions import aggregate, col, lit, sequence
    
    df.select(aggregate(
        sequence(lit(1), lit(5000)),     # Dummy sequence
        col("value"),                    # Init
        lambda acc, _: sha2(acc, 512)    # Aggregation function
    ).alias("hash"))
    

    编辑(如果您无法更新):

    在实践中,5000 轮散列可能足以抵消移动数据的成本,因此您应该可以使用普通的 udf,尤其是原型设计。

    【讨论】:

    • 不幸的是我有 spark 2.3 :(
    • re 你的编辑 - 这是一个公平的观点,我没有强调,但我认为我仍然会尝试为教育目的寻找优化 - 即尝试学习如何编写简单的 scala 或 java udf 和比较速度。如果没有人在一两天内给出更好的答案,我会接受你的。谢谢。
    猜你喜欢
    • 2018-08-21
    • 2021-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-14
    • 1970-01-01
    相关资源
    最近更新 更多