【问题标题】:Correct using of broadcast in Spark在 Spark 中正确使用广播
【发布时间】:2016-02-03 20:37:46
【问题描述】:

假设我有一个 RDD,例如(使用 pyspark):

 RDDstrings = sc.parallelize(['alpha','alpha4','veta','gamma','delta'])

我想通过使用字符串距离函数将每个字符串与其他字符串进行比较。因此,在这种情况下,最终结果将是一个 5x5 矩阵,其对角线的值为 1,因为字符串值与其在对角线上的值进行比较。

所以,我的想法如下:

  1. 我创建了一个函数StringDistance(string,alist),它将string 与字符串的alist 进行比较。
  2. 广播RDDstrings

    broadcastedRDDstrings = sc.broadcast(RDDstrings.collect())
    
  3. 映射初始RDD:

     stringsDistances = RDDstrings.map(lambda string:StringDistance(string,broadcastedRDDstrings.value))
    

所以在这个转换中,我将初始 RDD 中的每个字符串与稍后广播的相同 RDD 字符串进行比较。

结果是正确的。但我的问题是,这是否是实现它的最佳方式,以及这是否是广播的正确用法。还是我应该 cache() 初始 RDD?

欢迎任何负面反馈。

【问题讨论】:

    标签: apache-spark pyspark broadcast


    【解决方案1】:

    我会建议另一种方法。

    def patheticDistance(a, b):
        return len(a) - len(b)
    
    rdd = sc.parallelize(["maritza", "alberto", "andres", "dakota", "miguel"]).sortBy(lambda x: x)
    
    distances = (rdd.cartesian(rdd)
                    .groupByKey()
                    .map(lambda (x, y): [patheticDistance(x, a) for a in y])))
    
    distances.collect()
    # [[-1, 0, 0, -1, 0],
    #  [-1, 0, 0, -1, 0],
    #  [ 0, 1, 1,  0, 1],
    #  [-1, 0, 0, -1, 0],
    #  [ 0, 1, 1,  0, 1]]
    

    【讨论】:

      【解决方案2】:

      你可以在RDD上使用cartesian方法,比如

      rdd.cartesian(rdd)

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-10-03
        • 1970-01-01
        • 1970-01-01
        • 2018-11-12
        • 1970-01-01
        相关资源
        最近更新 更多