【问题标题】:Appending map functions to PySpark RDD inside for loop在 for 循环中将映射函数附加到 PySpark RDD
【发布时间】:2017-11-29 12:17:20
【问题描述】:

有人可以帮我理解在 python for 循环中将映射函数附加到 RDD 的行为吗?

对于以下代码:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))

rdd.collect()

我得到了输出:

[[1, 2, 2, 2], [2, 2, 2, 2], [3, 2, 2, 2]]

而使用以下代码:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()

我得到了预期的输出:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

我想这与传递给 PySpark 编译器的闭包有关,但我找不到任何关于此的文档...

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    我最好的猜测是因为懒惰的评估: 而且你的射程也很差。

    这两个代码 sn-ps 产生相同的输出:

    rdd = spark.sparkContext.parallelize([[1], [2], [3]])
    
    def appender(l, i):
        return l + [i]
    
    for i in range(1,4):
        rdd = spark.sparkContext.parallelize(rdd.map(lambda x: appender(x, i)).collect())
    
    rdd.collect()
    

    输出:

    [[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]
    

    第二个:

    rdd = spark.sparkContext.parallelize([[1], [2], [3]])
    
    rdd = rdd.map(lambda x: appender(x, 1))
    rdd = rdd.map(lambda x: appender(x, 2))
    rdd = rdd.map(lambda x: appender(x, 3))
    
    rdd.collect()
    

    输出:

    [[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]
    

    另外,为了显示在简化示例(仅输入 1 和 2)中 for 循环中发生的情况,修改了 appender 函数以打印 l 参数:

    1. for 循环打印:

      [2]
      [2, 2]
      [1]
      [3]
      [1, 2]
      [3, 2]
      

    首先它从输入列表中获取第二个字段

    1. 映射器输出的显式写入是:

      [1]
      [1, 1]
      [2]
      [2, 1]
      [3]
      [3, 1]
      

    【讨论】:

    • 嗯,从我(和我的 python 解释器)的角度来看,这个范围没有错。 docs.python.org/2/library/functions.html#range
    • 并行化 rdd.map 函数当然也不是我想做的。应该使用 Parallelize 在集群上分发现有集合。请记住,这只是测试伪代码。
    • ` Python 2.7.12 >>> for i in range(3): ... print(i) 0 1 2 ` 和第二个 sn-p 你输入数字:1,2 ,3
    【解决方案2】:

    解决方案是将所有全局变量(在本例中为 i)存储在 lambda 函数中,以确保正确关闭。这可以通过

    for i in range(3):
        rdd = rdd.map(lambda x, i=i: appender(x, i))
    

    更多信息请访问lambda function accessing outside variable

    有趣的是,至少在本地集群上(还没有在分布式集群上测试过),这个问题也可以通过持久化中间rdd来解决:

    for i in range(3):
        rdd = rdd.map(lambda x: appender(x, i))
        rdd.persist()
    

    两种解决方案都产生

    [[1, 0, 1, 2], [2, 0, 1, 2], [3, 0, 1, 2]] 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-10-15
      • 1970-01-01
      • 1970-01-01
      • 2021-01-16
      • 2017-08-20
      • 2018-07-13
      • 2017-10-21
      相关资源
      最近更新 更多