Functions on RDDs: Transformations Versus Actions
RDDs上定义了两种类型的函数:动作和转换。Actions是返回一些不是RDD的东西(包括副作用)的函数,而Transformations是返回另一个RDD的函数。每个Spark程序必须包含一个Actions,因为Actions要么将信息带回驱动程序,要么将数据写入稳定存储。Actions是强制评估(求值)一个 Spark program。持久化调用也会强制评估,但通常不会标志Spark作业的结束。将数据带回驱动程序的操作包括collect、count、collectAsMap、sample、reduce和take。
注: 这些Actions中的一些不能很好地伸缩,因为它们会导致驱动程序中的内存错误。通常,最好使用像take、count和reduce这样的Actions,这些Actions会将固定数量的数据返回给驱动程序,而不是 collect 或 sample.。
写入存储的Actions包括saveAsTextFile、saveAsSequenceFile和saveAsObjectFile。大多数保存到Hadoop的Actions只在键/值对的RDDs上可用; 因为它们在PairRDDFunctions类(通过隐式转换为元组类型的RDDs提供方法)和NewHadoopRDD类中被定义, 而这就是实现通过从Hadoop读取创建的RDDs的原因。 有些保存函数,如saveAsTextFile和saveAsObjectFile,在所有RDDs上都可用,它们通过向每个记录添加隐式的空键来工作(然后保存级别会忽略空键)。 没有返回值(Java中的void, Scala中的Unit)的函数,比如foreach,也是Actions:它们强制执行Spark作业。 foreach可用于强制评估RDD,但也经常用于写出不受支持的格式(如web端点)。
Spark API的强大之处在于它的transformations。Spark transformations是用于对分布式数据进行排序、减少、分组、采样、过滤和映射的粗粒度转换。
宽依赖与窄依赖
为了理解RDDs是如何评估的,关于转换最重要的一点是,它们可以分为两类:具有狭窄依赖关系的转换和具有广泛依赖关系的转换。窄与宽的区别对Spark评估转换的方式以及其性能有重大影响。在本章的Spark作业调度中,为了理解Spark的执行范例,我们将定义窄转换和宽转换,但是我们将在第5章中对与之相关的性能考虑进行更详细的解释。
从概念上讲,窄 转换是指子RDD中的每个分区对父RDD中的分区具有简单、有限的依赖关系。只有当可以在设计时确定依赖关系,而不考虑父分区中的记录值,并且每个父分区最多有一个子分区时,依赖关系才会缩小。具体来说,窄转换中的分区既可以依赖于一个父分区(如map操作符中),也可以依赖于设计时已知的父分区的唯一子集(合并)。因此,可以在数据的任意子集上执行窄转换,而不需要关于其他分区的任何信息。相反,具有宽依赖关系的转换不能在任意行上执行,而是要求以特定的方式对数据进行分区,例如根据它们的键值。在 sort 中,必须对记录进行分区,以便相同范围内的键位于同一分区上。宽依赖的转换包括sort、reduceByKey、groupByKey、join和任何调用 rePartition 函数的转换。
在某些情况下,当Spark已经知道数据以某种方式分区时,则宽依赖的操作不会导致 shuffle。 如果某个操作需要执行shuffle, Spark会向与RDD关联的依赖项列表中添加一个ShuffledDependency对象。 一般来说,shuffle是消耗性能的。随着数据的增加,以及在转移过程中必须将更大比例的数据移动到新分区时,它们会变得更加消耗性能。正如我们将在第6章中详细讨论的那样,我们可以通过做更少、更便宜的变换来从Spark程序中获得大量的性能提升。
下面的两个图说明了具有窄依赖关系的转换与具有宽依赖关系的转换的依赖关系图的区别。图2-2显示了每个子分区(底部行的每个蓝色方块)依赖于父分区的已知子集的狭窄依赖关系。蓝色箭头显示了狭窄的依赖关系。左边表示窄转换(如map、filter、mapPartitions和flatMap)的依赖关系图。右上角是用于合并的分区之间的依赖关系,合并是一种狭窄的转换。在这个实例中,我们试图说明,如果子分区可能依赖于多个父分区,只要父分区的集合可以确定,而不管分区中的数据值如何,转换仍然可以限定为narrow。
图2-3显示了分区之间的宽依赖关系。在这种情况下,子分区(如图2-3底部所示)依赖于一组任意的父分区。在评估数据之前,不能完全了解宽依赖关系(以红色箭头显示)。与coalesce(合并)操作相反,数据是根据其值进行分区的。导致shuffle的任何操作(如groupByKey、reduceByKey、sort和sortByKey)的依赖关系图都遵循此模式。
join 函数稍微复杂一些,因为根据两个父RDDs的分区方式,它们可以具有宽的或窄的依赖关系。我们演示了Core Spark连接中join操作在不同场景中的依赖关系。
Spark Job Scheduling
Spark应用程序由一个驱动进程(高级Spark逻辑就是在这个驱动进程中编写的)和一系列可以分散在集群节点上的执行程序进程组成。Spark程序本身在驱动节点中运行,并向执行器发送一些指令。一个Spark集群可以并发地运行多个Spark应用程序。应用程序由集群管理器调度,并对应于一个SparkContext。Spark应用程序可以运行多个并发作业。作业对应于给定应用程序中RDD上调用的每个操作。在本节中,我们将描述Spark应用程序以及它如何启动Spark作业:计算RDD转换的进程。
Resource Allocation Across Applications (跨应用程序的资源分配)
Spark提供了两种跨应用程序分配资源的方法:静态分配和动态分配。通过静态分配,每个应用程序在集群上分配了有限的最大资源,并在应用程序运行期间保留这些资源(只要SparkContext仍在运行)。在静态分配类别中,有许多种可用的资源分配,这取决于集群。有关更多信息,请参阅有关作业调度的Spark文档。
Spark提供了两种跨应用程序分配资源的方法:静态分配和动态分配。静态分配,每个应用程序在集群上分配了有限的最大资源,并在应用程序运行期间保留这些资源(只要SparkContext仍在运行)。在静态分配类别中,有许多种可用的资源分配,这取决于集群。有关更多信息,请参阅有关作业调度的Spark文档。
Spark提供了两种跨应用程序分配资源的方法:静态分配和动态分配。通过静态分配,每个应用程序在集群上分配了有限的最大资源,并在应用程序运行期间保留这些资源(只要SparkContext仍在运行)。在静态分配类别中,有许多种可用的资源分配,这取决于集群。有关更多信息,请参阅有关作业调度的Spark文档( http://spark.apache.org/docs/latest/job-scheduling.html)。
从1.2开始,Spark提供了动态资源分配选项,这扩展了静态分配的功能。在动态分配中,根据一组估计资源需求的启发式方法,在Spark应用程序中添加和删除执行器。我们将在集群资源分配和动态分配中讨论资源分配。
集群资源分配和动态分配
The Spark Application
一个Spark应用程序对应于驱动程序中一个SparkContext定义的一组Spark作业。当启动SparkContext时,Spark应用程序开始运行。启动SparkContext时,一个驱动程序和一系列执行器将在集群的工作节点上启动。每个执行器是它自己的Java虚拟机(JVM),一个执行器不能跨多个节点,尽管一个节点可能包含多个执行器。
SparkContext决定分配给每个执行器的资源数量。当启动Spark作业时,每个执行器都有槽来运行计算RDD所需的任务。通过这种方式,我们可以将SparkContext看作运行Spark作业的一组配置参数。这些参数在SparkConf对象中公开,该对象用于创建SparkContext。我们将讨论如何使用附录a中的参数。应用程序通常(但不总是)对应于用户。也就是说,集群上运行的每个Spark程序都可能使用一个SparkContext
注: RDDs不能在应用程序之间共享。因此,使用多个RDD的转换(如join)必须具有相同的SparkContext。
图2-4说明了启动SparkContext时会发生什么。首先,驱动程序ping群集管理器。集群管理器在集群的工作节点(蓝色圆圈显示)上启动许多Spark执行器(图中显示为黑盒的jvm)。一个节点可以有多个Spark执行器,但是一个执行器不能跨多个节点。RDD将在分区中通过执行器进行评估(显示为红色矩形)。每个执行器可以有多个分区,但是一个分区不能跨多个执行器分布。
Default Spark Scheduler
默认情况下,Spark以先入先出的方式调度作业。然而,Spark确实提供了一个公平的调度器,它以循环方式将任务分配给并发作业,即。,为每个任务分配一些任务,直到所有任务都完成。fair scheduler确保作业更均匀地共享集群资源。然后,Spark应用程序按照在SparkContext上调用作业的相应操作的顺序启动作业。
The Anatomy of a Spark Job( 对Spark Job的剖析)
在Spark惰性计算范式中,在驱动程序调用一个action之前,Spark应用程序不会执行任何操作。对于每个action,Spark调度程序构建一个执行图并启动一个Spark作业。每个作业由 stages 组成, stages 是实现最终RDD所需的数据转换中的步骤。每个 stages都由 tasks 组成,这些 tasks 表示每个并行计算和在执行器上并行执行。
图2-5显示了Spark应用程序不同组件的树,以及这些组件如何对应于API调用。应用程序对应于启动SparkContext/SparkSession。每个Application可能包含许多job,这些作业对应于一个RDD action。每个作业可能包含对应几个stage于每个宽依赖 转换的。每个stage由一个或多个task组成,这些task对应于在每个stage完成的可并行计算单元。在该stage的结果RDD中,每个分区都有一个任务。
DAG
Spark的高级调度层利用RDD依赖关系为每个Spark作业建立stages有向无环图。 您可能已经注意到,与连接集群、配置参数或启动Spark作业有关的错误将显示为DAG调度器错误。 Spark高级调度层利用RDD依赖关系为每个Spark作业建立一个阶段有向无环图。在Spark API中,这称为DAG调度器。您可能已经注意到,与连接集群、配置参数或启动Spark作业有关的错误将显示为DAG调度器错误。这是因为Spark作业的执行是由DAG处理的。DAG为每个作业构建stages图,确定运行每个任务的位置,并将该信息传递给负责在集群上运行任务的TaskScheduler。TaskScheduler创建一个分区之间具有依赖关系的图。
JOB
Spark作业是Spark执行层次结构的最高元素。每个Spark作业对应于一个action,每个action都由Spark应用程序的驱动程序调用。一种概念化的action是将spark RDDs数据带到其他存储系统(通常是通过将数据带会deive或写到一些稳定的存储系统)。 Spark执行图的边缘基于RDD分区之间转换的依赖关系(如图2-2和2-3所示)。因此,返回RDD以外内容的操作不能有任何子操作。 在图论中,我们会说动作在DAG中形成一个叶节点。因此,一个任意大的转换集合可以与一个执行图相关联。然而,一旦一个动作被调用,Spark就不能再添加到该图中。Application启动一个job,其中包括评估调用所需的转换最终RDD的action
Stages
回想一下Spark对转换进行惰性评估;在调用action之前,不会执行 transformations。 如前所述,job是通过调用action定义的。action可以包括一个或多个transformations,而宽依赖的转换将作业分解为各个stages。
每个stages都对应一个shuffle依赖(Spark程序中一个宽依赖的transformations) 在较高的层次上,一个stages可以被认为是一组计算(task),每一个都可以在一个执行器上进行计算,而不需要与其他执行器或驱动程序进行通信。换句话说,只要workers之间网络通讯上,一个新的阶段就开始了;例如,在 shuffle.中。