【问题标题】:How to do properly a full Outer Join of two RDDs with PySpark?如何使用 PySpark 正确完成两个 RDD 的完全外连接?
【发布时间】:2016-10-13 16:02:16
【问题描述】:

我正在寻找一种通过键组合两个 RDD 的方法。

给定:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
                ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
               ]
              )
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
                ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
                ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
               ]
              )

我找到了解决办法!尽管如此,这个解决方案对于我想做的事情并不完全令人满意。 我创建了一个函数来指定我的密钥,该密钥将应用于名为“x”的rdd:

def get_keys(rdd):

    new_x = rdd.map(lambda item: (item[0], (item[1], item[2])))
    return new_x

new_x = get_keys(x)

给出:

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))]

然后:

new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect()

结果:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])]

我想要的是:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))]  

帮助?

【问题讨论】:

    标签: apache-spark mapreduce pyspark apache-spark-sql outer-join


    【解决方案1】:

    为什么不呢?

    >>> new_x.fullOuterJoin(y)
    

    >>> x.toDF().join(y.toDF(), ["_1"], "fullouter").rdd
    

    【讨论】:

    • LostInOverflow : x.toDF().join(y.toDF(), ["_1"], "fullouter").rdd.collect() 给出:[Row(_1=u'_guid_YWKnKkcrg_Ej0icb07bhd -mXPjw-FcPi764RRhVrOxE=',_2=u'FR',_3=u'75001',_2=u'KlGZj08d')]。 new_x.fullOuterJoin(y) 更好。
    • [('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, 'JmJCFu3N')), ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOuter=', => new. (('FR', '75001'), 'KlGZj08d')), ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', (('TN', '8160'), 无)), ('_guid_hG88Yt5EUsqT8a06Cy380ga​​3XHPvwaFyl, 'NUCUVE, '_guid_hG88Yt5EUsqT8a06Cy380ga​​3XHPvwaFyl KNPQLQth'))]
    • 有没有办法将 (('FR', '75001'), 'KlGZj08d') 中的 'FR' 和 '75001' 分开?
    • Row 是一个元组。名称只是一种展示糖。关于第二个:mapValues(lambda x: x[0] + (x[1], ))?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-30
    • 1970-01-01
    • 1970-01-01
    • 2020-03-17
    • 1970-01-01
    • 2020-06-10
    • 2015-06-23
    相关资源
    最近更新 更多