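In https://www.cnblogs.com/dongxiao-yang/p/9403427.html we analyzed the code path by which Flink submits a single job to a YARN cluster. Since version 1.5, Flink has reworked the deployment flow of the whole framework (see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077). This article walks through the new mode's flow on YARN, based on the Flink 1.6.1 source code.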
I. Initialization
Client-side initialization is much the same as described in https://www.cnblogs.com/dongxiao-yang/p/9403427.html. Because of the new mode, it differs in the following places:
1. The call final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); returns an object whose concrete class is YarnClusterDescriptor.
2. CliFrontend.runProgram enters the if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {.. block, and the call path is
YarnClusterDescriptor.deployJobCluster->AbstractYarnClusterDescriptor.deployInternal->startAppMaster
At this point we find that the AM's startup class has become YarnJobClusterEntrypoint.
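The branch taken on the client can be summarized with a small standalone sketch. Only the condition itself comes from the source quoted above; the class, enum and method names are hypothetical stand-ins for illustration, not Flink code, and the other branches of runProgram are deliberately left out.

```java
// Hypothetical, simplified model of the branch in CliFrontend.runProgram (not Flink code).
final class DeployPathSketch {

    // Illustrative labels for the paths the client can take.
    enum Path { DEPLOY_JOB_CLUSTER, OTHER }

    // Mirrors the quoted condition: new mode, no existing cluster id, detached submission.
    static Path choose(boolean isNewMode, String clusterId, boolean detachedMode) {
        if (isNewMode && clusterId == null && detachedMode) {
            // In Flink this leads to YarnClusterDescriptor.deployJobCluster.
            return Path.DEPLOY_JOB_CLUSTER;
        }
        // All remaining branches are outside the scope of this article.
        return Path.OTHER;
    }
}
```

For example, DeployPathSketch.choose(true, null, true) selects the job-cluster path, while passing a non-null cluster id or a non-detached submission does not.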
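II. YarnJobClusterEntrypoint
The main function of YarnJobClusterEntrypoint is the entry point of the whole AM process. At the end of the method it calls startCluster on its grandparent class ClusterEntrypoint, which kicks off the startup of all cluster components.
The call chain is startCluster->runCluster->startClusterComponents: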
protected void startClusterComponents(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry) throws Exception {
    synchronized (lock) {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            DispatcherGateway.class,
            DispatcherId::fromUuid,
            10,
            Time.milliseconds(50L));

        LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            ResourceManagerGateway.class,
            ResourceManagerId::fromUuid,
            10,
            Time.milliseconds(50L));

        // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
        final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
        final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

        webMonitorEndpoint = createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            transientBlobCache,
            rpcService.getExecutor(),
            new AkkaQueryServiceRetriever(actorSystem, timeout),
            highAvailabilityServices.getWebMonitorLeaderElectionService());

        LOG.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        resourceManager = createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            this,
            clusterInformation,
            webMonitorEndpoint.getRestBaseUrl());

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManager.getSelfGateway(ResourceManagerGateway.class),
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServicePath(),
            archivedExecutionGraphStore,
            this,
            webMonitorEndpoint.getRestBaseUrl(),
            historyServerArchivist);

        LOG.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        LOG.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    }
}
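Both RpcGatewayRetriever instances in startClusterComponents are built with the arguments 10 and Time.milliseconds(50L), which appear to be a retry count and a retry delay for resolving the leader's gateway. The following is a minimal standalone sketch of that retry-with-fixed-delay idea using only the JDK. RetrySketch and retryWithDelay are hypothetical names chosen for illustration, not Flink's implementation, and the sketch assumes the supplied operation returns a future rather than throwing synchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not Flink's RpcGatewayRetriever): retries an async lookup with a fixed delay.
final class RetrySketch {

    // Runs the operation once and, on failure, up to `retries` more times, waiting delayMillis between attempts.
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                // Lookup succeeded: hand the value to the caller.
                result.complete(value);
            } else if (retriesLeft > 0) {
                // Lookup failed: schedule another attempt after the fixed delay.
                scheduler.schedule(
                    () -> attempt(operation, retriesLeft - 1, delayMillis, scheduler, result),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            } else {
                // Retries exhausted: propagate the last failure.
                result.completeExceptionally(error);
            }
        });
    }
}
```

With retries = 10 and delayMillis = 50, a lookup that keeps failing would be attempted at most 11 times before the returned future fails.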
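As the startClusterComponents code shows, the AM contains two important new components: ResourceManager and Dispatcher.
Under the FLIP-6 redesign, the new ResourceManager role is defined as follows: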
The main tasks of the ResourceManager are
- Acquire new TaskManager (or slots) by starting containers, or allocating them to a job
- Giving failure notifications to JobManagers and TaskManagers
- Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.
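Broadly speaking, the ResourceManager is responsible for talking to the YARN cluster about resource requests and for allocating the acquired resources to the designated JobManager.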
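The third task in the FLIP-6 list (caching idle TaskManagers for reuse and releasing them after a period of disuse) can be illustrated with a toy model. This is only a sketch under stated assumptions: IdleTaskManagerCacheSketch and its methods are hypothetical, TaskManagers are reduced to string ids, and time is passed in explicitly, so it says nothing about how Flink actually tracks idle containers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of "cache idle TaskManagers, release them after a timeout"; not Flink's implementation.
final class IdleTaskManagerCacheSketch {

    private final long idleTimeoutMillis;

    // TaskManager id -> timestamp (ms) at which it became idle, in insertion order.
    private final Map<String, Long> idleSince = new LinkedHashMap<>();

    IdleTaskManagerCacheSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Record that a TaskManager currently has no work assigned.
    void markIdle(String taskManagerId, long nowMillis) {
        idleSince.put(taskManagerId, nowMillis);
    }

    // Reuse the oldest cached TaskManager for a job; returns null if the cache is empty.
    String acquire() {
        Iterator<String> it = idleSince.keySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        String id = it.next();
        it.remove();
        return id;
    }

    // Remove and return every TaskManager that has been idle for at least the timeout.
    List<String> releaseExpired(long nowMillis) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }
}
```

In a real deployment the released ids would correspond to containers handed back to YARN, while acquire() models handing a cached TaskManager to a job instead of requesting a new container.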
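In YARN mode, the ResourceManager implementation class is YarnResourceManager. In its initialize method we can see that it instantiates two clients, resourceManagerClient and nodeManagerClient. They wrap the YARN framework's AMRMClientAsync and NMClient and are used to communicate with YARN's ResourceManager and NodeManager respectively.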
@Override
protected void initialize() throws ResourceManagerException {
    try {
        resourceManagerClient = createAndStartResourceManagerClient(
            yarnConfig,
            yarnHeartbeatIntervalMillis,
            webInterfaceUrl);
    } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
    }

    nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
}
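To make the division of labor between the two clients concrete, here is a minimal sketch that runs without Hadoop on the classpath. RmClientSketch and NmClientSketch are hypothetical stand-ins for AMRMClientAsync and NMClient, with simplified, made-up signatures, and TwoClientSketch is not YarnResourceManager. It only models the general YARN pattern: containers are requested from the ResourceManager, and once they are allocated the NodeManager is asked to launch them.

```java
import java.util.List;

// Hypothetical stand-in for the client that talks to YARN's ResourceManager (not the Hadoop API).
interface RmClientSketch {
    void addContainerRequest(int memoryMb, int vcores);
}

// Hypothetical stand-in for the client that talks to YARN's NodeManagers (not the Hadoop API).
interface NmClientSketch {
    void startContainer(String containerId);
}

// Simplified model of an application master that owns both clients.
final class TwoClientSketch {

    private final RmClientSketch resourceManagerClient;
    private final NmClientSketch nodeManagerClient;

    TwoClientSketch(RmClientSketch resourceManagerClient, NmClientSketch nodeManagerClient) {
        this.resourceManagerClient = resourceManagerClient;
        this.nodeManagerClient = nodeManagerClient;
    }

    // Step 1: ask YARN's ResourceManager for a container of the given size.
    void requestWorker(int memoryMb, int vcores) {
        resourceManagerClient.addContainerRequest(memoryMb, vcores);
    }

    // Step 2: once YARN reports allocated containers, ask the NodeManager to launch each one.
    void onContainersAllocated(List<String> containerIds) {
        for (String containerId : containerIds) {
            nodeManagerClient.startContainer(containerId);
        }
    }
}
```

Keeping the two clients behind separate interfaces mirrors the split in initialize: one long-lived channel for resource negotiation and another for per-container launch commands.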