【问题标题】:how to access an RDD in another RDD?如何访问另一个RDD中的RDD?
【发布时间】:2019-01-19 07:43:22
【问题描述】:

我有 2 RDD(在 pyspark 中)在 rdd1=(id1, value1)rdd2=(id2, value2) 形式上,其中 id 是唯一的(即所有 id1 都不同于 id2)。

我在resultRDD=((id1, id2), value3) 表格上有第三个 RDD。我想过滤后者,以便只保留 value3 > (value1+value2) 的元素。

如果我访问 rdd1 和 rdd2 我得到以下异常:

pickle.PicklingError: Could not serialize object: Exception: It appears that you
 are attempting to broadcast an RDD or reference an RDD from an action or transf
ormation. RDD transformations and actions can only be invoked by the driver, not
 inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.co
unt() * x) is invalid because the values transformation and count action cannot
be performed inside of the rdd1.map transformation. For more information, see SP
ARK-5063.

那么访问 rdd1 和 rdd2 以过滤 resultRDD 的最佳策略是什么?

solution1:

如果我 brodcast rdd1 和 rdd2 它可以工作,但我认为它不是优化的解决方案,因为 rdd1 和 rdd2 很大。

solution2:

我们可以收集 rdd1 和 rdd2,而不是广播 rdd1 和 rdd2,因此我们可以进行过滤。那么在我的情况下,有效的解决方案是什么?

我的函数看起来像:

def filterResultRDD(resultRDD, rdd1, rdd2):


    source = rdd1.collect()
    target = rdd2.collect()
    f = resultRDD.filter(lambda t: t[1] >= getElement(source, t[0][0])+ getElement(target, t[0][1])).cache()
    return f

def getElement(mydata, key):
    return [item[1] for item in mydata if item[0] == key][0]    

【问题讨论】:

  • join怎么样?
  • @OmG 在这种情况下我该如何使用 join?
  • 将它们转换为 spark 数据框并使用 join。

标签: python apache-spark pyspark


【解决方案1】:

首先关于您建议的解决方案:
solution2
永远不要收集 rdd。
如果您收集 rdd,这意味着您的解决方案将不可扩展,或者这意味着您首先不需要 rdd。
解决方案 1
类似于对解决方案 2 的引用,但有一些例外,您的案例不是这些例外之一。

如前所述,执行此操作的“火花”方式是使用“join”。
当然,无需转换为 spark dataframe。

这里有一个解决方案:

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)])
rdd2 = sc.parallelize([('aa', 1), ('bb', 2), ('cc', 3), ('dd', 4), ('ee', 5)])
rdd3 = sc.parallelize([(('a', 'aa'), 1), (('b', 'dd'), 8), (('e', 'aa'), 34), (('c', 'ab'), 23)])

print rdd3.map(lambda x: (x[0][0], (x[0][1], x[1])))\
.join(rdd1)\
.map(lambda x: (x[1][0][0], (x[0], x[1][0][1], x[1][1]))).join(rdd2)\
.filter(lambda x: x[1][0][1] > x[1][0][2] + x[1][1])\
.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1]))\
.collect()

--> [(('b', 'dd'), 8), (('e', 'aa'), 34)]

【讨论】:

  • 您的解决方案非常适合。了解更多信息。如果我被用于广播,会有什么例外
  • 如果 rdds 的长度不同怎么办?你还能这样加入他们吗?
  • 不确定“长度”是什么意思。在示例中,rdd3 有 4 个元素,rdd1 有 5 个元素,因此它们的“长度”已经不同。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-08-23
  • 1970-01-01
相关资源
最近更新 更多