【问题标题】:Array[RDD[(String, Set[String])]] transformation in Spark ScalaSpark Scala 中的 Array[RDD[(String, Set[String])]] 转换
【发布时间】:2016-03-14 16:54:01
【问题描述】:

我有一个 Array[RDD[(String, Set[String])]] 类型的 RDD 数组,其中每个 RDD 都是键和值的元组。 键是字符串,值是 Set[String],我想用相同的键合并/联合 Set。我试图在 scala 中做到这一点,但没有快乐。你能帮帮我吗?

e.g.
RDD["A",Set("1","2")]
RDD["A",Set("3","4")]
RDD["B",Set("1","2")]
RDD["B",Set("3","4")]
RDD["C",Set("1","2")]
RDD["C",Set("3","4")]

After transformation:
RDD["A",Set("1","2","3","4")]
RDD["B",Set("1","2","3","4")]
RDD["C",Set("1","2","3","4")]

【问题讨论】:

  • 结果必须是 RDD 数组还是带有这些元组的单个 RDD?
  • 嗨,它必须是 RDD 数组的结果

标签: scala apache-spark apache-spark-sql


【解决方案1】:

如果单个RDD 作为输出是可以的(真的没有理由制作许多只有1 条记录的RDD),您可以将RDDArray 减少为单个RDD 和然后做一个groupByKey:

arr.reduce( _ ++ _ )
   .groupByKey
   .mapValues(_.flatMap(identity))

例子:

scala> val x = sc.parallelize( List( ("A", Set(1,2)) ) )
scala> val x2 = sc.parallelize( List( ("A", Set(3,4)) ) )
scala> val arr = Array(x,x2)
arr: Array[org.apache.spark.rdd.RDD[(String, scala.collection.immutable.Set[Int])]] = Array(ParallelCollectionRDD[0] at parallelize at <console>:27, ParallelCollectionRDD[1] at parallelize at <console>:27)
scala> arr.reduce( _ ++ _ ).groupByKey.mapValues(_.flatMap(identity)).foreach(println)
(A,List(1, 2, 3, 4))

@Edit:我发现这是一个非常糟糕的主意,并建议您重新考虑它,但是您可以通过从上面获取所有键并多次过滤 RDD 来获得所需的结果:

val sub = arr.reduce( _ ++ _ ).groupByKey.mapValues(_.flatMap(identity))
val keys = sub.map(_._1).collect()
val result = for(k <- keys) yield sub.filter(_._1 == k)
result: Array[org.apache.spark.rdd.RDD[(String, Iterable[Int])]]

每个RDD 都会有一个元组,不要觉得它很有用,性能很好。

【讨论】:

  • @nilesh1212 好吧,如果你坚持.. 看看我更新的答案
  • 嗨 Mateus 实际上我需要在 HDFS 上写入每个 rdd,并使用 key 作为文件名,其中每个 Set 对 Key 写入一个 csv 文件。
  • @nilesh1212 好的,有什么问题?
  • Nothing Mateus 我只是在回答您关于“为什么要使用 rdd 数组”的问题
  • @nilesh1212 我明白了,我的回答对您有帮助还是您还有问题?
猜你喜欢
  • 1970-01-01
  • 2021-09-28
  • 2020-08-17
  • 1970-01-01
  • 2017-01-29
  • 2015-12-11
  • 1970-01-01
  • 2017-06-13
  • 2019-07-25
相关资源
最近更新 更多