【问题标题】:pyspark udf with parameter带有参数的pyspark udf
【发布时间】:2020-12-20 02:53:21
【问题描述】:

需要将一个pyspark数据帧列checkin_time从milisec传输到时区调整时间戳,时区信息在另一列tz_info中。

尝试以下:

def tz_adjust(x,tz_info):
    if tz_info:
        y = col(x)+ col(tz_info) 
        return from_unixtime(col(y)/1000)
    else:
        return from_unixtime(col(x)/1000)
    
def udf_tz_adjust(tz_info):
    return udf(lambda l: tz_adjust(l, tz_info))     

同时使用这个udf到列

df.withColumn('checkin_time',udf_tz_adjust('time_zone')(col('checkin_time')))

got some error:
AttributeError: 'NoneType' object has no attribute '_jvm'

知道将第二列作为参数传递给 udf 吗? 谢谢。

【问题讨论】:

    标签: pyspark user-defined-functions


    【解决方案1】:

    恕我直言,您正在做的是 UDF 和偏函数的组合,这可能会变得很棘手。我认为您根本不需要将 UDF 用于您的应用程序目的。您可以执行以下操作

    #not tested
    
    from pyspark.sql.functions import *
    
    df.withColumn('checkin_time', when(col("tz_info").isNotNull(), (from_unixtime(col('checkin_time')) + F.col("tz_info"))/1000).otherwise(from_unixtime(col("checkin_time"))/1000))
    
    

    UDF 有其自己的serde 效率低下,这在与 python 一起使用时更糟糕,因为它将 scala 数据类型转换为 python 数据类型会产生额外的开销。

    【讨论】:

    • 是的,您的解决方案是正确的。这是我已经实施的一种解决方案。还有其他几个列需要相同的处理过程,所以我认为 udf 可能更具可扩展性。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多