【问题标题】:how to concat and combine two rdd into one in PySpark如何在 PySpark 中将两个 rdd 合并为一个
【发布时间】:2020-08-13 09:18:05
【问题描述】:

我得到两个RDD 并想连接并合并为一个RDD,如下所示:

rdd_1 = ['a1', 'a2', 'a3', 'a4', 'a5', ]
rdd_2 = ['b1', 'b2', 'b3', 'b4', 'b5', ]

# concat and combine these two rdd into one
rdd = ['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

我知道我可以将这两个 RDD 转换为 DataFrame 并将其连接到 spark.sql 中,如下所示:

df = df.withColumn('col1_col2', concat(col('col1'), lit(' '), col('col2')))

但是对于亿级样本来说效率不够。
所以想知道RRD编程有没有更快的方法。

【问题讨论】:

  • "a1" 和 "b1" 的组合规则是什么?你用“1”吗?将 RDD 想象成一袋大理石。其中没有预定义的顺序。所以列表的第一个元素不会自动与另一个列表的第一个元素结合。你需要定义一个规则。

标签: apache-spark pyspark apache-spark-sql rdd


【解决方案1】:

我想尝试压缩并加入:

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

或者没有lambda:

rdd_1.zip(rdd_2).map('_'.join).collect()

例子:

rdd_1 = spark.sparkContext.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
rdd_2 = spark.sparkContext.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

【讨论】:

    【解决方案2】:

    从列表中创建 rdds,然后在两个 rdds 上执行 zip,然后使用 map 和 join 对其进行迭代和连接。

    rd1 = sc.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
    rd2 = sc.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])
    rd1.zip(rd2).map(lambda x: x[0]+'_'+x[1]).collect()
    rd1.zip(rd2).map(lambda x: '_'.join(x)).collect()
    rd1.zip(rd2).map('_'.join).collect()
    
    ['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']
    

    【讨论】:

      猜你喜欢
      • 2017-08-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-21
      • 1970-01-01
      • 2021-12-29
      • 2020-02-28
      相关资源
      最近更新 更多