文章目录
Distributed Runtime
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html
笔者根据FLink1.8版本官方文档进行翻译整理,可能会有偏差。
Tasks and Operator Chains
For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency. The chaining behavior can be configured; see the chaining docs for details.
对于分布式执行,Flink将operator子任务链接到task中。每个任务由一个线程执行。将operator链接到task是一个有用的优化:它减少了线程到线程的切换和缓冲的开销,并在降低延迟的同时提高了总体吞吐量。可以配置链接行为。有关详细信息,请参见链接文档。
The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads.
下图中的示例数据流由五个子任务执行,因此由五个并行线程执行。
Flink是一个分布式流计算引擎,Flink引擎会将一个计算任务拆分若干个Task (等价于Spark的stage),每一个计算任务都是由若干个Task组成的有向无环图 (Dataflow graph --类似于spark中的DAG),Flink通过OperatorChain方式尽可能的将一些operators合并到一个Task中作为一个执行逻辑,每个Task一般都会并行执行,系统会根据Task的并行度构建SubTasks,每一个SubTask本质上就是一个线程;Flink使用OperatorChain目的是为了减少不必要的线程开销以及网络开销。
- Task:等价于Spark中的Stage,每个Task会根据并行度创建多个SubTask;
- SubTask:等价于Stage中的一个分区,每个SubTask对应一个线程;
- OperatorChain:将多个Operator操作符归并到一个Task计算逻辑的一种机制;
Job Managers, Task Managers, Clients
The Flink runtime consists of two types of processes:
The JobManagers (also called masters) coordinate the distributed execution. They schedule tasks, coordinate checkpoints, coordinate recovery on failures, etc.
There is always at least one Job Manager. A high-availability setup will have multiple JobManagers, one of which one is always the leader, and the others are standby.
The TaskManagers (also called workers) execute the tasks (or more specifically, the subtasks) of a dataflow, and buffer and exchange the data streams.
There must always be at least one TaskManager.
Flink运行时包含两种类型的过程:
- Job Managers(masters):协调分布式执行,负责安排任务、协调checkpoints、故障恢复等。每个 Job 至少会有一个 JobManager,高可用部署下会有多个 JobManagers,其中一个作为leader,其余处于 standby 状态;
- Task Managers(workers):执行 dataflow中的 tasks(准确来说是 subtasks),并且缓存和交换数据 streams。每个 Job至少会有一个 TaskManager;
The JobManagers and TaskManagers can be started in various ways: directly on the machines as a standalone cluster, in containers, or managed by resource frameworks like YARN or Mesos. TaskManagers connect to JobManagers, announcing themselves as available, and are assigned work.
JobManager和TaskManager可以通过多种方式启动:作为独立群集直接在计算机上,在容器中启动,或者由诸如YARN或Mesos的资源框架进行管理。 TaskManager连接到JobManager,宣布自己可用,并被分配工作。
The client is not part of the runtime and program execution, but is used to prepare and send a dataflow to the JobManager. After that, the client can disconnect, or stay connected to receive progress reports. The client runs either as part of the Java/Scala program that triggers the execution, or in the command line process
./bin/flink run ....
Client并不是计算的一部分,Client仅仅负责提交任务的 DataFlow graph给 JobManager,提交完成之后,客户端可以断开连接,也可以保持连接来接收进度报告。Client要么作为触发执行的Java / Scala程序的一部分运行,要么在命令行过程 ./bin/flink run ...中运行。
Task Slots and Resources
Each worker (TaskManager) is a JVM process, and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has so called task slots (at least one).
每一个Worker (TaskManager) 都是一个JVM进程,可以执行一个或多个子任务 (Thread/SubTask);通过Task Slot (任务槽)来控制Worker节点能够接受多少个Task,每个节点至少有一个Task Slot。
Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.
每个TaskSlot表示的是TaskManager的一份固定资源子集,例如,具有三个slots的TaskManager 会将其管理的内存资源分成三等份给每个slot。分配资源意味着子任务不会与其他作业的子任务竞争托管内存,而是具有一定数量的保留托管内存。请注意,此处没有发生CPU隔离。当前插槽仅将任务的托管内存分开。
每个TaskSlot表示的是TaskManager的一份固定资源子集,例如,具有三个slots的TaskManager 会将其管理的内存资源分成三等份给每个slot。每个Job启动的时候,都拥有自己固定的Task Slot,也就意味着避免了不同Job间在运行时产生内存资源抢占,这些被分配的Task Slot资源只能被当前Job的所有Task使用,不同Job的Task之间不存在资源共享和抢占问题。
By adjusting the number of task slots, users can define how subtasks are isolated from each other. Having one slot per TaskManager means each task group runs in a separate JVM (which can be started in a separate container, for example). Having multiple slots means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, thus reducing the per-task overhead.
通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager具有一个插槽,意味着每个任务组都在单独的JVM中运行(例如,可以在单独的容器中启动)。具有多个插槽意味着更多子任务共享同一JVM。同一JVM中的任务共享TCP连接(通过多路复用)和心跳消息。他们还可以共享数据集和数据结构,从而减少每个任务的开销。
一个Job会被拆分成若干个Task,每个Task由若干个SubTask构成 (取决于Task的并行度)。默认Task Slot所对应的内存资源只能在同一个Job下的不同Task的SubTask之间进行共享,也就意味着同一个Task的不同SubTask不能运行在同一个Task Slot中,但是如果同一个Job的不同Task的SubTask却可以。
如果同一个Job的不同Task的SubTask不共用Task Slot,会导致资源浪费。例如下图中
source、map操作定位为资源稀疏性操作,因为该操作占用内存量小,而keyBy/window/apply涉及Shuffle会占用大量的内存资源,定位为资源密集型操作,比较吃内存。
By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, so long as they are from the same job. The result is that one slot may hold an entire pipeline of the job. Allowing this slot sharing has two main benefits:
- A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. No need to calculate how many tasks (with varying parallelism) a program contains in total.
- It is easier to get better resource utilization. Without slot sharing, the non-intensive source/map() subtasks would block as many resources as the resource intensive window subtasks. With slot sharing, increasing the base parallelism in our example from two to six yields full utilization of the slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.
默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务也是如此,只要它们来自同一作业即可。结果是一个slot可以容纳整个作业管道。允许slot共享有两个主要好处:
- Flink集群所需的任务槽与作业中使用的最高并行度恰好一样多。无需计算一个程序总共包含多少个任务(并行度各不相同)。
- 资源可以更好地利用。如果没有slot共享,则非密集型
source / map()子任务将占用与资源密集型window子任务一样多的资源。通过slot共享,我们的示例中的基本并行度从2增加到6,可以充分利用插槽资源,同时确保繁重的子任务在TaskManager之间公平分配。
As a rule-of-thumb, a good default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes 2 or more hardware thread contexts.
根据经验,默认的任务插槽数量应该是CPU内核的数量。使用多线程,每个插槽将占用2个或更多硬件线程上下文。
可以看出Flink默认行为是尝试将同一个job的下的不同Task的SubTask进行Task slot共享。也就意味着一个Job的运行所需要的Task Slot的个数应该等于该Job中Task并行度的最大值。当然用户也可以通过程序干预Flink Task间Task Slot共享策略。
结论:Flink的job运行所需要的资源数是自动计算出来的,无需用户指定,用户只需指定计算并行度即可。
State Backends
The exact data structures in which the key/values indexes are stored depends on the chosen state backend. One state backend stores data in an in-memory hash map, another state backend uses RocksDB as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
key/value索引的确切数据结构取决于所选的state backend。一个state backend将数据存储在内存中的哈希图中,而另一个state backend使用RocksDB作为key/value存储。除了定义保存状态的数据结构之外,state backend还实现逻辑以获取key/value状态的时间点快照并将该快照存储为检查点的一部分。
Savepoints
Programs written in the Data Stream API can resume execution from a savepoint. Savepoints allow both updating your programs and your Flink cluster without losing any state.
用Data Stream API编写的程序可以从保存点恢复执行。保存点允许更新程序和Flink集群,而不会丢失任何状态。
Savepoints are manually triggered checkpoints, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
保存点是手动触发的检查点,它们为程序创建快照并将其写到state backend。他们为此依靠常规的检查点机制。在执行期间,程序会定期在工作节点上快照并产生检查点。为了进行恢复,仅需要最后完成的检查点,并且可以在新的检查点完成后立即安全地丢弃较旧的检查点。
Savepoints are similar to these periodic checkpoints except that they are triggered by the user and don’t automatically expire when newer checkpoints are completed. Savepoints can be created from the command line or when cancelling a job via the REST API.
保存点与这些定期检查点类似,除了它们由用户触发并且在更新的检查点完成时不会自动过期。可以从命令行或通过REST API取消作业时创建保存点。