【发布时间】:2019-01-19 07:43:22
【问题描述】:
我有
2 RDD(在 pyspark 中)在 rdd1=(id1, value1) 和 rdd2=(id2, value2) 形式上,其中 id 是唯一的(即所有 id1 都不同于 id2)。
我在resultRDD=((id1, id2), value3) 表格上有第三个 RDD。我想过滤后者,以便只保留 value3 > (value1+value2) 的元素。
如果我访问 rdd1 和 rdd2 我得到以下异常:
pickle.PicklingError: Could not serialize object: Exception: It appears that you
are attempting to broadcast an RDD or reference an RDD from an action or transf
ormation. RDD transformations and actions can only be invoked by the driver, not
inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.co
unt() * x) is invalid because the values transformation and count action cannot
be performed inside of the rdd1.map transformation. For more information, see SP
ARK-5063.
那么访问 rdd1 和 rdd2 以过滤 resultRDD 的最佳策略是什么?
solution1:
如果我 brodcast rdd1 和 rdd2 它可以工作,但我认为它不是优化的解决方案,因为 rdd1 和 rdd2 很大。
solution2:
我们可以收集 rdd1 和 rdd2,而不是广播 rdd1 和 rdd2,因此我们可以进行过滤。那么在我的情况下,有效的解决方案是什么?
我的函数看起来像:
def filterResultRDD(resultRDD, rdd1, rdd2):
source = rdd1.collect()
target = rdd2.collect()
f = resultRDD.filter(lambda t: t[1] >= getElement(source, t[0][0])+ getElement(target, t[0][1])).cache()
return f
def getElement(mydata, key):
return [item[1] for item in mydata if item[0] == key][0]
【问题讨论】:
-
用
join怎么样? -
@OmG 在这种情况下我该如何使用 join?
-
将它们转换为 spark 数据框并使用 join。
标签: python apache-spark pyspark