在上一篇我们看到jobmanager的启动类org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
来看一下StandaloneSessionClusterEntrypoint启动类的一些重要的方法:
--->>>
我们先看main方法:
ClusterEntrypoint是一个抽象类,
--->>>
调用它的方法:
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
//在这里启动了集群
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
final int returnCode;
if (throwable != null) {
returnCode = RUNTIME_FAILURE_RETURN_CODE;
} else {
returnCode = applicationStatus.processExitCode();
}
LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
System.exit(returnCode);
});
}
--->>>
启动集群的具体方法:
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
configureFileSystems(configuration);
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
//启动集群,传入配置文件参数
runCluster(configuration);
return null;
});
} catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
try {
// clean up any partial state
shutDownAsync(
ApplicationStatus.FAILED,
ExceptionUtils.stringifyException(strippedThrowable),
false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
strippedThrowable.addSuppressed(e);
}
throw new ClusterEntrypointException(
String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
strippedThrowable);
}
}
--->>>
我们看看 runCluster这个方法做了啥子:
private void runCluster(Configuration configuration) throws Exception {
synchronized (lock) {
//todo 初始化了一些ClusterEntrypoint.java中的一些服务,比如:HA,blob,heartbeat,metricRegistry这些
initializeServices(configuration);
//todo 将host信息写入配置
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
//todo 将工厂真正开启接口,其中包括了一些创建以及启动ResourceManager(有用于请求solt的RPC,初始化所有solt到resourceManager的soltManager的RPC(这个会在jobmanager接收到jobGraph后调用),TM心跳等),启动web服务
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
//todo 出错了,关闭方法
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
//todo 这是一般的关闭路径。如果是单独的更具体的关闭
//todo 已经触发,这将什么也不做
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
}
--->>>
具体的ResourceManager的初始化:
点进去:
这是一个接口:
DispatcherResourceManagerComponentFactory
我们看到抽象类
AbstractDispatcherResourceManagerComponentFactory 实现了这个接口
创建resourceManager 用于接收slot的请求:
我们发现它是一个接口:
来看类:
package org.apache.flink.runtime.resourcemanager;
这是一个枚举类:
public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
}
点进去看看发现这个类创建了一个继承了resourceManager 这个抽象类
Resourcemanager 这个抽象类:
它
继承ResourceIDRetrievable>
继承FencedRpcEndpoint<ResourceManagerId>
实现 ResourceManagerGateway, LeaderContender {
这个接口下的几个重要的RPC方法具体实现:
1)向resourceManager请求slot:
2)这个rpc想resourceManager发送包括像taskManagaer有多少可分配的solt,哪些已分配的solt,solt的状态等
--->>>
最后创建完毕之后调用启动方法:
调用的是rpcServer
--->>>
完事之后开始调度了:
--->>>
创建了一个Dispatcher调度对象
看下Dispatcher是用来干嘛的(StandaloneDispatcher都是调用了父类的初始化方法super()创建一个Dispatcher.java对象)
来看一下Dispatcher实现了什么接口(ResourceManager同理)
看它实现的接口:
里面有提交job:
ok,我们再返回去找到具体的实现方法:
点进去:
最后:
他实现了submitJob()接口用于启动一个RPC,接受参数可以看到接受到一个JobGraph,这就意味着这和job任务启动有关,后面随缘更新到job启动Graph转换会提到
回到前面的Dispatcher.start()将传入的rpcService启动起来了,等待接受来自Driver端提交上来的JobGraph差不多启动完成了
参考:https://www.cnblogs.com/ljygz/p/11405572.html