【问题标题】:joing two Rdds with multipule values and adding extra value based on join In Pyspark?加入两个具有多个值的 Rdd 并基于加入 Pyspark 添加额外的值?
【发布时间】:2018-12-04 01:51:01
【问题描述】:

我创建了 2 个RDD's,如下所示:

rdd1 = sc.parallelize([(u'176', u'244', -0.03925566875021147), (u'28', u'244', 0.9175106515709205), (u'165', u'244', -0.3837580218245722), (u'181', u'244', 0.29145693160561503), (u'161', u'244', -0.503468718448459), (u'28', u'275', 1.1636548589189926), (u'165', u'275', -1.026158464467282), (u'181', u'275', 0.6685791983070568)])

rdd2 = sc.parallelize([(u'176', u'244'), (u'28', u'244'), (u'165', u'244'), (u'165', u'275'), (u'181', u'275'), (u'141', u'388'), (u'154', u'238')])

我的预期输出应该如下所示:

[(u'176', u'244', -0.03925566875021147,1), (u'28', u'244', 0.9175106515709205,1), (u'165', u'244', -0.3837580218245722,1), (u'181', u'244', 0.29145693160561503,0), (u'161', u'244', -0.503468718448459,0), (u'28', u'275', 1.1636548589189926,0), (u'165', u'275', -1.026158464467282,1), (u'181', u'275', 0.6685791983070568,1)]

我想加入两个 rdds 添加加入状态,如 1 或 0。

在 rdd1 第一个元组是 (u'176', u'244', -0.03925566875021147) 并且 rdd2 包含 (u'176', u'244'),rdd1,rdd2 的前两个元素相同,那么我的预期输出是 (u'176', u'244', -0.03925566875021147,1)

在 Rdd1 的情况下相同:(u'181', u'275', 0.6685791983070568) 和 Rdd2 :(u'181', u'275') 输出将是 (u'181', u'275', 0.6685791983070568,1)

其他情况: rdd1 包含 (u'181', u'244', 0.29145693160561503) 但 rdd2 不包含任何像 (u'181', u'244') 这样的元组,所以预期的输出将是 (u'181', u'244', 0.29145693160561503,0)

我通过创建数据框实现了这一点,但我不想使用数据框连接。请帮助解决如何使用 rdds 来实现。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    要在 rdd 方法中做到这一点,您必须将 rdd 与要加入的列配对。然后执行此和其他的左外连接。对于其中的每个元素 (k, v),生成的 RDD 将包含其他中 w 的所有对 (k, (v, Some(w))),如果没有则包含对 (k, (v, None)) other 中的元素具有键 k。

     userRDD.leftOuterJoin(empRDD).collect {
            case (String, (firstrddvalue, None)) => (k,v,0)
            case (String, (firstrddvalue,secondrddvalue))=>(k,v,1)
          }
    

    【讨论】:

      【解决方案2】:

      我想加入两个 rdds 添加加入状态,例如 1 或 0

      要加入 rdd,您需要 pairedRdd

      pairedRdd1 = rdd1.map(lambda x: ((x[0], x[1]), x[2:]))
      pairedRdd2 = rdd2.map(lambda x: ((x[0], x[1]), 1))
      

      在这里,我在 pairedRdd2 中填充了 1,因为您的输出要求是 1 用于匹配 rdd1 中的 rdd2。

      最后,使用leftOuterJoin 并对预期输出进行一些操作

      finalRdd = pairedRdd1.leftOuterJoin(pairedRdd2).map(lambda x: tuple(list(x[0]) + list(x[1][0]) + [0 if(x[1][1] == None) else 1]))
      #[('161', '244', -0.503468718448459, 0),('165', '244', -0.3837580218245722, 1),('181', '244', 0.29145693160561503, 0),('165', '275', -1.026158464467282, 1),('181', '275', 0.6685791983070568, 1),('176', '244', -0.03925566875021147, 1),('28', '275', 1.1636548589189926, 0),('28', '244', 0.9175106515709205, 1)]
      

      希望回答对你有帮助

      【讨论】:

        猜你喜欢
        • 2021-02-12
        • 1970-01-01
        • 2021-04-04
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-03-28
        • 2023-03-08
        • 1970-01-01
        相关资源
        最近更新 更多