【问题标题】:pyspark join rdds by a specific keypyspark 通过特定键加入 rdds
【发布时间】:2017-08-06 21:52:58
【问题描述】:

我有两个 rdd 需要将它们连接在一起。它们如下所示:

RDD1

[(u'2', u'100', 2),
 (u'1', u'300', 1),
 (u'1', u'200', 1)]

RDD2

[(u'1', u'2'), (u'1', u'3')]

我想要的输出是:

[(u'1', u'2', u'100', 2)]

所以我想从 RDD2 中选择具有相同第二个 RDD1 值的那些。我尝试过加入,也尝试过笛卡尔,但都没有工作,甚至没有接近我正在寻找的东西。我是 Spark 的新手,非常感谢你们的帮助。

谢谢

【问题讨论】:

  • 你允许在这个解决方案中使用 Spark Dataframe 吗?

标签: join pyspark rdd


【解决方案1】:

Dataframe 如果您允许在解决方案中使用 Spark Dataframe。您可以将给定的 RDD 转换为数据框并将相应的列连接在一起。

df1 = spark.createDataFrame(rdd1, schema=['a', 'b', 'c'])
df2 = spark.createDataFrame(rdd2, schema=['d', 'a'])
rdd_join = df1.join(df2, on='a')
out = rdd_join.rdd.collect()

RDD 只需将要加入的密钥压缩到第一个元素,然后使用join 进行加入

rdd1_zip = rdd1.map(lambda x: (x[0], (x[1], x[2])))
rdd2_zip = rdd2.map(lambda x: (x[1], x[0]))
rdd_join = rdd1_zip.join(rdd2_zip)
rdd_out = rdd_join.map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1])).collect() # flatten the rdd
print(rdd_out)

【讨论】:

  • 这看起来也很酷,但我想要一个使用 rdds 操作而不是数据帧的解决方案。
  • @dagg3r 当然,如果我可以使用 RDD 解决这个问题,我会更新答案!
【解决方案2】:

对我来说,您的过程看起来像手动的。这是示例代码:-

rdd = sc.parallelize([(u'2', u'100', 2),(u'1', u'300', 1),(u'1', u'200', 1)])
rdd1 = sc.parallelize([(u'1', u'2'), (u'1', u'3')])
newRdd = rdd1.map(lambda x:(x[1],x[0])).join(rdd.map(lambda x:(x[0],(x[1],x[2]))))
newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect()

输出:-

[(u'1', u'2', u'100', 2)]

【讨论】:

  • 我对 RDD 的了解有限。您能否详细说明最后一个地图功能。 newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1]))。谢谢
  • @vikrantrana 我使用第二张地图以所需格式输出[(u'1', u'2', u'100', 2)] 否则没有第二张地图将是[('2', ('1', ('100', 2)))]
猜你喜欢
  • 1970-01-01
  • 2016-03-25
  • 1970-01-01
  • 2015-12-16
  • 1970-01-01
  • 2021-02-25
  • 2021-05-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多