【问题标题】:Pyspark and local variables inside UDFsUDF 中的 Pyspark 和局部变量
【发布时间】:2019-04-14 12:19:00
【问题描述】:

当我定义一个局部变量(例如一个庞大的复杂对象列表)并在 pyspark 的 UDF 中使用它时,究竟会发生什么。让我以此为例:

huge_list = [<object_1>, <object_2>, ..., <object_n>]

@udf
def some_function(a, b):
    l = []
    for obj in huge_list:
        l.append(a.operation(obj))
    return l

df2 = df.withColumn('foo', some_function(col('a'), col('b')))

它是自动广播的吗?还是节点每次都与主节点通信以获取其数据?这种方法对我有什么性能惩罚?有更好的吗? (考虑到每次应用 UDF 时都从头开始构建 huge_list 会更糟糕)

【问题讨论】:

    标签: python apache-spark pyspark user-defined-functions


    【解决方案1】:

    查看代码,可以看到以下情况:每个 udf this function 调用一次,它通过 CloudPickleSerializerthis 函数中腌制可调用对象。它还具有将腌制可调用对象的大小与 1Mb 的hardcoded threshold 进行比较的逻辑。如果大小较大,则广播腌制命令并改为腌制pyspark.broadcast.Broadcast 类型的对象(其序列化值显然非常短,因为该对象几乎是一个引用)。读取腌制可调用的地方似乎是here。我的理解是,执行者从头开始为每个新任务执行创建一个 python 进程。对于每个使用过的 udf,它会获取 pickled 命令并将其 unpickle,或者(对于广播)将需要从 JVM 获取广播的值并 unpickle。

    据我了解,如果在此处创建 pyspark.broadcast.Broadcast 对象,则所有执行程序都将保留其值,以供该执行程序将创建的 python worker.py 进程将来进行的所有查找。

    因此,如果您想回答某个函数是否会被广播的问题,您可以重复 pyspark 所做的相同操作,查看腌制对象是否大于 1Mb,例如像这样:

    from pyspark.serializers import CloudPickleSerializer
    ser = CloudPickleSerializer()
    x = [i**2 for i in range(10**5)]
    v = ser.dumps(lambda : x)
    print(len(v)) # 607434 - less than 1Mb, won't be broadcast
    

    关于替代方法,我认为我看到的唯一替代方法(除了每次调用 udf'ed 函数时创建新对象,这已经被解释为过于昂贵)将创建一个模块,该模块将在输入。在这种情况下,将为每个任务执行创建一次对象。因此,这几乎可以让您选择(a)如果您只允许 udf 函数捕获它,则通过 CloudPickleSerializer 在每次任务执行时反序列化一次对象,或者(b)通过导入模块在每次任务执行时创建一次对象。更快的是一个单独的问题 - 但我想答案可能取决于所讨论的对象。在每种情况下,它似乎都相当容易衡量。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-04
      • 2013-10-06
      • 2012-06-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多