【问题标题】:Pyspark RDD filter behaviour when filter condition variable is modified修改过滤条件变量时的 Pyspark RDD 过滤行为
【发布时间】:2022-01-10 21:58:22
【问题描述】:

当以下代码运行时:

A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.count())
t = 10
C = B.filter(lambda x: x > t)
print(C.count())

输出是:

49
0

这是不正确的,因为在 10 和 49 之间有 39 个值。似乎将 t 从 50 更改为 10 也影响了第一个过滤器,并且它得到了重新评估,因此当两个过滤器连续应用时,它实际上变成了 @ 987654324@ 这将导致 1, 2, 3, 4, 5, 6 ,7, 8, 9 后跟 x&gt;10 导致空 rdd。

但是当我在代码中添加调试打印时,结果不是我所期望的,我正在寻找解释:

A = ss.sparkContext.parallelize(range(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print(B.collect())
t = 10
print(B.collect())
print(B.count())
C = B.filter(lambda x: x > t)
print(C.collect())
print(C.count())

输出是:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
9
[]
0

为什么t=10 之后的计数是 10,但 print(B.collect()) 显示的预期 rdd 值从 1 到 49?如果在更改t 后触发收集重新执行filter,那么collect() 不应该显示1-9 的值吗?

我是 pyspark 的新手,我怀疑这与 spark 的惰性操作和缓存有关。有人可以解释幕后发生的事情吗?

谢谢!

【问题讨论】:

    标签: pyspark filter rdd python-3.9


    【解决方案1】:

    您的假设是正确的,观察到的行为与 Spark 对转换的惰性评估有关。

    B.count() 被执行时,Spark 只需将过滤器x &lt; tt = 50 一起应用并打印49 的预期值。

    C.count()执行时,Spark看到C的执行计划中有两个过滤器,分别是x &lt; tx &gt; t。此时t 已设置为10,并且没有任何 rdd 元素满足小于和大于10 的条件。 Spark 忽略了第一个过滤器已经被评估的事实。当调用 Spark 操作时,所有当前rdd历史中的转换都会被执行(除非缓存了一些中间结果,见下文)。

    更详细地检查此行为的一种方法(稍微)是切换到 Scala 并为两个 rdds 打印 toDebugString1

    println(B.toDebugString)
    

    打印

    (4) MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
     |  ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []
    

    同时

    println(C.toDebugString)
    

    打印

    (4) MapPartitionsRDD[2] at filter at SparkStarter.scala:28 []
     |  MapPartitionsRDD[1] at filter at SparkStarter.scala:23 []
     |  ParallelCollectionRDD[0] at parallelize at SparkStarter.scala:19 []
    

    在这里我们可以看到,对于 rdd B 应用了一个过滤器,对于 rdd C 应用了两个过滤器。

    如何解决这个问题?

    如果第一个过滤器的结果被缓存,则打印出预期的结果。然后 t 更改并应用第二个过滤器 C.count() 仅根据 B 的缓存结果触发第二个过滤器:

    A = ss.sparkContext.parallelize(range(1, 100))
    t = 50
    B = A.filter(lambda x: x < t).cache()
    print(B.count())
    t = 10
    C = B.filter(lambda x: x > t)
    print(C.count())
    

    打印预期结果。

    49
    39
    

    1 不幸的是,这只适用于 Scala 版本的 Spark。 PySpark 似乎“压缩”了toDebugString(版本 3.1.1)的输出。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多