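In https://www.cnblogs.com/dongxiao-yang/p/9403427.html we analyzed the code path for submitting a single Flink job to a YARN cluster. Starting with version 1.5, Flink reworked the deployment of the whole framework into a new process (see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077). This article walks through the new mode's flow on YARN, based on the Flink 1.6.1 source code.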

 

I. Initialization

The client-side initialization is largely the same as in https://www.cnblogs.com/dongxiao-yang/p/9403427.html. Because of new mode, it differs in the following places:

1 final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); now returns an object whose concrete class is YarnClusterDescriptor

2 CliFrontend.runProgram enters the if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {.. block, and the call path is

YarnClusterDescriptor.deployJobCluster->AbstractYarnClusterDescriptor.deployInternal->startAppMaster

At this point the entry class of the AM has become YarnJobClusterEntrypoint
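
The branch condition in runProgram can be reduced to a small predicate. The sketch below is a standalone illustration, not Flink code: the class DeployBranchSketch, the Branch enum, the method name choose, and the sample cluster id are all hypothetical. Only the three-part condition is taken from the snippet quoted above.

```java
// Hypothetical, simplified model of the branch condition in runProgram.
public class DeployBranchSketch {

    // Illustrative labels only; Flink itself has no such enum.
    enum Branch { DEPLOY_JOB_CLUSTER, OTHER_BRANCH }

    // Mirrors: if (isNewMode && clusterId == null && runOptions.getDetachedMode())
    static Branch choose(boolean isNewMode, String clusterId, boolean detachedMode) {
        if (isNewMode && clusterId == null && detachedMode) {
            // New mode, no existing cluster to attach to, detached submission:
            // this is the path that ends in deployJobCluster.
            return Branch.DEPLOY_JOB_CLUSTER;
        }
        // Every other combination falls through to branches not covered here.
        return Branch.OTHER_BRANCH;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, null, true));
        // "application_0001" is a made-up cluster id used only for the example.
        System.out.println(choose(true, "application_0001", true));
    }
}
```

All three conditions must hold at once: if a cluster id is already known, or the job is submitted in attached mode, this block is skipped.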

 

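II. YarnJobClusterEntrypoint

The main function of YarnJobClusterEntrypoint is the entry point of the whole AM process. At the end of the method it calls startCluster, defined in its grandparent class ClusterEntrypoint, which kicks off the startup of all cluster components.

The call chain is startCluster->runCluster->startClusterComponents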

protected void startClusterComponents(
			Configuration configuration,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry) throws Exception {
		synchronized (lock) {
			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				DispatcherGateway.class,
				DispatcherId::fromUuid,
				10,
				Time.milliseconds(50L));

			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				ResourceManagerGateway.class,
				ResourceManagerId::fromUuid,
				10,
				Time.milliseconds(50L));

			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

			webMonitorEndpoint = createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				transientBlobCache,
				rpcService.getExecutor(),
				new AkkaQueryServiceRetriever(actorSystem, timeout),
				highAvailabilityServices.getWebMonitorLeaderElectionService());

			LOG.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start();

			resourceManager = createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				metricRegistry,
				this,
				clusterInformation,
				webMonitorEndpoint.getRestBaseUrl());

			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());

			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

			dispatcher = createDispatcher(
				configuration,
				rpcService,
				highAvailabilityServices,
				resourceManager.getSelfGateway(ResourceManagerGateway.class),
				blobServer,
				heartbeatServices,
				jobManagerMetricGroup,
				metricRegistry.getMetricQueryServicePath(),
				archivedExecutionGraphStore,
				this,
				webMonitorEndpoint.getRestBaseUrl(),
				historyServerArchivist);

			LOG.debug("Starting ResourceManager.");
			resourceManager.start();
			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

			LOG.debug("Starting Dispatcher.");
			dispatcher.start();
			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
		}
	}

The code above shows that the AM contains two important new components: ResourceManager and Dispatcher
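
The order in which startClusterComponents creates and starts its components is easy to lose in the long parameter lists. The following sketch replays only that ordering. It is a toy model under stated assumptions: StartupOrderSketch and Component are hypothetical stand-ins, not Flink classes, and the leader retrieval services are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering in startClusterComponents; none of these types exist in Flink.
public class StartupOrderSketch {

    // Stand-in for a cluster component; it only records when it is created and started.
    static class Component {
        private final String name;
        private final List<String> log;

        Component(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("create " + name);
        }

        void start() {
            log.add("start " + name);
        }
    }

    static List<String> startClusterComponents() {
        List<String> log = new ArrayList<>();

        // The REST endpoint is created and started first; its base URL is
        // passed to the components created afterwards.
        Component restEndpoint = new Component("RestEndpoint", log);
        restEndpoint.start();

        // Both components are created before either one is started,
        // because the Dispatcher receives the ResourceManager's gateway.
        Component resourceManager = new Component("ResourceManager", log);
        Component dispatcher = new Component("Dispatcher", log);

        resourceManager.start();
        dispatcher.start();
        return log;
    }

    public static void main(String[] args) {
        startClusterComponents().forEach(System.out::println);
    }
}
```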

 

Under the FLIP-6 redesign, the new ResourceManager role is defined as follows:

The main tasks of the ResourceManager are

  • Acquire new TaskManager (or slots) by starting containers, or allocating them to a job

  • Giving failure notifications to JobManagers and TaskManagers

  • Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.

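In broad terms, the ResourceManager is responsible for negotiating resource requests with the YARN cluster and for allocating those resources to a given JobManager.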
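
The third task in the list, caching TaskManagers for reuse and releasing them after an idle period, can be pictured with a small bookkeeping structure. This is a minimal sketch under stated assumptions and not how Flink implements it: the class IdleTaskManagerCacheSketch, its method names, and the container ids are hypothetical, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "cache idle containers, release them after a timeout"; not Flink's implementation.
public class IdleTaskManagerCacheSketch {

    private final long idleTimeoutMillis;

    // containerId -> timestamp (ms) at which the container became idle, in insertion order
    private final Map<String, Long> idleSince = new LinkedHashMap<>();

    IdleTaskManagerCacheSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Called when a TaskManager (container) no longer runs any work.
    void markIdle(String containerId, long nowMillis) {
        idleSince.put(containerId, nowMillis);
    }

    // Called when a job needs the container again; returns true if it was still cached.
    boolean reuse(String containerId) {
        return idleSince.remove(containerId) != null;
    }

    // Removes and returns every container that has been idle for at least the timeout.
    List<String> releaseExpired(long nowMillis) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        IdleTaskManagerCacheSketch cache = new IdleTaskManagerCacheSketch(1000L);
        cache.markIdle("container_1", 0L);
        cache.markIdle("container_2", 500L);
        System.out.println(cache.reuse("container_1"));
        System.out.println(cache.releaseExpired(1500L));
    }
}
```

A container that is reused before the timeout never reaches releaseExpired, which is the point of caching: starting a fresh container on YARN is avoided whenever an idle one is still around.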


 


In YARN mode, the ResourceManager is implemented by YarnResourceManager. Its initialize method instantiates two clients, resourceManagerClient and nodeManagerClient. They wrap YARN's AMRMClientAsync and NMClient and are used to communicate with YARN's ResourceManager and NodeManager respectively.

    @Override
    protected void initialize() throws ResourceManagerException {
        try {
            resourceManagerClient = createAndStartResourceManagerClient(
                yarnConfig,
                yarnHeartbeatIntervalMillis,
                webInterfaceUrl);
        } catch (Exception e) {
            throw new ResourceManagerException("Could not start resource manager client.", e);
        }

        nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
    }