1、Spark核心概念详解

Application User program built on Spark. Consists of a driver program and executors on the cluster.
翻译:
应用程序: 建立在Spark上的用户程序。由集群上的驱动程序和执行程序组成。

功能解读:Application 基于Spark的应用程序 = 1个driver + 多个executors 组成

spark0402.py自己写文代码/pyspark/spark-shell 就是应用程序

Application jar A jar containing the user’s Spark application. In some cases users will want to create an “uber jar” containing their application along with its dependencies. The user’s jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
翻译:
应用程序 jar 包含用户的Spark应用程序的jar。在某些情况下,用户会希望创建一个包含应用程序及其依赖项的“uber jar”。用户的jar不应该包含Hadoop或Spark库,但是,这些库将在运行时添加。


Driver program The process running the main() function of the application and creating the SparkContext
翻译:
驱动程序 运行应用程序的main()函数并创建SparkContext的进程


Cluster manager An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)
翻译:
集群管理器 用于获取集群上资源的外部服务(例如standalone 、Mesos、YARN)
解读:例如spark-submit --master local[2] /spark://hadoop000:7077/yarn都是通过Cluster manager 来申请资源的


Deploy mode Distinguishes where the driver process runs. In “cluster” mode, the framework launches the driver inside of the cluster. In “client” mode, the submitter launches the driver outside of the cluster.
翻译:
部署模式 区分驱动程序进程运行的位置。 1、在“集群”模式下,框架在集群内部启动驱动程序。2、在“客户端”模式下,提交者在集群外部启动驱动程序。


Worker node Any node that can run application code in the cluster
翻译:
工作节点 可以在群集中运行应用程序代码的任何节点
解读:standalone: slave节点 slave配置文件 yarn:nodemanager


Executor A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
翻译:
执行人 为工作节点上的应用程序启动的进程,它运行任务并将数据保存在内存或磁盘存储器中。每个应用程序都有自己的执行者。


Task A unit of work that will be sent to one executor
翻译:
任务 一个工作单元,将被发送给一个执行者


Job A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you’ll see this term used in the driver’s logs.
翻译:
工作 由多个任务组成的并行计算,这些任务是响应一个Spark操作(例如save、collect)而产生的;您将在驱动程序日志中看到这个术语。


Stages Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you’ll see this term used in the driver’s logs.
翻译:
阶段 每个作业都被分成更小的一组称为依赖于彼此的阶段的任务(类似于map reduce中的map和reduce阶段);您将在驱动程序日志中看到这个术语。
解读:一个stage的边界往往是从某个地方取数据开始,到shuffle的结束


章小结:Application 基于Spark的应用程序 = 1个driver + 多个executors 组成 executors又是运行在 Worker node上的 里面那又有好多的 Task,task那也是遇到action动作是被触发,那么一个action又对应一个job,那么一个job会被拆分成多个task子集又被称为stages,最终task就是我们最小的运行单元,在运行当中又可以指定不同的运行模式Cluster manager来指定我们的Deploy mode到底是用cluster还是client,


2、结合Spark UI 讲解Spark 核心概念

1、首先打开 http://hadoop000:8080/ UI页面。
Spark Core进阶
3、Spark运行架构及注意事项
Spark Core进阶
Components(组件)

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).

Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.

翻译:

Spark应用程序在集群上作为独立的进程集运行,由主程序(称为驱动程序)中的SparkContext对象协调。

具体来说,要在集群上运行,SparkContext可以连接到几种类型的集群管理器(Spark自己的独立集群管理器、Mesos或YARN),它们在应用程序之间分配资源。一旦连接,Spark将获取集群中节点上的执行器,这些节点是运行计算和存储应用程序数据的进程。接下来,它将应用程序代码(由传递给SparkContext的JAR或Python文件定义)发送给执行器。最后,SparkContext将任务发送给执行器以运行。


There are several useful things to note about this architecture:

Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).
The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port in the network config section). As such, the driver program must be network addressable from the worker nodes.
Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

翻译:

关于此体系结构,需要注意以下几点:

每个应用程序都有自己的执行器进程,这些进程在整个应用程序的持续时间内保持运行,并在多个线程中运行任务。这样做的好处是在调度端(每个驱动程序都调度自己的任务)和执行器端(不同应用程序的任务在不同的jvm中运行)将应用程序彼此隔离。然而,这也意味着数据不能在不同的Spark应用程序(SparkContext的实例)之间共享,除非将其写入外部存储系统。

Spark对底层的集群管理器是不可知的。只要它能够获取执行器进程,并且这些进程相互通信,即使在支持其他应用程序(例如Mesos/YARN)的集群管理器上运行它也相对容易。

驱动程序必须在其整个生命周期内侦听并接受来自其执行程序的传入连接(例如,请参阅网络配置部分中的spark.driver.port)。因此,驱动程序必须可以从工作节点进行网络寻址。

因为驱动程序在集群上调度任务,所以它应该在工作节点附近运行,最好在同一个局域网上运行。如果您想远程向集群发送请求,最好打开一个RPC到驱动程序并让它从附近提交操作,而不是在远离工作节点的地方运行驱动程序。


4、Spark和Hadoop重要的概念区分
Spark Core进阶
5、Spark缓存的作用
Spark Core进阶
RDD Persistence(RDD 持久化)
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

翻译:
Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久化一个RDD时,每个节点将它计算的任何分区存储在内存中,并在该数据集(或从它派生的数据集)上的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

可以使用persist()或cache()方法将RDD标记为持久化。第一次在操作中计算时,它将保存在节点上的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的RDD都可以使用不同的存储级别进行存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但将其作为序列化的Java对象(以节省空间)跨节点复制。通过将StorageLevel对象(Scala、Java、Python)传递给persist()来设置这些级别。cache()方法是使用默认存储级别的简写,默认存储级别是storage level.MEMORY_ONLY(将反序列化对象存储在内存中)。全套存储级别是:


MEMORY_ONLY
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.

仅内存
将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,一些分区将不会被缓存,并且将在每次需要时动态重新计算。这是默认级别。


MEMORY_AND_DISK
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.

内存磁盘
将RDD作为反序列化的Java对象存储在JVM中。如果RDD不适合内存,请将不适合的分区存储在磁盘上,并在需要时从中读取它们


MEMORY_ONLY_SER
(Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

仅存储用户
(Java和Scala)将RDD存储为序列化Java对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用快速序列化程序时,但读取时需要更多的CPU。


MEMORY_AND_DISK_SER
(Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.

内存和磁盘服务器
(Java和Scala)类似于只使用内存,但将不适合内存的分区溢出到磁盘,而不是在每次需要时动态重新计算它们。


DISK_ONLY
Store the RDD partitions only on disk.

仅磁盘
只在磁盘上存储RDD分区。


MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
Same as the levels above, but replicate each partition on two cluster nodes.

只有内存、内存和磁盘等。
与上面的级别相同,但是在两个集群节点上复制每个分区


OFF_HEAP (experimental)
Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

堆外(实验性)
类似于MEMORY-ONLY-SER,但将数据存储在堆外内存中。这需要启用堆外内存。


Removing Data
Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

删除数据
Spark自动监视每个节点上的缓存使用情况,并以最近使用最少(LRU)的方式删除旧数据分区。如果要手动删除RDD而不是等待它从缓存中退出,请使用RDD.unpersist()方法。


Which Storage Level to Choose?
Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

要选择哪个存储级别?

Spark的存储级别旨在在内存使用率和CPU效率之间提供不同的权衡。我们建议通过以下过程选择一个:

如果RDD与默认存储级别(仅限内存)匹配得很好,请保留它们。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。

如果没有,请尝试仅使用内存服务器并选择快速序列化库,以使对象更节省空间,但访问速度仍然相当快。(Java和Scala)

除非计算数据集的函数很昂贵,或者它们过滤了大量数据,否则不要溢出到磁盘。否则,重新计算分区可能与从磁盘读取分区一样快。

如果需要快速故障恢复,请使用复制的存储级别(例如,如果使用Spark服务来自web应用程序的请求)。所有的存储级别都通过重新计算丢失的数据提供了完全的容错,但是复制的存储级别允许您在RDD上继续运行任务,而无需等待重新计算丢失的分区。


6、Spark Lineage机制
Spark Core进阶

7、Spark 窄依赖和宽依赖
窄依赖:一个父RDD的partition之多,被子RDD的某个partition使用一次。
宽依赖:一个父的partition会被子RDD的partition使用多次,有shuffle

Shuffle operations
Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background
To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:

mapPartitions to sort each partition using, for example, .sorted
repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
sortBy to make a globally ordered RDD
Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Shuffle操作

Spark中的某些操作会触发一个称为shuffle的事件。shuffle是Spark的一种机制,用于重新分发数据,以便在不同的分区中对其进行分组。这通常涉及到跨执行器和机器复制数据,使洗牌成为一个复杂而昂贵的操作。

背景

为了了解在shuffle期间发生了什么,我们可以考虑reduceByKey操作的示例。reduceByKey操作生成一个新的RDD,其中一个键的所有值被组合成一个元组(tuple),这个元组是针对与该键相关联的所有值执行reduce函数的键和结果。挑战在于,并不是一个键的所有值都必须位于同一个分区,甚至是同一台计算机上,但它们必须位于同一个位置才能计算结果。

在Spark中,数据通常不分布在分区之间,而是分布在特定操作所必需的位置。在计算过程中,单个任务将在单个分区上操作—因此,要组织单个reduceByKey reduce任务要执行的所有数据,Spark需要执行all-to-all操作。它必须从所有分区中读取以找到所有键的所有值,然后将所有分区中的值组合在一起以计算每个键的最终结果—这称为shuffle。

尽管新洗牌数据的每个分区中的元素集是确定的,分区本身的顺序也是确定的,但这些元素的顺序不是确定的。如果在洗牌后需要可预测的有序数据,则可以使用:

映射分区以对每个分区进行排序,例如,使用.sorted

重新分区和SortWithinPartitions以在同时重新分区的情况下高效地对分区进行排序

sortBy制造全球订购的RDD

可能导致混乱的操作包括重新分区操作,如重新分区和合并;按键操作(计数除外),如groupByKey和reduceByKey;连接操作,如cogroup和join。
Spark Core进阶

相关文章: