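Once a Flink job has been submitted to YARN, generating the AM and processing the job are handled almost entirely by YARN and have little to do with Flink. Because these steps are still needed for an overall picture of the Per-Job mode of Flink on YARN, I have filed this series under the Flink source-code reading series. The series is planned as three posts.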

  This post focuses on what happens after submitApplication: how YARN's ResourceManager allocates a container for the application's ApplicationMaster.

  Note: the source code shown here was reached by navigating from Flink 1.9. It mainly comes from hadoop-yarn-server-resourcemanager-2.4.1.jar and flink-shaded-hadoop-2-2.4.1-7.0.jar.

  My knowledge is limited, so comments and discussion are very welcome.

Key concepts involved [1]:

  1) RMApp: each application is represented by an RMApp object that holds all of the application's information. The implementation class is RMAppImpl;

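  2) RMAppAttempt: an RMApp can have several app attempts, i.e. several RMAppAttempt objects, and each one represents one run of the application. Which attempt is current depends on whether the previous RMAppAttempt succeeded. If it failed, another attempt is started, and this repeats until one succeeds (up to the configured maximum number of attempts);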

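  3) Dispatcher: the central event dispatcher, with which the event handlers of the individual state machines register. It maintains an event queue and keeps taking events off that queue. For each event it checks the type and passes it to the matching handler. Its implementation classes are AsyncDispatcher and MultiThreadedDispatcher. The latter creates a list that holds several AsyncDispatcher instances.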
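The sketch below shows the Dispatcher pattern described above in a minimal single-queue form, loosely in the spirit of YARN's AsyncDispatcher. It is not Hadoop's code. SketchEvent, SketchHandler, SketchDispatcher, DemoAppEventType and DemoEvent are hypothetical names. Error handling and shutdown draining are simplified assumptions. Each handler registers against an event-type enum class. Producers enqueue events without blocking, and a single background thread takes events off the queue and routes each one to the handler registered for its type's enum class.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event abstraction: every event is keyed by an enum constant.
interface SketchEvent {
    Enum<?> getType();
}

// Hypothetical handler: one per state machine (an RMApp, an RMAppAttempt, ...).
interface SketchHandler {
    void handle(SketchEvent event);
}

// Minimal central dispatcher (a sketch, not AsyncDispatcher itself).
class SketchDispatcher {
    private final BlockingQueue<SketchEvent> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, SketchHandler> handlers = new ConcurrentHashMap<>();
    private volatile boolean stopped = false;
    private Thread worker;

    // Handlers are keyed by the enum class of the event types they accept.
    void register(Class<? extends Enum<?>> eventTypeClass, SketchHandler handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Producers only enqueue; they never run handler code themselves.
    void post(SketchEvent event) {
        queue.add(event);
    }

    void start() {
        worker = new Thread(() -> {
            // Keep draining until stop() was requested AND the queue is empty.
            while (!stopped || !queue.isEmpty()) {
                SketchEvent event;
                try {
                    event = queue.poll(10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (event == null) {
                    continue;
                }
                // Route by the enum class that declares the event's type.
                SketchHandler handler = handlers.get(event.getType().getDeclaringClass());
                if (handler == null) {
                    System.err.println("No handler registered for " + event.getType());
                } else {
                    handler.handle(event);
                }
            }
        }, "sketch-dispatcher");
        worker.start();
    }

    // Requests shutdown and waits until every event posted so far is handled.
    void stop() {
        stopped = true;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical demo types standing in for an RMApp's event-type enum.
enum DemoAppEventType { START, ATTEMPT_FAILED }

class DemoEvent implements SketchEvent {
    private final DemoAppEventType type;

    DemoEvent(DemoAppEventType type) {
        this.type = type;
    }

    @Override
    public DemoAppEventType getType() {
        return type;
    }
}
```

To use it, register a handler for DemoAppEventType, call start(), post a few DemoEvents and then call stop(). Because a single thread drains a FIFO queue, the handler sees the events in the order they were posted. As far as I know, the real AsyncDispatcher also looks up handlers by `event.getType().getDeclaringClass()`. That is how RMApp, RMAppAttempt and the other state machines can each define their own event-type enum and still share one central dispatcher.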

2. From event submission to scheduling

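  1. As mentioned in the post "Source-code analysis of the Per-Job mode of Flink on YARN", the client's submission is wrapped into a SubmitApplicationRequest and handled by ClientRMService.submitApplication(). The process is as follows: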

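  1) The method first checks the fields that do not depend on the RM's configuration. If the applicationId has already been submitted, it returns immediately. If the queue name, application name or application type is missing, it fills in a default, and it truncates an application type that is too long;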

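  2) It calls RMAppManager.submitApplication() to submit the application. RMAppManager also validates the fields that depend on the RM's configuration.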

  The code is as follows:

```java
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }
  // Start of the checks: does the applicationId already exist, is a queue set, etc.
  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }

  if (submissionContext.getQueue() == null) {
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  if (submissionContext.getApplicationType() == null) {
    submissionContext
      .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else {
    if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
        .getApplicationType().substring(0,
          YarnConfiguration.APPLICATION_TYPE_LENGTH));
    }
  }

  try {
    // Submit the application
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);

    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
```
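The checks that submitApplication() performs before delegating can be reduced to a small standalone sketch. It is only an illustration. SketchSubmission and SubmitSketch are hypothetical classes. A HashSet of ids stands in for rmContext.getRMApps(), and adding an id to that set stands in for the call to RMAppManager.submitApplication(). The default values are assumed to match the YarnConfiguration constants in Hadoop 2.x, so please check them against your own version.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ApplicationSubmissionContext: only the fields
// checked by ClientRMService.submitApplication() are modelled.
class SketchSubmission {
    final String appId;
    String queue;
    String name;
    String type;

    SketchSubmission(String appId, String queue, String name, String type) {
        this.appId = appId;
        this.queue = queue;
        this.name = name;
        this.type = type;
    }
}

class SubmitSketch {
    // Assumed to mirror YarnConfiguration.DEFAULT_QUEUE_NAME, DEFAULT_APPLICATION_NAME,
    // DEFAULT_APPLICATION_TYPE and APPLICATION_TYPE_LENGTH (Hadoop 2.x).
    static final String DEFAULT_QUEUE = "default";
    static final String DEFAULT_NAME = "N/A";
    static final String DEFAULT_TYPE = "YARN";
    static final int TYPE_MAX_LENGTH = 20;

    // Stands in for rmContext.getRMApps(): ids of applications already accepted.
    private final Set<String> knownApps = new HashSet<>();

    // Returns true if accepted, false for a duplicate id
    // (the real method returns an empty response in that case).
    boolean submit(SketchSubmission s) {
        if (knownApps.contains(s.appId)) {
            return false;
        }
        // Fill in defaults for missing fields.
        if (s.queue == null) {
            s.queue = DEFAULT_QUEUE;
        }
        if (s.name == null) {
            s.name = DEFAULT_NAME;
        }
        // Default a missing type, truncate an overly long one.
        if (s.type == null) {
            s.type = DEFAULT_TYPE;
        } else if (s.type.length() > TYPE_MAX_LENGTH) {
            s.type = s.type.substring(0, TYPE_MAX_LENGTH);
        }
        // Placeholder for rmAppManager.submitApplication(...).
        knownApps.add(s.appId);
        return true;
    }
}
```

If the same applicationId is submitted twice, submit() returns false the second time. This mirrors the real method, which returns an empty SubmitApplicationResponse for an application already in rmContext instead of raising an error. That makes a client-side resubmission of the same request harmless.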

Related articles: