楔子

Flink 在大数据领域已经被应用的越来越广泛,很多大公司内部都有它的身影,那么问题来了,Flink 到底是用来做什么的呢?

首先提到 Flink 必然绕不开流计算(或者说流式计算、流处理等等),没错,Flink 是一个分布式、高性能的流计算引擎。比如天猫的成交额一分钟能破百亿,大屏实时监控等等,其背后靠的就是一套强大的流计算引擎来支撑,从而实时动态地得到统计结果。

目前在流计算领域,最主流的技术有:Storm、Spark Streaming、Flink,但是能同时做到低延时、Exactly-Once、以及高吞吐,在开源界只有 Flink 有这个能力。面对日益增长的数据规模,以及延时越来越低的数据处理要求,流计算正在成为数据处理平台所必备的能力之一。在好几年前,我们还停留在 Hadoop、MapReduce、Hive 上面,之后 Spark 项目异军突起、逐渐成为大数据领域的当红明星,即便现在很多公司所使用的仍是 Hadoop Spark 等离线处理技术。但是在未来,流计算一定会成为分布式计算的主要方向之一,而如果想掌握流计算相关的技术,Flink 必然是我们的首选。

另外除了 Flink 之外,还会涉及到 Python,因为 Python 是目前的主流语言之一,所以 Python + Flink(pyflink) 就诞生了。并且我本人也是 Python 方向的,所以当涉及到使用代码操作 Flink 时,只使用 Python 进行操作。尽管 Flink 对 Python 的支持不像 Java 和 Scala 那么完美,但是对我而言没得选。

大数据技术发展

从 Google 的三驾马车 GFS、MapReduce、BigTable 开始,大数据在不断地发展,而在大数据处理里面,计算模式可以分为四种。

全面解析流处理框架 Flink,以及和 Python 的结合

而我们这里重点关注批计算和流计算,那么这两者有什么区别呢?

1. 数据时效性不同:流式计算具有实时、低延迟等特点;批量计算则是非实时、高延迟的。

2. 数据特征不同:流式计算的数据一般是动态的、没有边界的,数据像水流一样源源不断、你不知道什么时候会结束;而批量计算的数据则是静态的,每次处理的数据量是已知的。

3. 应用场景不同:流式计算应用在实时场景,或者说时效性要求比较高的场景,比如实时推荐、业务监控等等;而批量计算一般也说成是批处理,应用在实时性要求不高、离线计算的场景下,比如数据分析、离线报表等等。

4. 运行方式不同:流式计算的任务是持续进行的,来了就立刻处理;批量计算的任务则是一次性完成,可以理解为将数据先积攒起来,达到一定规模之后再一次性处理。

所以概念还是很好理解的,比如批量计算就是将数据一批一批的处理,但如果每一批的这个量非常非常小,那么是不是就变成流计算了呢。像 Spark Streaming 就是这么做的,虽然 Spark Streaming 也可以做流处理,但它的实时性达不到 Flink 这个级别,就是因为它本质还是批处理,只不过它的每次处理的批(batch)非常小罢了。因此 Spark Streaming 是伪实时的,只不过对于很多场景还是可以接收的,因此仍被广泛使用。但如果对实时性要求特别严苛的话,那么就需要使用 Flink 了。

因此批计算本身就是一种特殊的流计算,批和流本身是相辅相成的,当批小到可以忽略不计的时候就就变成流了。

为什么流式计算将成为主流

数据的实时性要求越来越高,越来越多的业务更加强调时效性,比如实时推荐、风控等业务,这些业务促使数据处理技术像流式计算慢慢普及。

流计算技术日趋成熟,像 Spark Streaming、Flink 等一系列流式计算引擎变得越来越稳定、同时也越来越容易上手。因此越来越多的用户选择流处理技术,来处理自身的一些业务。

批计算会带来一些计算和存储上的成本,以往对于离线做大数据处理来讲,数据需要先存储在分布式文件系统当中,然后再基于这些数据进行分析,这就导致我们的一个存储成本相对较高。

适合流计算的场景

交通工具、工业设备和农业机械上的传感器就会将数据发送到流处理应用程序,该应用程序会监控性能,从而提前检测潜在的缺陷。

金融机构实时跟踪市场波动,计算风险价值,然后根据股票价格变动自动重新平衡投资组合。

房地产网站跟踪客户移动设备中的部分数据,然后根据其地理位置实时建议应走访的房产。

当然除了以上之外场景还有很多,比如电商的推荐系统,就是最让人反感的 "猜你喜欢",还有天气预测、流量统计等等,场景非常多。

主流的流式计算引擎

我们说流计算引擎(或者框架)有很多种,那么它们的特点如何呢?

Storm:

  • 最早使用的流处理框架,社区比较成熟
  • 支持原生流处理,即单事件来处理数据流,所有记录一个接一个处理
  • 延迟性低,在毫秒级
  • 但是消息保障能力弱,消息传输可能会出现重复,但是不会丢失
  • 吞吐量比较低

Spark Streaming:

  • Spark Streaming 属于 Spark API 的一个扩展
  • 以固定间隔(如几秒钟)处理一段段的批处理作业,即微批处理
  • 延迟性较高,在秒级
  • 消息保障能力强,能保证消息传输既不会丢失也不会重复
  • 具有非常高的吞吐

Spark Streaming 实际上就是将数据转成一个个的微批,然后交给 Spark Engine 处理完毕之后再发送给下游。所以如果对实时性要求不是特别高,对延迟的容忍度比较大的话,那么采用 Spark Streaming 是一个非常不错的选择。

Flink:

  • 真正的流处理框架,底层采用的数据模型是 DataFlow Model
  • 和 Storm 一样,延迟不超过毫秒级
  • 消息能够既不丢失也不重复
  • 具有非常高的吞吐
  • 原生支持流处理

所以 Flink 无疑是最优的,因为选择一个流处理框架我们主要从四个方面考虑:

  • 1. 低延迟:是否能达到毫秒级
  • 2. 高吞吐:是否能达到每秒千万级吞吐
  • 3. 准确性:是否能保证 exactly-once 语义,消息不会重复也不会丢失,只会被传输一次
  • 4. 易用性:是否易于开发,比如支持 SQL,以及其它的编程语言

而能够同时满足以上四点的只有 Flink,其它的都有相应的缺点。比如:Storm 不能保证高吞吐、以及不支持 SQL;Spark Streaming 延迟相对较高,不能做到真正的实时,只能是伪实时。因此,Flink 是我们的最终选择。

关于 Flink 先有一个基本的认识,至于它的更多特性,我们后面慢慢说。

Flink 集群基本架构

到目前为止我们对 Flink 已经有了一个最最基本的了解,下面来看看 Fink 集群的基本架构。

全面解析流处理框架 Flink,以及和 Python 的结合

整个 Flink 集群也是遵循 Master Worker 这样的一个架构模式,其中 JobManager 为管理节点,TaskManager 为工作节点。

JobManager:简称 JM,它是管理节点,负责管理整个集群的计算资源、Job 的调度与执行,以及 CheckPoint 的协调。

TaskManager:简称 TM,它是工作节点,可以有很多个,每个 TM 一般部署在不同的节点上。JM 会将整个作业切分成多个 Task,然后分别发送到不同的 TM 上面,再由 TM 提供计算资源来对任务进行处理。

Client:客户端,它负责接收用户提交的 Application Jar,这些 Application Jar 会通过命令行的方式提交给客户端,客户端会在本地内部产生一个 JobGraph 对象(也就是我们上面说的 Job)。然后通过 rpc 的方式将 JobGraph 提交到 JobManager 上面,如果成功提交,那么 JM 会给客户端返回一个 JobClient,客户端通过 JobClient 可以和 JM 进行通信,获取 Job 的执行状态。

JobManager

简单介绍了 Flink 集群架构之后,我们再来详细介绍一下里面的组件,首先是 JM。

全面解析流处理框架 Flink,以及和 Python 的结合

这里面涉及了很多概念,我们慢慢说。

Checkpoint Coordinator:

Checkpoint Coordinator 指的是 TM 里 Checkpoint 的协调工作,Checkpoint 会通过文件检查点的方式来记录一些状态性的数据,主要和容错相关,而 JM 会负责 TM 里面 Checkpoint 的协调与执行;

JobGraph >>> Execution Graph:

客户端会在内部生成一个 JobGraph 给 JM,JM 在收到 JobGraph 时又会生成一个 Execution Graph(物理执行逻辑图),这两者的区别可以简单想象成 SQL 语句和 SQL 执行计划之间的区别,然后 JM 再将 Execution Graph 拆分成多个执行单元(Task)发送到 TM 上面。

RPC 通信(Actor System):

JM 和 TM、以及和客户端都是通过 Akka 来实现 RPC 通信,而 Akka 里面最核心的组件就是 Actor System,它负责两个进程之间的通信。

Job 接收(Job Dispatch):

客户端实际上会提交不同的 JobGraph,所以 JM 也会接收不同的 JobGraph,然后再拆分成多个 Task 分发到不同的 TM 上,所以这会涉及到 Job 的分发过程,而 JM 内部有一个 Job Dispatch 组件专门负责干这件事情。

集群资源管理(ResourceManager):

JM 内部有一个集群资源管理器,我们说 JM 会负责整个集群资源的管理,而不同的部署模式会对应不同实现的资源管理器,比如 On Standalone、On Kubernetes、On Yarn 等等。

TaskManager 的注册与管理:

JM 会负责 TM 注册,当我们尝试启动一个 TM 时,它会主动和 JM 建立 RPC 连接,这个过程中 JM 会将对应的 TM 的信息保存在本地,之后会不断地对 TM 进行心跳检测。

TaskManager

再来看看 TM,我们说它是真正负责提供计算资源来执行任务的。

全面解析流处理框架 Flink,以及和 Python 的结合

JM 会将 Task 发送到 TM 上,TM 内部会有一个 Task Scheduling,这个 Task Scheduling 可以理解成一个线程池,而 TM 就是整个进程。当 JM 提交 Task 到 TM 的时候,线程池内部就会启动一个线程来负责对任务进行执行,执行任务的线程就是 Task Slot(任务资源槽)。

然后是 Data Exchange,像我们在做 MapReduce、Spark 程序的时候经常会涉及到 Shuffle 的操作,而 Shuffle 就是数据在不同的节点之间进行传输和交互的过程(代价会比较昂贵)。对于 Flink 也是如此,如果数据进行了一个 GroupByKey 操作,那么就会涉及到数据的交互,而在 TM 里面也提供相应的 Shuffle Environment 来支持 Shuffle 操作。

我们说当出现 Shuffle 操作时意味着数据要跨节点传输,而传输也是通过 RPC 的方式,通过 Network Manager 组件提供的基于 Netty 实现的网络通信站。

在 TM 里面还有一个比较重要的 Memory Management,它和内存管理相关。当 Task 提交进来之后肯定还会伴随着相应的数据,这些数据肯定是需要内存的,而 Memory Management 则负责这些内存的管理。

最后就是我们之前提到的,当 TM 启动之后会向 JM 进行注册。

Client

最后是客户端,我们说客户端会接收用户发来的应用程序,然后在内部生成对应的 JobGraph,再拆分成多个 Task 发送到不同的 JM 上。

全面解析流处理框架 Flink,以及和 Python 的结合

用户可以使用 Java、Scala、Python、SQL 等语言编写 Flink 应用程序,然后发送给客户端 Client,再由 Client 提交到集群。而用户在将应用程序发送给客户端的时候,会在本地启动一个相应的 Client 进程,该进程就会负责解析用户提交的应用程序。解析的时候会将应用程序里面的 main 方法拿出来在自己的进程里面执行,执行的主要目的就是生成对应的 JobGraph 对象,这个 JobGraph 是什么后面会说,目前可以认为它是将我们应用程序内的代码使用 DAG 模式进行的一种表达。

在 Client 里面还有几个核心的概念,比如说 Context Environment,它表示相应的环境,实际上 Client 进行第一步就会创建 Context Environment,然后在 Context Environment 中执行 main 方法。当生成 JobGraph 对象时,会和依赖的一些 jar 包一块被提交到 JM 上、即 Job Submit,当然这个过程也是通过 RPC 的方式。

因此像 JobGraph(作业、或者说 Job)的生成、内部 DAG 的构建等等都是在客户端里面完成的。

JobGraph

我们说 Client 会提交 JobGraph 到 JM 上,那么这个 JobGraph 到底长什么样子呢?

全面解析流处理框架 Flink,以及和 Python 的结合

首先我们可以采用不同的 API 编写应用程序,比如:DataStream、DataSet、Flink SQL、Table 等等。不管采用哪种方式,编写的出的应用程序最终都要打成相应的 Jar 包,当然这也不是唯一的方式,比如 SQL Client 模式可以直接向客户端提交一些 SQL 脚本,但是绝大部分情况都是打成一个可执行的 Jar 包,再调用 flink run 命令执行。

而在客户端中我们看到有一个 Exectuor,也就是执行器,当然执行器也分为几种类型,比如本地执行器、远程执行器、On Yarn 执行器,执行器后面会详细说,先来看看它的通用功能。首先会通过反射的方式调用应用程序里面的 main 方法,对应 Application Code 的执行;然后调用应用程序的 Execute 方法,将应用程序转成 StreamGraph,从图中可以看出这个 Streamgraph 只是描述了一个转换的大概逻辑,仅仅只是一个 DataFlow,但没有体现出算子的并行度;接下来再将 StreamGraph 转成 JobGraph,此时会对每一个算子进行拆解、指定相应的并行度;最后再调用 submit 将生成的 JobGraph(一个有向无环图)提交到 JM 中,而 DAG 里面算子的执行,则依赖 JM 将任务调度到 TM 上。

因此 JobGraph 就是应用程序对应的一个 DAG,也就是通过有向无环图的方式去表达应用程序,而且不同接口的应用程序最终都会生成 JobGraph,此时也具备统一性,也就是不管采用什么样的 API 最终提交给 JM 的都是相同标准的 JobGraph。

Flink 集群运行模式以及资源管理器

Flink 集群的运行模式有以下几种:

  • Session Mode
  • Per-Job Mode
  • Application Mode(Flink 1.11 版本提出)

集群运行模式分为这几种的原因主要从两方面考量:1. 集群的生命周期和资源隔离;2. 程序 main 方法的执行是在 Client 中还是在 JobManager 中。关于第二个方面可能有人会纳闷,之前不是说了吗?main 方法是在 Client 的 Executor 组件中执行的。但是在 Flink 1.11 版本的时候,Application 的 main 方法运行在 Flink 集群上,而不在客户端。

那么下面来分别看看这几种运行模式之间的区别。

集群运行模式

Session Mode

对于 Session 集群运行模式来说,所有提交的 Job 都在一个 Runtime 中运行。

全面解析流处理框架 Flink,以及和 Python 的结合

JM 和多个 TM 整体组成一个集群,所有的 Job 都会发送到同一个 JM 上,因此 JM 会管理多个不同的 Job,并进行协调、以及为 Job 在 TM 上申请 Task Slot 进行计算。因此 Session 模式的特点就是多个 Job 共享同一个 JM,或者说同一个集群的 Master 节点。而对于客户端而言,会执行一些方法生成相应的 JobGraph 对象,然后连同依赖的 Jar 包一块传到 Master 节点上面。

所以对于 Session 模式的集群,JM 的生命周期不受 Job 的影响,不管提交多少个 Job,JM 始终处于运行状态。

Session 模式的好处显然是资源充分共享,利用率高,因为所有的 Job 都提交到同一个 JM 上。并且此时的 Job 由 JM 负责管理,运维也会简单。而缺点也明显,就是资源隔离相对较差,而且对于非 Native 类型部署时,TM 不容易扩展、Slot 计算资源伸缩性较差(Native 是什么后面会说)。

Per-Job Mode

从名字上可以看出,对于 Per-Job 模式而言每一个 Job 对应一个集群,每个 Job 独占一个 JM。

全面解析流处理框架 Flink,以及和 Python 的结合

当我们提交一个 Job 之后,它会交给单独的 JM 负责,然后 JM 会去启动对应的 TM。而 TM 也会和相应的 Job 相互绑定,之间也不会形成共享机制。比如另一个 Job 提交上来之后,会启动一个新的 JM,然后由 JM 去启动对应的 TM。而当 Job 执行完毕之后,JM 和对应的 TM 会被释放、回收,因此 Per-Job 一个最大的特点就是 JM 的生命周期和 Job 是绑定的。

对于 Per-Job 模式而言,在一套集群资源管理器(Cluster Manger)上会启动多个 JM,它的好处就是 Job 之间的隔离是充分的。并且每个 JM 会为 Job 指定相应的 TM,而根据 Job 的不同 TM Slots 的数量可以不同;至于缺点肯定都能想到,那就是浪费资源,因为 JM 是需要消耗资源的。此外 Job 的管理不再由 JM 负责,而是完全交给 Cluster Manager,此时的管理会比较复杂。

Application Mode

无论是 Session 模式还是 Per-Job 模式,它的模式都是一样的,比如用户想要提交一个应用程序:

  • 1. 首先需要下载应用依赖的一些 Jar 包,以及安装客户端
  • 2. 在 Client 中执行 main 方法生成 JobGraph 对象
  • 3. 将 JobGraph 和依赖一块提交到集群上运行,这一步会消耗带宽,并且依赖每次都要上传,再加上依赖的 Jar 包也可能会比较大
  • 4. 等待 Job 运行结果

但是问题来了,首先生成 JobGraph 是需要消耗 CPU 资源的,任务多的话会导致客户端压力增大。而且生成 Jobgraph 是同步的,任务一多的话也可能会造成阻塞,加上 JobGraph 和依赖的 Jar 包的提交也是需要时间的,如果 Jar 包比较大的话,会非常依赖带宽,更何况这些依赖每次都要上传。而对于流平台而言重要的就是实时,不能陷入等待。因此后来就有人提出,能不能把生成 JobGraph 这一步从客户端移到 JM 上,这样可以释放本地客户端的压力。最终在客户端里,只需要负责命令的提交、下发,以及等待 Job 的运行结果即可。

因此这就涉及到了下面的 Application 模式:

全面解析流处理框架 Flink,以及和 Python 的结合

可以看到 JM 首先会去分布式文件存储系统中拉取依赖的包,然后根据应用程序生成 JobGraph 对象,然后执行、调度,整个过程都是在 JM 当中进行的。这样做的好处就是,客户端不需要再每一次都将 Jar 包提交到 JM 上,从而避免网络传输上的消耗、以及客户端的负载。而且 Application Mode 也实现了资源隔离,虽然所有的 Job 共享一个 JM,但是在 JM 内部又在 Application 层面实现了资源隔离。那么 Application 模式的资源隔离和 Per-Job 模式的资源隔离有什么区别呢?举个栗子:Per-Job 模式实现的隔离可以看成是虚拟机层面的隔离,每个 Job 对应的 JM 可以看成是单独的虚机,显然这是比较耗资源的;而 Application 模式实现的隔离可以看成是容器层面的隔离,所有 Job 共享一个 JM,但它们都在各自的容器中,因此即实现了隔离又避免了资源的浪费。而在执行任务的时候,也会启动相应的 TM。

但是 Application 模式功能太新,是在 Flink 1.11 版本中提出的,还未经过生产验证。而且集群资源管理器只支持 Yarn 和 Kubernete,对于 Yarn 可以直接去分布式文件存储(如HDFS)中获取依赖的包;对于 Kubernetes 则是需要先将依赖的包打成镜像,会多一步镜像构建的过程。

Flink 集群资源管理器

在了解完 Flink 的集群运行模式之后,我们来看看 Flink 支持哪些集群资源管理器,就目前来说 Flink 支持的 Cluster Manager 有以下几种:

  • Local
  • Standalone
  • Hadoop Yarn
  • Apache Mesos
  • Docker
  • Kubernetes

但是我们说 Flink 有三种运行模式,但这三种模式并不是每一个 Cluster Manager 都同时支持。

全面解析流处理框架 Flink,以及和 Python 的结合

很明显对于用户而言,首选的就是 On Yarn 和 On Kubernetes,On Yarn 用的还是比较多的,但 On k8s 是未来的主流。

Native 集群部署

这里面涉及到一个概念,就是 Native 集群部署,我们来解释一下。对于 Native 集群部署来说,当 Cluster Manager 以 Session 模式启动集群时,只启动 JM 不会启动 TM,只有在客户端提交 Job 之后,JM 才会跟 Cluster Manager 进行交互、申请资源,动态启动 TM 满足计算要求。至于其它的 Job 也是同理,因此 TM 的申请是由提交的 Job 来决定的,这样做的好处就是可以极大的利用 Cluster 上的资源,不会造成资源的预占用(没拉????️先把坑占了)。

支持 Native 部署模式的 Cluster Manager 有 Yarn、k8s、Mesos,Standalone 是不支持的。对于 Standalone 而言,TM 节点实例有多少个需要事先指定好、并先启动,然后 JM 才会接收 Job,而不是在 JM 接收 Job 之后再去启动 TM,因为 TM 有多少个事先都已经启动完毕了。

Flink on Standalone

然后我们来看看当 Cluster Manager 为 Standalone 这种模式的集群,如果用过 Spark 的话,那么应该很容易理解,因为 Spark 也有 Standalone,就是真正意义上的分布式。多个节点之间,其中一个为主节点,其余的为从节点,节点之间通过 RPC 进行通信。

全面解析流处理框架 Flink,以及和 Python 的结合

对于 Flink on Standalone 这种模式来说,TM必须要事先进行注册,也意味着能够计算的资源会一开始就确定好。然后 JM 和 TM 运行的时候也会有相应的进程,如果 JM 和 TM 都在同一个节点上,那么就是单机 Standalone 模式,也就是伪集群;如果在多个节点上,那么就是多机 Standalone 模式。

Flink on Standalone 只支持 Session 运行模式。

下面我们就来搭建 Standalone 集群,首先是操作系统,像 Flink 这种大数据组件显然要跑在 Linux 上,这里我采用的是阿里云 CentOS 7。但要想运行 Flink 应用程序,还需要安装 jdk,这个过程就不说了。

[root@satori ~]# java -version
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)

jdk 只要不低于 1.8 就行。

然后我们还要去 Flink 官网 https://flink.apache.org/downloads.html 下载 Flink,因为它是 Apache 的一个顶级项目,所以地址是 flink.apache.org。这里我们下载 1.12.4,解压到 /opt 目录下。

[root@satori flink-1.12.4]# pwd
/opt/flink-1.12.4
[root@satori flink-1.12.4]# ll
total 600
drwxr-xr-x  2 501 games   4096 Jun  5 00:16 bin
drwxr-xr-x  2 501 games   4096 May 11 04:45 conf
drwxr-xr-x  7 501 games   4096 Jun  5 00:16 examples
drwxr-xr-x  2 501 games   4096 Jun  5 00:16 lib
-rw-r--r--  1 501 games  11357 Aug  5  2019 LICENSE
drwxr-xr-x  2 501 games   4096 Jun  5 00:16 licenses
drwxr-xr-x  2 501 games   4096 Sep 13  2020 log
-rw-r--r--  1 501 games 563344 May 11 04:45 NOTICE
drwxr-xr-x  3 501 games   4096 Jun  5 00:16 opt
drwxr-xr-x 10 501 games   4096 Jun  5 00:16 plugins
-rw-r--r--  1 501 games   1309 Sep 13  2020 README.txt
[root@satori flink-1.12.4]# 

如果你用过其它的大数据组件的话,你会发现它们的目录结构基本上都是一致的。bin 目录是一些启动脚本,conf 目录是配置文件相关的,examples 目录里面是一些测试案例,lib 目录存放的是依赖的 Jar 包,log 目录存放日志等等。

如果是单机 Standalone,那么以上就算部署完了,可如果是多机部署的话,那么还需要配置其它节点,但并不麻烦,因为所有节点的配置都完全相同,配好了一个节点,然后其它的节点再重复一遍即可。

首先你的每一个节点都需要安装 jdk 并配置 JAVA_HOME 路径和环境变量,这点是毋庸置疑的,此外每个节点都要安装 Flink,并且安装路径要一致、版本要一致,最后就是这些节点之间可以互相访问。

假设我们有 5 个节点,分别是 Node 1、2、3、4、5,那么其中一个节点(比如Node 1)作为 Master 节点仅部署 JM,剩余的节点(Node 2 ~ Node5)作为 Worker 节点部署 TM。

全面解析流处理框架 Flink,以及和 Python 的结合

然后就是修改配置文件,要指定谁是主节点、谁是工作节点,首先需要修改 conf/flink-conf.yaml,里面的配置是和 Flink 集群运行时相关的,里面的第一个配置选项是 jobmanager.rpc.address: localhost,我们需要将里面的 localhost 改成 Master 节点的地址。

然后修改 conf/masters,也是在里面指定 Master 节点的地址以及监听的端口,默认里面只有一个 localhost:8081。我们需要将 localhost 改成 Master 的地址,当然为了保证高可用,你可以配置多个 Master 节点。注意:这个 8081 指的是 webUI 端口,我们通过 Master 的 ip:8081 即可通过浏览器查看 Flink 集群的运行状态;而在 conf/flink-conf.yaml 文件中有一个 jobmanager.rpc.port: 6123,这个 6123 是节点之间进行 RPC 通信的地址。

接下来再修改 conf/workers,显然我们要配置工作节点了,里面只有一个 localhost,说明默认当前节点既是主节点也是从节点、或者说工作节点。然后我们在里面写上工作节点的地址,如果建立了主机名到 IP 之间的映射,那么你也可以写主机名。

xx.xx.xx.xx  # Node 2 节点
xx.xx.xx.xx  # Node 3 节点
xx.xx.xx.xx  # Node 4 节点
xx.xx.xx.xx  # Node 5 节点

以上就配置好了 Worker 节点,然后上面这些所做的配置要同步到所有的节点中,因此我们一般先配 Master,配好之后再同步到其它的 Worker 上。

这就是 Standalone 比较相似,其实配置过程和 Spark Standalone 比较相似,指定 Master、指定 Worker,然后将配置文件进行同步即可。

配置演示

然后我们来实际搭建一下多机 Standalone 集群,首先在我的阿里云上有三台服务器,分别是:

  • 47.94.174.89,主机名为 satori,2 核心 8GB 内存
  • 47.93.39.238,主机名为 matsuri,2 核心 4GB 内存
  • 47.93.235.147,主机名为 aqua,2 核心 4GB 内存

我们将主机 satori 作为 Master 节点,matsuri 和 aqua 作为 worker 节点,这里我们就搭建一个三个节点的集群吧。

我上面使用的主机就是 satori,所以 java 和 flink 都已经安装完毕,路径如下:

  • java 安装目录:/opt/jdk1.8.0_221/
  • flink 安装目录:/opt/flink-1.12.4/

然后我们来修改配置文件,首先是 conf/flink-conf.yaml,将 jobmanager.rpc.address 改成 Master 节点的地址。

jobmanager.rpc.address: 47.94.174.89

接下来修改 conf/masters,指定 Master 节点 IP 和绑定的 webUI 端口:

47.94.174.89:8081

再修改 conf/workers,指定 worker 节点的 IP:

47.93.39.238
47.93.235.147

以上 Master 节点就配置完毕了,然后配置 worker 节点,同样要安装 jdk、Flink,并且 Flink 的版本和安装目录要保持一致(jdk 的路径最好也一致),以及环境变量的设置。最后再修改 worker 节点的配置,因为我们说配置 worker 节点和配置 Master 节点一模一样,所以你也可以直接通过 scp 命令将 Master 上修改之后的配置文件直接发送过去。

整个集群之间就配置完毕了,下面就可以启动集群了。对了,一定要保证 6123 端口在所有的节点之间都是开放的,否则节点之间是无法通信的。然后还要保证主节点的 8081 对外是开放的,这样我们才能在外界通过浏览器进行查看。当然上面的端口是 Flink 默认的,你也可以改成别的,但是要保证多个节点之间相同的,要监听 6123 就都监听 6123,监听 6124 都监听 6124。

  • 启动 Flink 集群:./bin/start-cluster.sh
  • 关闭 Flink 集群:./bin/stop-cluster.sh

我们只需要在 Master 节点上执行即可,会自动启动 worker 节点上的 TM。另外,默认情况下你在启动或关闭集群的时候应该会让你输入 worker 节点的登陆密码,而且是每一个 worker 节点都要输入一遍,因此我们可以提前配置一下。

# 生成私钥和公钥,一路回车即可,生成的私钥存放在 id_rsa 文件中、公钥则存放在 id_rsa.pub 文件中
ssh-keygen -t rsa  
cd ~/.ssh  # 进入到家目录的 .ssh 目录中
touch authorized_keys  # 创建 authorized_keys 文件

在每个节点上都执行上面几个步骤,那么所有节点的 .ssh 目录中都有 id_rsa、id_rsa.pub 和 authorized_keys 这三个文件。如果想要实现免登陆的话,假设在 A 节点中远程登陆 B 节点想不输入密码,那么就把 A 节点的 id_rsa.pub 里面的内容添加到 B 节点的 authorized_keys 文件中即可。但是注意,这个过程是单向的,如果在 B 节点中远程登陆 A 节点也不想输入密码的话,那么就把 B 节点的 id_rsa.pub 里面的内容添加到 A 节点的 authorized_keys 中。

[root@satori .ssh]# ssh-copy-id -i id_rsa.pub root@47.93.39.238
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "id_rsa.pub"
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@47.93.39.238's password: 

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'root@47.93.39.238'"
and check to make sure that only the key(s) you wanted were added.

[root@satori .ssh]# ssh-copy-id -i id_rsa.pub root@47.93.235.147
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "id_rsa.pub"
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@47.93.235.147's password: 

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'root@47.93.235.147'"
and check to make sure that only the key(s) you wanted were added.

[root@satori .ssh]#

此时从主机 satori 远程登陆 matsuri、aqua 就无需再输入密码了,同理我们还需要在 matsuri、aqua 上也重复相同的操作,让集群中所有节点之间的通信都畅通无阻。

下面再来启动一下集群,进入 flink 目录中执行 ./bin/start-cluster.sh,当然也可以将 bin 目录配置到环境变量中,直接输入 start-flink.sh 启动。

[root@satori flink-1.12.4]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host satori.
Starting taskexecutor daemon on host matsuri.
Starting taskexecutor daemon on host aqua.
[root@satori flink-1.12.4]#

很明显,输入的信息表示我们集群启动成功了,其中 satori 是主节点、matsuri 和 aqua 是 worker 节点,资源管理器是 Standalone,运行模式是 Session 模式。然后我们输入 jps 命令查看一下:

[root@satori flink-1.12.4]# jps
10978 Jps
10902 StandaloneSessionClusterEntrypoint
[root@satori flink-1.12.4]# 
[root@satori flink-1.12.4]# ssh root@47.93.39.238
Last login: Sat Jun  5 11:17:20 2021 from 47.94.174.89

Welcome to Alibaba Cloud Elastic Compute Service !

[root@matsuri ~]# jps
27634 TaskManagerRunner
27708 Jps
[root@matsuri ~]# 

主节点和从节点之间启动的服务是不同的,然后再来通过 webUI 查看一下,输入 47.94.174.89:8081 即可。

全面解析流处理框架 Flink,以及和 Python 的结合

当然还有很多其它相关的信息,可以自己点击查看,到此为止我们的 Standalone 多机部署以及启动就已经完成了。而接下来的关键就是我们如何把应用程序提交到 Standalone 集群上面。

关闭集群是:stop-cluster.sh

Python 编写应用程序提交到集群运行

由于我们还没有学习如何编写 flink 应用程序,所以这里我们就用一个只包含简单 print 语句进行测试。

# /root/1.py
if __name__ == "__main__":
    print("这是一个简单的测试用例")

当然我们说 flink 安装目录下的 examples 目录里面有一些测试案例,我们也可以直接拿它来做实验。

./bin/flink run -py /root/1.py

通过 flink run 即可运行引用程序,由于 flink 既可运行 java 程序、也可以运行 Python 程序,所以这里我们需要通过参数  -py 来指定 Python 文件的路径。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面来测试一下:

[root@satori flink-1.12.4]# ./bin/flink run -py /root/1.py
这是一个简单的测试用例

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然我们说 flink run 是同时支持 Java、Python 等语言的,但我们这里只介绍 Python,因为我本人不是 Java 方向的,所以关于 Java 如何对接 Flink 就不说了(主要是我不会)。

在之前我们说过,不管我们使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。

提交单个 py 文件我们知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,我们只需要把这个启动文件提交上去即可。

这里我们还是举个栗子,当然我们仍然不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。

先来看看我们编写的程序:

全面解析流处理框架 Flink,以及和 Python 的结合

flink_test 就是我们的主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后我们来将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的:

[root@satori flink-1.12.4]# ./bin/flink run -py /root/flink_test/main.py 
add: 4
sub: 2

Flink on Yarn

介绍完了 Flink on Standalone 之后,我们来看看 Flink on Yarn。Yarn 是一个集群资源管理器,对于做大数据开发的人来说并不陌生,它是一个非常主流的一个集群资源管理器,像 Spark 除了有 Standalone 模式之外,也有 Spark on Yarn。Yarn 可以为上层应用提供统一的资源管理和调度,在其之上可以运行各种作业,比如 MapReduce 作业、Spark 作业,当然还有这里的 Flink 作业。而 Yarn 一般集成在 Hadoop 中,Hadoop 肯定不陌生,它是大数据生态圈中最基本的框架,由三个部分组成:分布式文件存储系统(HDFS)、分布式计算(MapReduce)、集群资源管理(Yarn)。

下面来看一下 Yarn 的架构:

全面解析流处理框架 Flink,以及和 Python 的结合

Resource Manager

1. 处理客户端请求。客户端想访问集群,比如提交一个作业,要经过 Resource Manager,它是整个资源的管理者,管理整个集群的 CPU、内存、磁盘等资源。

2. 监控 Node Manager。

3. 启动或监控 Application Master。

4. 资源的分配和调度。

Node Manager

1. 管理单个节点上的资源,Node Manager 是当前节点资源的管理者,当然它也需要跟 Resource Manager 汇报。

2. 处理来自 Resource Manager 的命令。

3. 处理来自 Application Master 的命令。

Application Master

1. 某个任务的管理者。当任务在 Node Manager 上运行的时候,就是由 Application Master 负责管理,因为每个任务都会对应一个 AM。

2. 负责数据的切分。

3. 为应用程序申请资源并分配给内部的任务。

4. 任务的监控与容错。

Container

Container 是 YARN 中资源的抽象,它封装了节点上的多维度资源,如内存、CPU、磁盘、网络等等。其实 Container 是为 Application Master 服务的,因为任务在运行的时候,需要的内存、CPU 等资源都被虚拟化到 Container 里面了。

然后我们来看看 Flink 如何整合到 Yarn 上面,我们说 Flink 的运行模式有三种,Yarn 都是支持的。我们以 Session 模式为例:

全面解析流处理框架 Flink,以及和 Python 的结合

我们看到图中有一个 Yarn 集群资源管理器,然后启动 Flink 集群的时候,同样也要先申请 Application Master 管理节点。而 AM 和 Flink  的 JM 是在一个进程里面启动,AM 和 JM 会绑定在一起。

客户端首先要通过 Flink-Session 的模式连接到 Yarn 的 ResourceManage,提交相应的 APP,ResourceManager 接收到客户端提交的资源申请信息。然后 ResourceManager 会启动相应的 AM,然后 AM 会将 JM 相关的进程的启动起来,整个过程就是初始化 Flink 的 JM 的过程。然后在 Flink 里面会有一个 Yarn RsourceManager,它其实是和整个 Yarn 的集群资源管理器是相互绑定的。

然后当 JM 进程启动完毕之后,客户端就可以提交作业了,Dispatcher 在接收之后会交给 JM,然后 JM 再切分成多个 Task 发送到 TM 上执行,Task 在运行的时候是在 Container 里面的。此外我们说 Flink on Yarn 这种模式,TM 可以动态申请的。

那么 Flink on Yarn 都有哪些优势和劣势呢?

主要优势:

  • 可以和现有的大数据平台无缝对接(Hadoop 需要 2.4 版本以上)
  • 部署集群与任务提交都非常简单
  • 资源管理统一通过 Yarn,提升整体资源利用率
  • 基于 Native 方法,TM 可以按需申请和启动,从而防止资源浪费
  • 借助于 Hadoop Yarn 提供的自动 failover 机制,可以保证容错,能保证 JM、TM 节点从异常中正常恢复

主要劣势:

  • 资源隔离问题,尤其是网络资源的隔离,Yarn 做的还不够完善。比如离线作业和实时流式作业同时运行,离线作业可能会占用比较多的带宽,之间可能会相互干扰
  • Kerberos 认证失效会导致 CheckPoint 无法持久化

配置演示

更多细节会慢慢展开,下面来安装 Yarn,由于 Yarn 它是 Hadoop 框架内部的一个组件,所以我们直接安装 Hadoop 即可。Hadoop 是 Apache 的一个顶级项目,因此可以去官网 hadoop.apache.org 下载,但是更推荐采用 CDH 版本的,这里我使用的是 hadoop-2.6.0-cdh5.8.5,同样安装在 /opt 目录下。

[root@satori hadoop-2.6.0-cdh5.8.5]# pwd
/opt/hadoop-2.6.0-cdh5.8.5

这里我们要搭建 Hadoop 集群,显然每个节点都要安装。

客户端要有操作 HDFS 文件的权限。

然后我们配置环境变量:

# 将 Hadoop 的 bin 目录和 sbin 目录配置到环境变量中
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.8.5
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
# 除此之外还要配置 HADOOP_CONF_DIR,也就是 Hadoop 配置文件所在的目录
export HADOOP_CONF_DIR=/opt/hadoop-2.6.0-cdh5.8.5/etc/hadoop

目前位置,我当前的环境变量如下:

# jdk 
export JAVA_HOME=/opt/jdk1.8.0_221/
export PATH=$JAVA_HOME/bin:$PATH
# Flink
export FLINK_HOME=/opt/flink-1.12.4
export PATH=$FLINK_HOME/bin:$PATH
export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3
# Hadoop
export HADOOP_HOME=/opt/hadoop-2.6.0-cdh5.8.5
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export HADOOP_CONF_DIR=/opt/hadoop-2.6.0-cdh5.8.5/etc/hadoop

但是注意:这还没完,在 Flink 1.11 版本之后将不再支持 flink-shaded-hadoop-2-uber 包,我们需要自己下载。

直接去 https://flink.apache.org/downloads#additional-components 下载即可,我这里下载的是 Pre-bundled Hadoop 2.8.3,完事之后丢到 Flink 安装目录的 lib 目录下,当然每个节点都要这么做。

但此时仍然无法提交作业到 Yarn 上,因为 Yarn 压根就没有启动,所以还需要先搭建并启动 Hadoop 集群。因此需要修改 hadoop 的一些配置文件,配置文件都位于 HADOOP_CONF_DIR 中。

搭建 Hadoop 集群

这里我们仍然将 satori 主机(47.94.174.89)作为 master,在 HDFS 中则对应 NameNode、在 Yarn 中对应 ResourceManager;将 matsuri 主机(47.93.39.238)和 aqua 主机(47.93.235.147)作为 worker,在 HDFS 中则对应 DataNode、在 Yarn 中对应 NodeManager。

因为  HDFS 和 Yarn 都是集成在 Hadoop 内部的一个组件,它们各自的启动和关闭也是相互独立的。所以我们先配置 HDFS,然后再配置 Yarn。

修改 hadoop-env.sh:

# 默认是 ${JAVA_HOME},需要手动改成 java 的安装路径
export JAVA_HOME=/opt/jdk1.8.0_221  

当前修改的是 master 节点的配置文件,其它节点同理。

修改 core-site.xml:

<!--指定 HDFS 中 NameNode 的地址,也就是 Master 节点的地址,就是我们当前的 satori 节点,IP 为 47.94.174.89-->
<!--但是这里不能指定 47.94.174.89,因为我这里使用的阿里云服务器,所以会有一个公网 IP 和一个内网 IP-->
<!--47.94.174.89 是对外暴露的公网 IP,外界就通过这个 IP 来访问,而内网 IP 可以通过 ifconfig 查看-->
<!--在 master 节点中必须配置内网 IP,否则到时候启动 master 节点的时候会启动不起来-->
<property>
	<name>fs.defaultFS</name>
	<value>hdfs://172.24.60.6:9000</value>
    <!--然后 worker 节点则需要指定 master 节点的公网 IP(47.94.174.89),否则是连接不上 master 的。该配置只有这一点区别,其它的不变-->
</property>

	
<!--指定hadoop运行时产生文件的存储目录,如果不指定,那么重启之后数据就丢失了-->
<!--data 目录会自动创建-->
<property>
	<name>hadoop.tmp.dir</name>
	<value>/opt/hadoop-2.6.0-cdh5.8.5/data</value>
</property>

关于这里的 IP,如果你使用的是本机的虚拟机搭建集群的话就不需要担心了,直接输入 ifconfig 查看 master 的 IP,然后 master 配好之后直接 scp 发过去即可,因为所有节点上的软件的安装路径、版本都是一致的。只不过我这里用的是云服务器,所以在指定 master 节点的 IP 时,在 master 节点上要使用内网 IP,worker 节点使用公网 IP。

xml 文件里面都有一个 configuration 标签,我们将这里所做的配置直接拷贝到里面即可。

修改 hdfs-site.xml:

<!-- 副本系数 -->
<property>
	<name>dfs.replication</name>
	<value>2</value>
</property>

<!-- namenode 连接 datanode 时,默认会进行 host 解析查询,这里指定为 false -->
<property>
    <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
    <value>false</value>
</property>

这里指定 HDFS 文件的副本系数,一般是 3 副本,这里我们的 satori 主机只做 master,matsuri、aqua 主机做 worker 节点,所以副本改成 2。

然后是指定 dfs.namenode.datanode.registration.ip-hostname-check 为 false,这一点非常重要,尤其你使用的是云服务器(并且修改了默认的主机名)。否则你会发现后面明明进程都启动成功了,但总是会出现:There are 0 datanode(s) running and no node(s) are excluded in this operation ...

其它节点同理。

修改 slaves:

47.93.235.147
47.93.39.238

这里指定 worker 节点,我们将 matsuri 和 aqua 节点作为 worker,里面写上其 IP 地址即可,记得其它节点也要这么配。因此我们看到大数据组件的配置都是比较相似的,Flink 也是在配置文件里面写上 worker 节点的地址,只不过配置文件名叫 workers,这里叫 slaves。当然高版本的 Hadoop 也改成了 workers,因为 slaves 在西方比较敏感。

然后就可以启动 HDFS 了,不过首先要格式化 NameNode,在 master 上执行。

# 这里我们配置了环境变量,如果没有配的话,那么需要进入 Hadoop 的安装目录,输入 bin/hdfs
hdfs namenode -format  

如果格式化成功,那么会自动创建 /opt/hadoop-2.6.0-cdh5.8.5/data 目录:

[root@satori ~]# ls /opt/hadoop-2.6.0-cdh5.8.5/data/
dfs
[root@satori ~]# 

然后启动 HDFS,只需要在 master 节点启动即可。启动之后,会发现 worker 节点的 /opt/hadoop-2.6.0-cdh5.8.5/data/ 也自动创建了。

# 如果没有配置环境变量的话,那么需要进入 Hadoop 的安装目录,输入 sbin/start-dfs.sh
# 关闭的话,输入 stop-dfs.sh
start-dfs.sh

全面解析流处理框架 Flink,以及和 Python 的结合

这里要保证主机之间端口是开放的,可以互相通信。并且配置免密码登录,否则在启动 worker 时需要输入密码。然后我们输入 jps 查看相关进程:

[root@satori ~]# jps
28080 Jps
27447 SecondaryNameNode
27263 NameNode

# --------------------------
[root@matsuri ~]# jps
15626 Jps
15324 DataNode

# --------------------------
[root@aqua ~]# jps
31233 Jps
30958 DataNode

我们看到相关进程都已经启动,并且 matsuri 和 aqua 是在启动 master 节点时自动启动的。此时 HDFS 文件系统就可以正常工作了。

另外启动时会输出日志,这些日志被写在了文件中,而文件路径图中已经告诉我们了。如果输出 jps 的时候发现相关进程没有启动,可以查看日志文件,里面会记录启动失败的原因以及相关信息。只不过图中给的日志文件路径是错的,我们需要将结尾的 .out 改成 .log,这应该是 Hadoop 内部的小失误,但是高版本的 Hadoop 是否存在这个小失误,个人就不清楚了。

我们可以输入 http://47.94.174.89:50070 通过 webUI 查看 HDFS 文件系统:

全面解析流处理框架 Flink,以及和 Python 的结合

显示有两个活跃的 Node 节点,显然我们 HDFS 集群是启动成功了的。但是注意:虽然成功启动了 HDFS,但 Yarn 还没有启动,所以我们接下来还要配置并启动 Yarn。

修改 yarn-env.sh:

和 hadoop-env.sh 一样,只需要指定 java 的安装路径即可。

export JAVA_HOME=/opt/jdk1.8.0_221/

其它节点同理。

修改 yarn-site.xml:

<!-- ResourceManager 对客户端暴露的地址
     客户端通过该地址向 RM 提交应用程序,杀死应用程序等 -->
<property>
    <name>yarn.resourcemanager.address</name>
    <value>172.24.60.6:8032</value>
</property>

<!-- ResourceManager 对 ApplicationMaster 暴露的访问地址。
     ApplicationMaster 通过该地址向 RM 申请资源、释放资源等 -->
<property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>172.24.60.6:8030</value>
</property>

<!-- ResourceManager 对 NodeManager暴露的地址
     NodeManager通过该地址向 RM 汇报心跳,领取任务等 -->
<property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>172.24.60.6:8031</value>
</property>

<!-- ResourceManager 对管理员暴露的访问地址
     管理员通过该地址向 RM 发送管理命令等。 -->
<property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>172.24.60.6:8033</value>
</property>

<!-- ResourceManager对外 webUI 地址,用户可通过该地址在浏览器中查看集群各类信息 -->
<property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>172.24.60.6:8088</value>
</property>

以上监听的端口是默认的,我们没有做改动,但是这里依旧要写内网 IP,然后 worker 节点在配置的时候写成公网 IP,否则 Yarn 启动不起来。

然后启动即可:

# 没有配置环境变量,则需要去 Hadoop 的安装目录,输入 sbin/start-yarn.sh
# 关闭的话是 stop-yarn.sh
start-yarn.sh

全面解析流处理框架 Flink,以及和 Python 的结合

我们看到在 master 节点启动之后,worker 节点也启动了,还是输入 jps 查看一下相关进程。

[root@satori ~]# jps
29649 Jps
29377 ResourceManager
27447 SecondaryNameNode
27263 NameNode

# --------------------------
[root@matsuri ~]# jps
16218 NodeManager
15324 DataNode
16349 Jps

# --------------------------
[root@aqua ~]# jps
31988 Jps
31853 NodeManager
30958 DataNode

因为我们在 slaves 配置了 worker 节点,所以 satori 节点负责启动 ResourceManager,matsuri 节点和 aqua 节点负责启动 NodeManager。如果我们在 slaves 中将 satori 节点的 IP 也加进去了,那么 satori 节点除了会启动 NameNode、ResourceManager 之外,还会启动 DataNode、NodeManager。

然后输入 http://47.94.174.89:8088 即可通过 webUI 查看集群的资源情况:

全面解析流处理框架 Flink,以及和 Python 的结合

Yarn 是管理整个集群资源的,所以可以看到整个集群的资源情况,其中内存总量是 16 GB,活跃的节点是两个。

Python 编写应用程序提交到集群(Yarn)运行

我们 Flink 有三种运行模式,Yarn 都是支持的。

Session 模式运行:

# -jm 表示为 JobManager 申请的堆内存
# -tm 表示为 TaskbManager 申请的堆内存
yarn-session.sh -jm 512m -tm 1024m -py xxx.py

Job 模式运行:

flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 1024m -py xxx.py

Application 模式运行:

flink run-application -t yarn-application \
    -Djobmanager.memory.process.size=512m \
    -Dtaskmanager.memory.process.size=1024m -py xxx.py

这里我们编写一个 Python 程序:

if __name__ == "__main__":
    print("hello world")

然后我们来以 Job 模式运行一下:

全面解析流处理框架 Flink,以及和 Python 的结合

可以看到成功提交到 yarn 上执行了,然后是一些参数,我们可以自己指定、也可以使用默认的,当然也可以修改 Flink 运行时的配置文件,后面在介绍 flink-conf.yaml 的时候再说。

分布式流处理模型

下面说一下分布式流处理模型,我们说 Flink 底层采用的流处理模型是 DataFlow,它是由 Google 开源的一篇论文演化而来的。论文中的 DataFlow 模型重点关注的是如何去平衡正确性、延迟、大规模数据的处理,以及无界乱序数据的处理,而这些也都是我们在流处理当中需要重点关注的方向。

其实整个分布式流处理模型主要分为三个部分:加载数据源(Source)、转换操作(Transformation)、数据输出(Sink)。

全面解析流处理框架 Flink,以及和 Python 的结合

数据源关注的是如何跟外部的数据系统进行交互,包括消息中间件、Socket 等等;Operation 则是负责对数据进行转换操作,比如过滤、映射、分组聚合等等,然后再将不同的流进行组合得到最终结果;Sink 则是负责将 Operation 返回的结果输出到外部系统中,最终让用户获取分布式流处理加工完成的数据。

其实不光是 Flink,基本上整个数据处理都是这三步:读取数据、操作数据、输出数据。

全面解析流处理框架 Flink,以及和 Python 的结合

对于分布式流处理模型,我们可以从并行计算,各个节点可以分布在不同的 Task 线程中运行,数据在 Operation 之间传递,可以从上一个 Operation 节点直接 Push 到下一个 Operation 节点。

并且从图中我们也可以看到,Operation 和 Operation 之间也是存在 Shuffle 操作的,但是它和 MapReduce 的 Shuffle 还有略微不同。我们这里的数据是上游的 Operation 节点直接推送给下游,而 MapReduce 中的 Shuffle 则是下游去上游进行数据拉取。这也是 DataFlow 模型的一个特点,数据是通过上游的算子直接 Push 到下游的算子上,像 Apache Storm、Apache Flink 以及 Apache Beam 都是采用的 DataFlow 模型。

DataStream API 实践原理(Python)

了解完分布式流处理的一些基本概念之后,我们看一下 Flink DataStream API 如何去实现 DataFlow Model,以及常用的 DataStream API 的一些操作。

全面解析流处理框架 Flink,以及和 Python 的结合

DataStream API 是 Flink 里面最核心的 API,专门用于处理流式数据,并且 DataStream API 可以支持 Java、Scala、Python 等多种语言,当然它也是 Table API 和 SQL 底层的支撑。

在 Flink 1.12 之前 DataStream API 是不支持 Python 的,但是从 Flink 1.12 的时候提供了对 Python DataStream API 的初步支持,可以进行一些诸如 map、flat_map、filter、key_by 等无状态类型的操作,很明显支持的功能还不够完善。因此工作中使用 pyfink 的话,不建议操作 DataStream API,更建议去操作 Table 和 SQL 这种上层的 API。因为 Table API 和 SQL 则是很早就支持了,而且支持的也比较好,完全可以用于生产。最关键的是操作起来也更加的方便,还可以和 pandas 的 DataFrame 进行互相转换。不过虽然不推荐工作中操作 DataStream API,但简单了解一下还是有必要的,毕竟 Table 和 SQL 都是基于 DataStream。

下面我们就来看看如何使用 Python 操作 DataStream API,一个 Python 版的 Flink 程序是什么样子的。不过 Python 若想操作 Flink,需要安装一个第三方包,直接 pip install apache-flink 即可,然后导入的时候使用 pyflink。不过在下载相关依赖包的时候,有一个依赖包会非常的大,因此下载的时候可以指定一个国内的源,比如豆瓣源、清华源等等。

这里以 Standalone 模式进行演示,所以要首先启动集群:start-cluster.sh

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

如果使用 DataStream API 作业来说,首先需要创建 StreamExecutionEnvironment 对象,它是一个入口点、或者说执行环境。然后我们可以设置一些参数,比如并行度、最大并行度等等,有很多参数可以设置,直接输入 env.set_ ,然后 PyCharm 会自动提示。比如我们来设置一下并行度:

env.set_parallelism(2)

既然要使用 DataStream API,那么必然要有 DataStream 对象,显然它是外部数据在 Flink 中的表现形式。读取外部数据,得到 DataStream 对象,然后就可以通过 DataStream 提供的算子来对数据做各种各样的变换。

累了,先停更·········

相关文章:

  • 2022-02-24
  • 2021-06-19
  • 2021-05-27
  • 2021-07-21
  • 2021-11-04
  • 2021-12-29
  • 2021-07-08
猜你喜欢
  • 2021-05-19
  • 2021-06-06
  • 2021-12-02
  • 2021-07-10
  • 2021-10-15
  • 2021-07-12
  • 2022-01-30
相关资源
相似解决方案