【问题标题】:Update global variables by using map reduce使用 map reduce 更新全局变量
【发布时间】:2020-07-01 06:24:03
【问题描述】:

假设我在 pyspark 中有这个:

def condi( x ):
    if x["age"] <= 2:
        return True
    else:
        return False

def add_count( x ):
    global aa
    aa += 1
    x["count"] += 10000
    return x

sc = pyspark.SparkContext(  master = 'spark://192.168.56.103:7077',appName = 'test' )

data = [{"age":1,"count":10},{"age":2,"count":20},{"age":3,"count":30}]

data = sc.parallelize( data )

global aa
aa = 0

k = data.map( lambda x : add_count( x ) if condi( x ) else x )

print( k.collect() )
print( aa )

这样的输出:

[{'count': 10010, 'age': 1}, {'count': 10020, 'age': 2}, {'count': 30, 'age': 3}] # data
0 # aa

全局变量aa根本没有修改。

如何使用 map reduce 修改全局变量?

【问题讨论】:

  • 什么全局变量a?我只看到一个全局变量aa。无论如何,您是否尝试检查add_count 是否真的被调用,例如通过print在那里发送消息?
  • 我认为它不会在所有执行者上都可用。考虑使用广播变量-spark.apache.org/docs/latest/api/python/…
  • @Karl Knechtel 是的,实际上 add_count 被调用了!

标签: python apache-spark pyspark mapreduce


【解决方案1】:

您需要将aa 声明为Accumulator,因此它将被所有执行者共享。请使用

aa = sc.accumulator(0)

而不是

aa = 0

修改后,打印出来的值为2

说明:每个执行器都使用自己的本地变量副本。因此,将 +1 添加到执行程序上的aa 的一个副本 不会更改驱动程序上aa 的值。语句print( aa ) 在驱动程序上执行,因此看不到执行程序上的更改。

您也可以查看this question

【讨论】:

  • 如果我想做乘法或除法怎么办?
  • afaik,普通的普通累加器只支持add。对于更多操作,您必须创建自己的 AccumulatorParam 并实现自定义 addInPlace 方法,该方法将执行乘法或任何所需的操作。但是到目前为止我还没有这样做......
猜你喜欢
  • 2020-09-18
  • 1970-01-01
  • 2014-07-20
  • 2022-01-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-09-22
相关资源
最近更新 更多