【问题标题】:pyspark Udf is not working as expected when apply map transformation with broadcast?使用广播应用地图转换时,pyspark Udf 未按预期工作?
【发布时间】:2018-12-07 14:28:29
【问题描述】:

我有如下两个列表

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]

我想从列表l 列表中删除不包含列表x 中任何tuples 中的所有元素的所有元素。换句话说,x 中应该至少有一个tuple,其所有元组项都存在于l 的元素中。

根据我的last question,我在python中得到了以下解决方案:

print([l_ for l_ in l if any(all(e in l_ for e in x_) for x_ in x)])

产生所需的输出:

[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]

现在我正在尝试使用 pyspark rdd 复制相同的操作,但我没有得到预期的结果。

这是我尝试过的:

rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)

def flist(unique_product_List,x):
    filter_list = [
        l_ for l_ in unique_product_List 
        if any(all(e in l_ for e in x_) for x_ in x)
    ]

    return filter_list

rddsort=rddsort.map(lambda flist(x[0],broadcastVar.value)) 
print(rddsort.collect())

结果我得到一个空列表列表:

[[], [], [], [], [], []]

但是我的预期结果应该和上面一样。

【问题讨论】:

  • 您不需要将广播变量传递给map 函数。通过广播它,它已经作为只读变量在每台机器上可用。
  • 问题是您在 map 函数中对 unique_product_List 进行列表理解。你认为这是在迭代什么?这不是您所想的 rdd 中的行,而是每一行中的元素。
  • 可以帮帮我。我该如何解决这个问题。

标签: apache-spark pyspark rdd user-defined-functions


【解决方案1】:

您需要对 rdd 进行过滤(不是地图)。过滤器将检查每一行的条件并删除不匹配的条件。这里的条件是行值 (list _l = l[0]) 应该包含 x 中的一个列表中的所有元素。

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)

rddsort=rddsort.filter(lambda l_: any(all(e in l_ for e in x_) for x_ in x)) 
print(rddsort.collect())

输出

[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]

更新: 在函数中使用广播变量:

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)

def flist(row):
    filter_flag = any(all(e in l_ for e in x_) for x_ in broadcastVar.value)
    return filter_flag

rddsort=rddsort.filter(flist) 
print(rddsort.collect())

【讨论】:

  • 您好,感谢您的回复,但我想根据我的问题使用广播变量,请对此提供帮助。以上建议的解决方案我们没有使用。
  • 更新了在函数中使用广播变量的答案
猜你喜欢
  • 2016-01-04
  • 1970-01-01
  • 1970-01-01
  • 2021-12-01
  • 1970-01-01
  • 2016-11-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多