【问题标题】:Lazy foreach on a Spark RDDSpark RDD 上的惰性 foreach
【发布时间】:2015-07-21 15:02:42
【问题描述】:

我有一个很大的字符串 RDD(通过几个 sc.textFile(...)) 的联合获得。

我现在想在该 RDD 中搜索给定的字符串,并且我希望在找到“足够好”的匹配项时停止搜索。

为此,我可以改造 foreachfiltermap,但所有这些都将遍历该 RDD 中的每个元素,无论是否已达到匹配。

有没有办法缩短这个过程并避免遍历整个 RDD?

【问题讨论】:

    标签: apache-spark rdd lazy-sequences


    【解决方案1】:

    我可以为此目的改造 foreach、过滤器或映射,但所有这些都将遍历该 RDD 中的每个元素

    其实你错了。如果您限制结果(使用takefirst),Spark 引擎足够智能以优化计算:

    import numpy as np
    from __future__ import print_function
    
    np.random.seed(323)
    
    acc = sc.accumulator(0)
    
    def good_enough(x, threshold=7000):
        global acc
        acc += 1
        return x > threshold
    
    rdd = sc.parallelize(np.random.randint(0, 10000) for i in xrange(1000000))
    
    x = rdd.filter(good_enough).first()
    

    现在让我们检查累积:

    >>> print("Checked {0} items, found {1}".format(acc.value, x))
    Checked 6 items, found 7109
    

    只是为了确定一切是否按预期工作:

    acc = sc.accumulator(0)
    rdd.filter(lambda x: good_enough(x, 100000)).take(1)
    assert acc.value == rdd.count()
    

    同样的事情可以做,可能使用数据帧和 udf 以更有效的方式。

    注意:在某些情况下,甚至可以在 Spark 中使用无限序列并仍然得到结果。例如,您可以查看我对Spark FlatMap function for huge lists 的回答。

    【讨论】:

      【解决方案2】:

      不是真的。没有find 方法,就像在启发 Spark API 的 Scala 集合中一样,一旦找到满足谓词的元素,它将停止查找。可能你最好的选择是使用可以最大限度地减少过度扫描的数据源,比如 Cassandra,驱动程序会在其中下推一些查询参数。您还可以查看名为 BlinkDB 的更具实验性的伯克利项目。

      归根结底,Spark 更多地是为扫描数据集而设计的,例如之前的 MapReduce,而不是传统的类似数据库的查询。

      【讨论】:

      • 据我所知,RDD 实际上更像 Scala 惰性集合。请检查my answer,如果您有任何cmets,请告诉我。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-06-17
      • 2014-11-05
      • 2019-09-01
      • 1970-01-01
      • 2020-12-18
      相关资源
      最近更新 更多