【问题标题】:How to merge RDD array如何合并RDD数组
【发布时间】:2017-08-04 13:05:03
【问题描述】:

我有一个 RDD 数组:Array[RDD[(String, Double)]],如何将这些 RDD 合并到 RDD[String, Array[Double]]。例如:

RDD Array: [[('x', 1), ('y', 2)], [('x', 3), ('y', 4)],...] =>
RDD: [('x', [1, 3,...]), ('y', [2, 4, ...])]

任何帮助表示赞赏!谢谢

【问题讨论】:

  • 你是如何得到 Array of RDD 的?我猜应该是Array的RDD,请确认。

标签: scala apache-spark rdd


【解决方案1】:
  1. 您应该将 RDDS 数组合并为一个 RDD(第 1 行)
  2. 按字符串值对它们进行分组(第 2 行)
  3. 我看到预期的输出已排序,如果需要,您可以对值进行排序(第 3 行)

val mergeIntoOne: RDD[(String, Double)] = array.fold(sparkSession.sparkContext.emptyRDD[(String, Double)])(_ ++ _) val groupByKeys: RDD[(String, Iterable[Double])] = mergeIntoOne.groupByKey() val sortedValues = groupByKeys.mapValues(_.toList.sorted)

【讨论】:

  • 谢谢!分组后(第 2 行),groupByKeys 数组中的元素顺序是否与原始数组顺序相同?还是在弃牌时洗牌?
  • 我想排序是唯一可以确定的方法。顺序可能会有所不同,因为 od 数据正在被 Spark 重新分区
【解决方案2】:

假设您在每个 RDD 中没有重复的键,那么您可以在所有 rdds 上尝试使用 Array[RDD] 上的 foldLeft 和 fullOuterJoin

val rdd1 = sc.parallelize(Seq(("x", 1.0), ("y", 2.0)))
val rdd2 = sc.parallelize(Seq(("x", 3.0), ("y", 4.0)))
val rdd3 = sc.parallelize(Seq(("x", 5.0), ("y", 6.0)))

val rdds = Array(rdd1, rdd2, rdd3)

val startRdd = sc.parallelize(Seq[(String, Seq[Option[Double]])]())

(rdds.foldLeft(startRdd)(
    (rdd1, rdd2) => rdd1.fullOuterJoin(rdd2).mapValues(
        p => p._1.getOrElse(Seq[Option[Double]]()) :+ p._2
    )
 ).mapValues(_.collect{ case Some(x) => x }).collect)
// res15: Array[(String, Seq[Double])] = Array((x,List(1.0, 3.0, 5.0)), (y,List(2.0, 4.0, 6.0)))

【讨论】:

  • 非常感谢!确实不支持嵌套 RDD。现在的问题实际上是:Array[RDD(String, Double)], 如何将 RDD 数组合并到 RDD(String, Array[Double]) 中?
  • 你能用你试图解决的实际问题来更新你的问题吗?
【解决方案3】:

这取决于你想在哪里使用它,但你可以使用 for 循环并合并数组

scala> var a = Array(("a1",1.1))
a: Array[(String, Double)] = Array((a1,1.1))

scala> var b = Array(("a2",1.2))
b: Array[(String, Double)] = Array((a2,1.2))

scala>  for (i <- 0 to b.length) {
 |  a = a:+b(i)}

scala> a
res2: Array[(String, Double)] = Array((a1,1.1), (a2,1.2))

【讨论】:

  • 我在这里看不到任何 RDD。你注意到他问了一个关于 Apache Spark 的问题吗?
猜你喜欢
  • 2016-12-25
  • 2023-03-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-03-21
相关资源
最近更新 更多