【问题标题】:How to create collection of RDDs out of RDD?如何从 RDD 中创建 RDD 集合?
【发布时间】:2015-09-11 15:39:39
【问题描述】:

我有一个RDD[String]wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想wordRDD 中的每个字符串 创建一个新的RDD。以下是我的尝试:

1) 失败,因为 Spark 不支持嵌套 RDD:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})

2) 失败(可能是因为范围问题?):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}

我的理想结果如下:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)

我在这里找到了一个相关问题:Spark when union a lot of RDD throws stack overflow error,但它没有解决我的问题。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    根据需要使用flatMap 获取RDD[String]

    var allWords = wordRDD.flatMap { word => 
      (new MyClass(word)).myFunction().collect()
    }
    

    【讨论】:

    • 这应该如何并行运行? wordRDD.map 中发生的所有事情都在集群上执行。因此,内部collect 必须从正在运行的作业中触发新的 Spark 作业。我怀疑它不会分布式运行。
    • 他也可以更改函数以返回数组而不是 RDD,但问题没有指定实际函数。
    • 但是他的描述说他有一个函数,我假设它是 myFunction 从字符串/单词创建 RDD[String]
    • 是的。您的回答告诉他更改 myFunction 以返回不同的内容。在不知道函数有多复杂的情况下,很难说其中完成的计算是否是分布式的。如果收集数据集意味着之前完成的所有计算都不再分布,那么什么都不会分布。
    • 我不知道你所说的发送它是什么意思,但我已经运行了我最初发布的代码。有效。 @JacekLaskowski 使我的代码更紧凑,但我认为它仍然有效。
    【解决方案2】:

    您不能从另一个RDD 中创建RDD

    但是,可以将您的函数 myFunction: String =&gt; RDD[String] 重写为另一个函数 modifiedFunction: String =&gt; Seq[String],该函数从输入中删除一个字母的所有单词生成,以便可以在 RDD 中使用它。这样,它也将在您的集群上并行执行。拥有modifiedFunction,您只需调用wordRDD.flatMap(modifiedFunction)即可获得包含所有单词的最终RDD

    关键点是使用flatMap(对mapflatten进行转换):

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
      val sc = new SparkContext(sparkConf)
    
      val input = sc.parallelize(Seq("apple", "ananas", "banana"))
    
      // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
      val result = input.flatMap(modifiedFunction) 
    }
    
    def modifiedFunction(word: String): Seq[String] = {
      word.indices map {
        index => word.substring(0, index) + word.substring(index+1)
      }
    }
    

    【讨论】:

      猜你喜欢
      • 2019-08-11
      • 2016-09-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-25
      • 2019-05-04
      相关资源
      最近更新 更多