个人感觉学习Flink其实最不应该错过的博文是Flink社区的博文系列,里面的文章是不会让人失望的。强烈安利:https://ververica.cn/developers-resources/。  

  本文是自己第一次尝试写源码阅读的文章,会努力将原理和源码实现流程结合起来。文中有几个点目前也是没有弄清楚,若是写在一篇博客里,时间跨度太大,但又怕后期遗忘,所以先记下来,后期进一步阅读源码后再添上,若是看到不完整版博文的看官,对不住!

  文中若是写的不准确的地方欢迎留言指出

  源码系列基于Flink 1.9

二、Per-job提交任务原理

  Flink on Yarn模式下提交任务整体流程图如下(图源自Flink社区,链接见Ref  [1]

Flink源码阅读(一)——Flink on Yarn的Per-job模式源码简析

  图1 Flink Runtime层架构图

     2.1. Runtime层架构简介

  Flink采取的是经典的master-salve模式,图中的AM(ApplicationMater)为master,TaskManager是salve。

  AM中的Dispatcher用于接收client提交的任务和启动相应的JobManager ;JobManager用于任务的接收,task的分配、管理task manager等;ResourceManager主要用于资源的申请和分配。

  这里有点需要注意:Flink本身也是具有ResourceManager和TaskManager的,这里虽然是on Yarn模式,但Flink本身也是拥有一套资源管理架构,虽然各个组件的名字一样,但这里yarn只是一个资源的提供者,若是standalone模式,资源的提供者就是物理机或者虚拟机了。 

  2.2. Flink on Yarn 的Per-job模式提交任务的整体流程:

  1)执行Flink程序,就类似client,主要是将代码进行优化形成JobGraph,向yarn的ResourceManager中的ApplicationManager申请资源启动AM(ApplicationMater),AM所在节点是Yarn上的NodeManager上;

  2)当AM起来之后会启动Dispatcher、ResourceManager,其中Dispatcher会启动JobManager,ResourceManager会启动slotManager用于slot的管理和分配;

  3)JobManager向ResourceManager(RM)申请资源用于任务的执行,最初TaskManager还没有启动,此时,RM会向yarn去申请资源,获得资源后,会在资源中启动TaskManager,相应启动的slot会向slotManager中注册,然后slotManager会将slot分配给只需资源的task,即向JobManager注册信息,然后JobManager就会将任务提交到对应的slot中执行。其实Flink on yarn的session模式和Per-job模式最大的区别是,提交任务时RM已向Yarn申请了固定大小的资源,其TaskManager是已经启动的。

  资源分配如详细过程图下:

Flink源码阅读(一)——Flink on Yarn的Per-job模式源码简析

 图2 slot管理图,源自Ref[1]

  更详细的过程解析,强烈推荐Ref [2],是阿里Flink大牛写的,本博客在后期的源码分析过程也多依据此博客。 

三、源码简析

  提交任务语句

./flink run -m yarn-cluster ./flinkExample.jar

    1、Client端提交任务阶段分析

  flink脚本的入口类是org.apache.flink.client.cli.CliFrontend。

  1)在CliFronted类的main()方法中,会加载flnk以及一些全局的配置项之后,根据命令行参数run,调用run()->runProgram()->deployJobCluster(),具体的代码如下:

private <T> void runProgram(
            CustomCommandLine<T> customCommandLine,
            CommandLine commandLine,
            RunOptions runOptions,
            PackagedProgram program) throws ProgramInvocationException, FlinkException {
        final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

        try {
            final T clusterId = customCommandLine.getClusterId(commandLine);

            final ClusterClient<T> client;

            // directly deploy the job if the cluster is started in job mode and detached
            if (clusterId == null && runOptions.getDetachedMode()) {
                int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
          //构建JobGraph
                final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

                final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
          //将任务提交到yarn上 client
= clusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, runOptions.getDetachedMode()); logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID()); ...................... } else{........}

  2)提交任务会调用YarnClusterDescriptor 类中deployJobCluster()->AbstractYarnClusterDescriptor类中deployInteral(),该方法会一直阻塞直到ApplicationMaster/JobManager在yarn上部署成功,其中最关键的调用是对startAppMaster()方法的调用,代码如下:

 1 protected ClusterClient<ApplicationId>     deployInternal(
 2             ClusterSpecification clusterSpecification,
 3             String applicationName,
 4             String yarnClusterEntrypoint,
 5             @Nullable JobGraph jobGraph,
 6             boolean detached) throws Exception {
 7 
 8         //1、验证集群是否可以访问
 9         //2、若用户组是否开启安全认证
10         //3、检查配置以及vcore是否满足flink集群申请的需求
11         //4、指定的对列是否存在
12         //5、检查内存是否满足flink JobManager、NodeManager所需
13         //....................................
14 
15         //Entry
16         ApplicationReport report = startAppMaster(
17                 flinkConfiguration,
18                 applicationName,
19                 yarnClusterEntrypoint,
20                 jobGraph,
21                 yarnClient,
22                 yarnApplication,
23                 validClusterSpecification);
24 
25         //6、获取flink集群端口、地址信息
26         //..........................................
27     }

  3)方法AbstractYarnClutserDescriptor.startAppMaster()主要是将配置文件和相关文件上传至分布式存储如HDFS,以及向Yarn上提交任务等,源码分析如下:

 1 public ApplicationReport startAppMaster(
 2             Configuration configuration,
 3             String applicationName,
 4             String yarnClusterEntrypoint,
 5             JobGraph jobGraph,
 6             YarnClient yarnClient,
 7             YarnClientApplication yarnApplication,
 8             ClusterSpecification clusterSpecification) throws Exception {
 9 
10         // .......................
11 
12         //1、上传conf目录下logback.xml、log4j.properties
13 
14         //2、上传环境变量中FLINK_PLUGINS_DIR ,FLINK_LIB_DIR包含的jar
15         addEnvironmentFoldersToShipFiles(systemShipFiles);
16         //...........
17         //3、设置applications的高可用的方案,通过设置AM重启次数,默认为1
18         //4、上传ship files、user jars、
19         //5、为TaskManager设置slots、heap memory
20         //6、上传flink-conf.yaml
21         //7、序列化JobGraph后上传
22         //8、登录权限检查
23 
24         //.................
25 
26         //获得启动AM container的Java命令
27         final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
28                 yarnClusterEntrypoint,
29                 hasLogback,
30                 hasLog4j,
31                 hasKrb5,
32                 clusterSpecification.getMasterMemoryMB());
33 
34         //9、为aAM启动绑定环境参数以及classpath和环境变量
35 
36         //..........................
37 
38         final String customApplicationName = customName != null ? customName : applicationName;
39         //10、应用名称、应用类型、用户提交的应用ContainerLaunchContext
40         appContext.setApplicationName(customApplicationName);
41         appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
42         appContext.setAMContainerSpec(amContainer);
43         appContext.setResource(capability);
44 
45         if (yarnQueue != null) {
46             appContext.setQueue(yarnQueue);
47         }
48 
49         setApplicationNodeLabel(appContext);
50 
51         setApplicationTags(appContext);
52 
53         //11、部署失败删除yarnFilesDir
54         // add a hook to clean up in case deployment fails
55         Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
56         Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
57 
58         LOG.info("Submitting application master " + appId);
59         
60         //Entry
61         yarnClient.submitApplication(appContext);
62 
63         LOG.info("Waiting for the  cluster to be allocated");
64         final long startTime = System.currentTimeMillis();
65         ApplicationReport report;
66         YarnApplicationState lastAppState = YarnApplicationState.NEW;
67         //12、阻塞等待直到running
68         loop: while (true) {
69             //...................
70             //每隔250ms通过YarnClient获取应用报告
71             Thread.sleep(250);
72         }
73         //...........................
74         //13、部署成功删除shutdown回调
75         // since deployment was successful, remove the hook
76         ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
77         return report;
78     }

   4)应用提交的Entry是YarnClientImpl.submitApplication(),该方法在于调用了ApplicationClientProtocolPBClientImpl.submitApplication(),其具体代码如下:

 1 public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
 2 //取出报文
 3         SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto();
 4 
 5         try {
 6 //将报文发送发送到服务端,并将返回结果构成response
 7             return new SubmitApplicationResponsePBImpl(this.proxy.submitApplication((RpcController)null, requestProto));
 8         } catch (ServiceException var4) {
 9             RPCUtil.unwrapAndThrowException(var4);
10             return null;
11         }
12     }
View Code

相关文章:

  • 2022-12-23
  • 2021-11-27
  • 2021-08-04
  • 2021-09-07
  • 2022-12-23
  • 2021-09-19
  • 2021-12-10
  • 2019-09-03
猜你喜欢
  • 2022-01-14
  • 2022-12-23
  • 2022-01-30
  • 2022-12-23
  • 2021-09-07
  • 2022-12-23
相关资源
相似解决方案