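Once a Flink job has been submitted to YARN, the subsequent creation of the ApplicationMaster (AM) and the processing of the job have little to do with Flink itself. Still, to give a rough picture of the whole Flink on YARN Per-Job flow, I have filed these posts under the Flink source-reading series. The series is planned as three posts.
This post focuses on what happens after submitApplication: how the YARN ResourceManager allocates a container for the job's ApplicationMaster.
Note: the source code in this post was reached by navigating from Flink 1.9, and mainly involves hadoop-yarn-server-resourcemanager-2.4.1.jar and flink-shaded-hadoop-2-2.4.1-7.0.jar.
My knowledge is limited, so comments and discussion are welcome.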
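Important concepts involved [1]:
1) RMApp: each application is represented by an RMApp object, which holds all the information about the application. Its implementation class is RMAppImpl;
2) RMAppAttempt: an RMApp can have several app attempts, i.e., several RMAppAttempt objects, each representing one run of the application. Which attempt is current depends on whether the previous RMAppAttempt succeeded: if it failed, a new attempt is started, until one succeeds or the maximum number of attempts is exhausted;
3) Dispatcher: the central event dispatcher, with which the event handlers of the individual state machines register. It maintains an event queue that it scans continuously: it takes each event off the queue, checks its type, and hands it to the corresponding handler. Its implementation classes are AsyncDispatcher and MultiThreadedDispatcher; the latter creates a list that holds several AsyncDispatcher instances.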
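To make the Dispatcher description concrete, here is a minimal Java sketch of a central asynchronous dispatcher: producers only enqueue events, and a single consumer thread takes them off the queue and routes each one to the handler registered for its event-type enum. All names here (MiniAsyncDispatcher, AppEventType, AttemptEventType, Event, Handler, DispatcherDemo) are hypothetical. The sketch is only loosely modeled on the idea behind YARN's AsyncDispatcher, where handlers are registered per event-type enum class (for example, the ResourceManager registers a handler for RMAppEventType.class); it omits that class's service lifecycle and error handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Runnable demo; declared first so a single-file launch (`java DispatcherDemo.java`) finds main().
class DispatcherDemo {
    public static void main(String[] args) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
        dispatcher.register(AppEventType.class, e -> log.add("app " + e.type + " " + e.target));
        dispatcher.register(AttemptEventType.class, e -> log.add("attempt " + e.type + " " + e.target));
        dispatcher.start();
        dispatcher.dispatch(new Event(AppEventType.START, "application_1"));
        dispatcher.dispatch(new Event(AttemptEventType.START, "appattempt_1_000001"));
        dispatcher.stop(); // waits until the queued events have been handled
        log.forEach(System.out::println);
    }
}

// Hypothetical event types, standing in for YARN enums such as RMAppEventType.
enum AppEventType { START, KILL }
enum AttemptEventType { START, LAUNCHED }

final class Event {
    final Enum<?> type;   // which state machine / transition the event targets
    final String target;  // e.g. an application or attempt id
    Event(Enum<?> type, String target) { this.type = type; this.target = target; }
}

interface Handler { void handle(Event event); }

final class MiniAsyncDispatcher {
    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    // One handler per event-type enum class.
    private final Map<Class<?>, Handler> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::loop, "mini-dispatcher");
    private volatile boolean stopped = false;

    void register(Class<? extends Enum<?>> typeClass, Handler handler) {
        handlers.put(typeClass, handler);
    }

    void start() { worker.start(); }

    // Producers only enqueue; handlers always run on the single worker thread.
    void dispatch(Event event) { queue.add(event); }

    // Signal shutdown, let the worker drain what is already queued, and wait for it.
    void stop() {
        stopped = true;
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        while (!stopped || !queue.isEmpty()) {
            Event event;
            try {
                event = queue.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
            if (event == null) {
                continue; // nothing queued yet; re-check the stop flag
            }
            // Route by the enum's declaring class, e.g. AppEventType.class.
            Handler handler = handlers.get(event.type.getDeclaringClass());
            if (handler != null) {
                handler.handle(event); // events with no registered handler are dropped in this sketch
            }
        }
    }
}
```

Because a single thread consumes the queue, events are handled in the order they were dispatched, and handlers never run on the producer's thread; this decoupling is the point of having a central asynchronous dispatcher.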
2. From event submission to scheduling
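1. As mentioned in the earlier post analyzing the source of the Flink on YARN Per-Job mode, the message submitted by the client is wrapped into a request and handled by ClientRMService.submitApplication(). The process is as follows:
1) The method first validates the fields that do not depend on the YARN RM's configuration: it checks whether the applicationId has already been submitted, and fills in defaults for the queue name, application name and application type when they are missing (truncating an over-long application type);
2) It then calls RMAppManager.submitApplication() to submit the application.
The code is as follows: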
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }
  // Start the checks: does the applicationId already exist, is a queue set, etc.
  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }

  if (submissionContext.getQueue() == null) {
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  if (submissionContext.getApplicationType() == null) {
    submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else {
    if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
          .getApplicationType().substring(0,
              YarnConfiguration.APPLICATION_TYPE_LENGTH));
    }
  }

  try {
    // Submit the application
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);

    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
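The excerpt above only compiles against the Hadoop YARN jars. As a dependency-free sketch, the following Java model isolates just the repeated-submission check and the three defaulting rules so they can be run and tested on their own. SubmissionFields, SubmitChecks and SubmitChecksDemo are hypothetical names, and the default values are assumed to match the YarnConfiguration constants referenced in the excerpt (verify them against your Hadoop version).

```java
import java.util.HashSet;
import java.util.Set;

// Runnable demo; declared first so a single-file launch (`java SubmitChecksDemo.java`) finds main().
class SubmitChecksDemo {
    public static void main(String[] args) {
        SubmitChecks checks = new SubmitChecks();
        SubmissionFields ctx = new SubmissionFields("application_1");
        ctx.type = "A_VERY_LONG_APPLICATION_TYPE"; // longer than the assumed 20-char limit
        System.out.println(checks.submit(ctx) + " " + ctx.queue + " " + ctx.name + " " + ctx.type);
        System.out.println(checks.submit(ctx)); // resubmitting the same id is ignored
    }
}

// Hypothetical stand-in for the ApplicationSubmissionContext fields checked in the excerpt.
final class SubmissionFields {
    final String appId;
    String queue, name, type; // null means "not set by the client"
    SubmissionFields(String appId) { this.appId = appId; }
}

final class SubmitChecks {
    // Assumed to mirror the YarnConfiguration constants used in the excerpt.
    static final String DEFAULT_QUEUE_NAME = "default";
    static final String DEFAULT_APPLICATION_NAME = "N/A";
    static final String DEFAULT_APPLICATION_TYPE = "YARN";
    static final int APPLICATION_TYPE_LENGTH = 20;

    // Stands in for rmContext.getRMApps(): ids of applications already known to the RM.
    private final Set<String> knownApps = new HashSet<>();

    /** Returns false for a repeated submission (ignored, as in the excerpt), true if accepted. */
    boolean submit(SubmissionFields ctx) {
        if (knownApps.contains(ctx.appId)) {
            return false;
        }
        if (ctx.queue == null) ctx.queue = DEFAULT_QUEUE_NAME;
        if (ctx.name == null) ctx.name = DEFAULT_APPLICATION_NAME;
        if (ctx.type == null) {
            ctx.type = DEFAULT_APPLICATION_TYPE;
        } else if (ctx.type.length() > APPLICATION_TYPE_LENGTH) {
            ctx.type = ctx.type.substring(0, APPLICATION_TYPE_LENGTH);
        }
        // In YARN the app is registered downstream, inside RMAppManager.submitApplication();
        // this sketch just records the id so that a second call is detected.
        knownApps.add(ctx.appId);
        return true;
    }
}
```

Note that, as in the excerpt, a repeated submission is not an error: the RM simply returns an empty response, which makes client retries of the same applicationId harmless.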