【问题标题】:strange behaviour of spark filter火花过滤器的奇怪行为
【发布时间】:2018-03-18 12:57:48
【问题描述】:
A = sc.parallelize(xrange(1, 100))
t = 50
B = A.filter(lambda x: x < t)
print B.collect()
t = 10
C = B.filter(lambda x: x > t)
print C.collect()

我想从 A 中取出所有低于 50 的数字并放入 B,然后从 B 中取出所有高于 10 的数字并将它们放入 C。 但是,C.collect() 的结果是空数组。

但是,如果我改变了

m = 10
C = B.filter(lambda x: x > m)

它会正常工作的。 我不明白为什么,在这个操作上它需要以前的t

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    我不明白为什么,在这个操作中它需要之前的 t 值

    它实际上不是引用之前的 t=50,而是 引用 t=10 本身

    如果你编写一个自定义函数来打印正在发生的事情会更明显

    A = sc.parallelize(xrange(1, 100))
    t = 50
    B = A.filter(lambda x: x < t)
    B.collect()
    t = 10
    def filters(x):
        print x, t
        return x > t
    C = B.filter(lambda x: filters(x))
    print C.collect()
    

    filters 函数将打印如下

    1 10
    2 10
    3 10
    4 10
    5 10
    6 10
    7 10
    8 10
    9 10
    

    它表明 t 是 10 并且 B = A.filter(lambda x: x &lt; t) 再次被 t=10 调用 即B 现在有 1 到 9,当调用 .filter(lambda x: x &gt; t) 时,它返回一个 空列表 因为没有大于 10 的数字

    正如Spark's official documentation 所说的那样

    默认情况下,每个转换后的 RDD 可能会在您每次对其执行操作时重新计算。但是,您也可以使用持久(或缓存)方法将 RDD 持久化到内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问它。还支持在磁盘上持久化 RDD,或跨多个节点复制。

    【讨论】:

      猜你喜欢
      • 2022-01-27
      • 1970-01-01
      • 2020-07-18
      • 1970-01-01
      • 2017-06-02
      • 1970-01-01
      • 2019-01-07
      • 2022-08-19
      • 1970-01-01
      相关资源
      最近更新 更多