【问题标题】:How to use mapPartitions in Spark Scala?如何在 Spark Scala 中使用 mapPartitions?
【发布时间】:2016-11-30 15:25:22
【问题描述】:

我有 DocsRDD : RDD[String, String]

val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)

DocsRDD:

Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n  .....\n bla bla bla bla \n ... bla

有没有一种高效、优雅的方法来使用 mapPartitions 从这些中提取 n-gram? 到目前为止,我已经尝试了所有内容,我已经阅读了至少 5 遍我能找到的关于 mapPartitions 的所有内容,但我仍然无法理解如何使用它!操作起来似乎太难了。 总之我想要:

val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )

但使用 mapPartitions 可以有效。 我对 mapPartitions 的基本误解是:

OneDocRDD : RDD[字符串]

 val OneDocRDD = sc.textFile("myDoc1.txt" , 2)
                   .mapPartitions(s1 : Iterator[String] => s2 : Iterator[String])

我无法理解这一点!从什么时候 s1 是 Iterator[String]? s1 是 sc.textfile 之后的字符串。

好吧,我的第二个问题是:在这种情况下,mapPartitions 会改善我对抗地图的能力吗?

最后但并非最不重要的: f() 可以是:

     f(Iterator[String]) : Iterator[Something else?]

【问题讨论】:

  • 您对sc.textFile 的调用会为您提供具有2 个分区的RDD[String]RDD 中的每个元素都是文本文件中的一行。 mapPartitions 为每个分区中的所有行提供了一个迭代器,并且您提供了一个应用于每个迭代器的函数。您应该返回一个迭代器,然后将其展平为 RDD
  • @EricM。感谢你的回答。这以某种方式清除了我关于 mapPartitions 的模糊概念。

标签: scala apache-spark


【解决方案1】:

我不确定 .mapPartitions 是否会有所帮助(至少,没有给出示例),但使用 .mapPartitions 看起来像:

val OneDocRDD = sc.textFile("myDoc1.txt", 2)
  .mapPartitions(iter => {
    // here you can initialize objects that you would need 
    // that you want to create once by worker and not for each x in the map. 
    iter.map(x => (x._1 , x._2.sliding(n)))
  })

通常您想使用 .mapPartitions 来创建/初始化您不想要的对象(例如:太大)或无法序列化到工作节点。如果没有 .mapPartitions,您将需要在 .map 中创建它们,但这不会有效,因为将为每个 x 创建对象。

【讨论】:

  • 感谢您的回答!它消除了我对 mapPartitions 的一些模糊认知。
  • 酷!我将使用此方法替换由于某种原因 kryo 不会序列化的广播 - 我只是要在 mapPartition 中下载我需要的内容,而不是在驱动程序中下载它然后广播。
  • 您通常会在边界处丢失一些 ngram。
猜你喜欢
  • 2019-07-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多