【问题标题】:Execution order in Spark mapPartitionsSpark mapPartitions 中的执行顺序
【发布时间】:2017-11-22 04:22:04
【问题描述】:

我在使用Spark mapPatartitions时遇到了一些奇怪的事情,创建的mutable.HashSet在map过程中无法正确填写,代码如下:

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Test").setMaster("local")
    val sc = new SparkContext(conf)
    val input = List[String]("1", "2", "3", "3", "4", "5", "5")
    val result = sc.parallelize(input)
      .mapPartitions((pi: Iterator[String]) => {
        val valuesInPartition = new mutable.HashSet[String]()
        val values = pi.map(line => {
          valuesInPartition.add(line)
          println("processing line: " + line + ", valuesInPartition: " + valuesInPartition)
        })
        println("valuesInPartition: " + valuesInPartition)
        values
      })
    result.collect
  }
}

和输出:

valuesInPartition: Set()
processing line: 1, valuesInPartition: Set(1)
processing line: 2, valuesInPartition: Set(1, 2)
processing line: 3, valuesInPartition: Set(3, 1, 2)
processing line: 3, valuesInPartition: Set(3, 1, 2)
processing line: 4, valuesInPartition: Set(3, 4, 1, 2)
processing line: 5, valuesInPartition: Set(3, 4, 1, 5, 2)
processing line: 5, valuesInPartition: Set(3, 4, 1, 5, 2)

但据我了解,mapPartition 中的代码应该按顺序执行,它应该在“map”函数完成后在最后打印第一行。但是这里的 Set 是打印出来的,没有填充值。

我想我在这里理解错了,请帮我指出。

【问题讨论】:

    标签: scala apache-spark iterator


    【解决方案1】:

    这与 Spark 无关 - 误解是关于 Iteratormap 方法的语义。请记住,Iterator 是一种一次遍历一个元素的结构的方法。调用pi.map(line => ...) 会产生另一个Iterator - 但只有在请求该元素时才会感受到产生Iterator 的每个元素所涉及的副作用。

    考虑以下(普通的旧 Scala)REPL 交互:

    scala> val l1 = List(1,2,3,4,5)
    l1: List[Int] = List(1, 2, 3, 4, 5)
    
    scala> val l2 = l1.map(println)
    1
    2
    3
    4
    5
    l2: List[Unit] = List((), (), (), (), ())
    
    scala> val i1 = Iterator(1,2,3,4,5)
    i1: Iterator[Int] = non-empty iterator
    
    scala> val i2 = i1.map(println)         // Look Ma, nothing happened!!
    i2: Iterator[Unit] = non-empty iterator
    
    scala> i2.next                          // Request the first element...
    1
    
    scala> i2.next                          // Request the second element...
    2
    
    scala> val l3 = i2.toList               // Request remaining elements.
    3
    4
    5
    l3: List[Unit] = List((), (), ())
    

    在您的情况下,存储在values 中的Iterator 仅在您退出匿名函数之后 被遍历(因此在println("valuesInPartition: " + valuesInPartition) 之后)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-06
      • 1970-01-01
      • 2017-01-26
      • 1970-01-01
      • 1970-01-01
      • 2015-06-30
      相关资源
      最近更新 更多