【问题标题】:pyspark operations not scaling uppyspark 操作没有扩大
【发布时间】:2020-05-29 16:07:16
【问题描述】:

我在笔记本中有一个代码可以正常工作,但是在具有无限计算和 java.lang.OutOfMemoryError: Java heap space 的更大数据上失败了。

流程如下:

模拟 pyspark 数据

我从一个包含 3 列的数据框开始,即(用户、时间和项目),如下面的代码中所模拟:

    from pyspark.sql.types import *
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    import pandas as pd
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

    df_schema = StructType([ StructField("User", StringType(), True)\
                       ,StructField("Time", IntegerType(), True)\
                       ,StructField("Item", StringType(), True)])
    pddf = pd.DataFrame([["u1",1,"A"],
                    ["u1",1,"A"],
                    ["u1",2,"A"],
                    ["u1",3,"B"],
                    ["u1",3,"C"],
                    ["u1",4,"B"],
                    ["u2",1,"D"],
                    ["u2",2,"D"],
                    ["u2",2,"A"],
                    ["u2",2,"F"],
                    ["u2",3,"D"],
                    ["u2",3,"A"],],columns=["User", "Time", "Item"])

    df = spark.createDataFrame(pddf,schema=df_schema)
    df.show()

给了

+----+----+----+
|User|Time|Item|
+----+----+----+
|  u1|   1|   A|
|  u1|   1|   A|
|  u1|   2|   A|
|  u1|   3|   B|
|  u1|   3|   C|
|  u1|   4|   B|
|  u2|   1|   D|
|  u2|   2|   D|
|  u2|   2|   A|
|  u2|   2|   F|
|  u2|   3|   D|
|  u2|   3|   A|
+----+----+----+

中间步骤

然后我为每个用户计算 topn 最常见的项目,并创建一个数据框,其中包含新列 uc(uc 表示不常见),如果项目在 topn 列表中,则设置为 0 em> 否则为 1。

    import pyspark.sql.functions as F
    from pyspark.sql import Window
    ArrayOfTupleType = ArrayType(StructType([
        StructField("itemId", StringType(), False),
        StructField("count", IntegerType(), False)
    ]))

    @F.udf(returnType=ArrayOfTupleType)
    def most_common(x, topn=2):
        from collections import Counter
        c = Counter(x)
        mc = c.most_common(topn)
        return mc
    topn=2
    w0 = Window.partitionBy("User")
    dfd = (df.withColumn("Item_freq", most_common(F.collect_list("Item").over(w0), F.lit(topn)))
             .select("User", "Time" , "Item" , "Item_freq")
             .withColumn("mcs", F.col("Item_freq.itemId"))
             .withColumn("uc", F.when(F.expr("array_contains(mcs, Item)"), 0).otherwise(1)).cache())

    dfd.select("User", "Time", "Item" , "mcs" , "uc").show()

它给出了下面的中间数据框

+----+----+----+------+---+
|User|Time|Item|mcs   |uc |
+----+----+----+------+---+
|u1  |1   |A   |[A, B]|0  |
|u1  |1   |A   |[A, B]|0  |
|u1  |2   |A   |[A, B]|0  |
|u1  |3   |B   |[A, B]|0  |
|u1  |3   |C   |[A, B]|1  |
|u1  |4   |B   |[A, B]|0  |
|u2  |1   |D   |[D, A]|0  |
|u2  |2   |D   |[D, A]|0  |
|u2  |2   |A   |[D, A]|0  |
|u2  |2   |F   |[D, A]|1  |
|u2  |3   |D   |[D, A]|0  |
|u2  |3   |A   |[D, A]|0  |
+----+----+----+------+---+

聚合步骤

然后我最后按用户和时间分组,这是对真实数据失败的操作

    uncommon = dfd.groupBy("User", "Time").agg(F.sum(F.col("uc")).alias("UncommonItem"))
    uncommon.orderBy("User", "Time", ascending=True).show()

它给出了虚拟数据的预期结果

+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|u1  |1   |0           |
|u1  |2   |0           |
|u1  |3   |1           |
|u1  |4   |0           |
|u2  |1   |0           |
|u2  |2   |1           |
|u2  |3   |0           |
+----+----+------------+

但它失败了 java.lang.OutOfMemoryError: Java heap space on real data。

将 spark.driver.memory 从 6G 增加到 60G 只会在更长的时间后发生崩溃,直到它填满 60G。我的真实数据有1907505个输入样本

我对 pyspark 不是很有经验,我不确定问题出在哪里。许多其他 groupby/agg 操作很快,并且不会在相同类型的数据上失败。因此,我怀疑问题出在我在上述中间步骤中制作数据框 dfd 的方式上。

关于如何优化代码的任何想法?

【问题讨论】:

  • 在哪一行得到OOM?
  • 当我调用 show() 方法时:uncommon.orderBy("User", "Time", ascending=True).show()。如果我们在上面一行调用 show() 方法,也会发生这种情况。我刚刚看到,在中间步骤(就在聚合之前)调用名为 dfd 的数据帧上的 count() 方法时,我也有 OOM
  • 您确定是 show 而不是 orderBy 导致 OOM 吗?检查一下。
  • 是的,我在省略 orderBy 时出现错误。我相信问题发生在之前,因为在groupByorderBy之前在dfd数据帧上调用count()方法时我也有OOM@和orderBy

标签: python pyspark out-of-memory


【解决方案1】:

如果你可以改变方法,你可以试一试:

import pyspark.sql.functions as F

topn=2
w = Window.partitionBy('User','Item')
df1 = df.withColumn("Counts",F.count('Item').over(w))

w1 = Window.partitionBy(df1["User"]).orderBy(df1['Counts'].desc())

(df1.withColumn("dummy",F.when(F.dense_rank().over(w1)<=topn,0).otherwise(1))
.groupBy('User','Time').agg(F.max("dummy").alias('UncommonItem'))).show()

+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|  u1|   1|           0|
|  u1|   2|           0|
|  u1|   3|           1|
|  u1|   4|           0|
|  u2|   1|           0|
|  u2|   2|           1|
|  u2|   3|           0|
+----+----+------------+

答案中的步骤:

  1. 通过用户和项目窗口获取计数
  2. 在 User 上获取 dense_rank 和 step1 中返回的 Count
  3. 只要排名在 2 (topn) 以内,则返回 1,否则返回 0 并将其命名为 dummy
  4. 按用户和时间分组并获得最大的假人

【讨论】:

  • 太棒了,速度很快,没有OOM了,谢谢。知道我的方法的瓶颈是什么吗?
  • @Urian 我觉得这种方法更重要,collect_list 这样的东西效率很低,而且 udfs 比内置方法花费的时间更长
猜你喜欢
  • 2019-05-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多