【问题标题】:Would Spark preserve key order with this sortByKey/map/collect sequence?Spark 会使用这种 sortByKey/map/collect 序列保留键顺序吗?
【发布时间】:2023-04-10 20:03:01
【问题描述】:

让我们说,我们有这个。

val sx = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61)))

我们后来称之为。

val sy = sx.sortByKey(true)

这会让

sy = RDD[(0, 39), (1, 98), (2, 61), (3, 51), (4, 47)] 

然后我们做

collected = sy.map(x => (x._2 / 10, x._2)).collect

我们总是会得到以下结果吗?我的意思是,尽管更改了键值,但是否会保留原始键顺序?

collected = [(3, 39), (9, 98), (6, 61), (5, 51), (4, 47)]

【问题讨论】:

    标签: scala hadoop apache-spark bigdata


    【解决方案1】:

    应用map() 转换并调用collect() 不会更改collect() 返回的数组元素的顺序。为了证明这一点,我们只需证明:

    • map 不会修改 RDD 中元素的顺序
    • collect 将始终在每次调用时以相同的数组顺序返回 RDD 的元素

    第一点很容易证明。在后台,对map() 的调用只会通过遍历每个分区并在分区内的每个元素上调用传递给map() 的函数参数来生成MapPartitionsRDD。因此,这里没有修改排序,因为每个分区内的元素排序保持不变。

    第二点可以通过仔细查看collect() 来证明。下面的代码是collect()的实现,以及收集调用的函数。

    来自RDD.scala

    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
    

    来自SparkContext.scala

    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
      runJob(rdd, func, 0 until rdd.partitions.length)
    }
    

    被调用的runJob() 函数(这是一个重载方法)将包含分区处理顺序的Seq[Int] 传递给另一个runJob() 方法。这个顺序最终会被传递给调度程序,调度程序将决定操作将如何处理分区。因此,对于collect() 的情况,我们将始终从第一个分区开始按顺序处理分区。

    因此,由于map()collect() 都不会修改分区顺序或分区内元素的顺序,因此每次收集的结果都会看到相同的顺序。但是,如果您在收集之前应用需要洗牌的转换,则所有赌注都将被取消,因为数据将被重新分区。

    【讨论】:

    • 收集前哪些转换需要随机播放?
    • @MetallicPriest This 应该向您展示一个良好的基线,在该基线上,转换触发了随机播放。具体来说,“可能导致洗牌的操作包括像repartitioncoalesce这样的重新分区操作,像groupByKeyreduceByKey这样的'ByKey操作(计数除外),以及像cogroupjoin这样的连接操作” .
    猜你喜欢
    • 1970-01-01
    • 2013-07-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-04
    相关资源
    最近更新 更多