【问题标题】:Pyspark - Loop n times - Each loop gets gradually slowerPyspark - 循环 n 次 - 每个循环逐渐变慢
【发布时间】:2022-09-27 17:18:24
【问题描述】:

所以基本上我想通过我的数据框循环 n 次并在每个循环中应用一个函数 (执行连接)。 我的 test-Dataframe 就像 1000 行,在每次迭代中,只会添加一列。 前三个循环立即执行,然后它变得非常慢。 第 10 个循环例如需要10多分钟。

我不明白为什么会发生这种情况,因为我的 Dataframe 在行数方面不会变大。 如果我用 n=20 调用我的函数,例如,连接会立即执行。 但是当我迭代循环 20 次时,它很快就会卡住。

你知道什么可能导致这个问题吗?

  • 您将不得不分享一些示例代码或您在数据框中尝试执行的操作。使用 Pyspark,您很可能能够删除循环并将所有内容包含在一个事务中。您应该能够为第一个循环定义逻辑,然后为下一个循环定义执行,它应该按该顺序执行。如果您在每次代码进入循环时分配一个新的 DF,您应该记住您的集群资源将在事务期间被阻塞
  • 我添加了来自另一个主题的示例代码,该主题似乎面临我遇到的类似问题。他的解决方案是在每次迭代结束时将 DF 写入 HDFS,但我觉得这不是最好的解决方案。你是对的,在每次迭代中,我重新分配新的 DF,并在每个循环中添加列。你建议做什么。保存每次迭代的信息并将其与初始 DF 组合一次?

标签: apache-spark pyspark


【解决方案1】:

来自Evaluating Spark DataFrame in loop slows down with every iteration, all work done by controller 的示例代码

import time
from pyspark import SparkContext

sc = SparkContext()

def push_and_pop(rdd):
    # two transformations: moves the head element to the tail
    first = rdd.first()
    return rdd.filter(
        lambda obj: obj != first
    ).union(
        sc.parallelize([first])
    )

def serialize_and_deserialize(rdd):
    # perform a collect() action to evaluate the rdd and create a new instance
    return sc.parallelize(rdd.collect())

def do_test(serialize=False):
    rdd = sc.parallelize(range(1000))
    for i in xrange(25):
        t0 = time.time()
        rdd = push_and_pop(rdd)
        if serialize:
            rdd = serialize_and_deserialize(rdd)
        print "%.3f" % (time.time() - t0)

do_test()

【讨论】:

    【解决方案2】:

    我已经解决了这个问题,每 n 次将 df 转换为 rdd 并返回 df。 代码现在运行得很快。但我不明白究竟是什么原因。如果我不进行转换,解释计划似乎在迭代期间上升得非常快。 此修复程序也在“高性能 Spark”一书中使用此解决方法发布。

    虽然 Catalyst 优化器非常强大,但其中一种情况是 它目前遇到的挑战是非常大的查询计划。 这些查询计划往往是迭代算法的结果,例如 图算法或机器学习算法。一种简单的解决方法 因为这是将数据转换为 RDD 并返回 每次迭代结束时的 DataFrame/Dataset

    【讨论】:

      猜你喜欢
      • 2021-05-28
      • 2020-01-05
      • 1970-01-01
      • 2011-08-16
      • 2017-04-28
      • 1970-01-01
      • 2018-10-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多