【发布时间】:2016-11-30 15:25:22
【问题描述】:
我有 DocsRDD : RDD[String, String]
val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)
DocsRDD:
Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n .....\n bla bla bla bla \n ... bla
有没有一种高效、优雅的方法来使用 mapPartitions 从这些中提取 n-gram? 到目前为止,我已经尝试了所有内容,我已经阅读了至少 5 遍我能找到的关于 mapPartitions 的所有内容,但我仍然无法理解如何使用它!操作起来似乎太难了。 总之我想要:
val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )
但使用 mapPartitions 可以有效。 我对 mapPartitions 的基本误解是:
OneDocRDD : RDD[字符串]
val OneDocRDD = sc.textFile("myDoc1.txt" , 2)
.mapPartitions(s1 : Iterator[String] => s2 : Iterator[String])
我无法理解这一点!从什么时候 s1 是 Iterator[String]? s1 是 sc.textfile 之后的字符串。
好吧,我的第二个问题是:在这种情况下,mapPartitions 会改善我对抗地图的能力吗?
最后但并非最不重要的: f() 可以是:
f(Iterator[String]) : Iterator[Something else?]
【问题讨论】:
-
您对
sc.textFile的调用会为您提供具有2 个分区的RDD[String]。RDD中的每个元素都是文本文件中的一行。mapPartitions为每个分区中的所有行提供了一个迭代器,并且您提供了一个应用于每个迭代器的函数。您应该返回一个迭代器,然后将其展平为RDD。 -
@EricM。感谢你的回答。这以某种方式清除了我关于 mapPartitions 的模糊概念。
标签: scala apache-spark