腾一部分笔记到csdn
1. 概述
如果不熟悉该协议,可以先看看 《Dubbo 使用指南 —— dubbo://》 ,简单了解即可。
相比本地暴露,远程暴露会多做如下几件事情:
-
启动通信服务器,绑定服务端口,提供远程调用。
-
向注册中心注册服务提供者,提供服务消费者从注册中心发现服务。
2. 远程暴露源码分析
在 #doExportUrlsFor1Protocol(protocolConfig, registryURLs) 方法中,涉及远程暴露服务的流程如下
|
ServiceConfig.java
->doExportUrlsFor1Protocol()
//有registryURLs
是
->循环registryURLs
->URL monitorUrl = loadMonitor(registryURL);//添加监控
->Invoker<?> invoker= proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));//创建Invoker,本地暴露有介绍
->DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //封装一下
->Exporter<?> exporter= protocol.export(wrapperInvoker); //暴露服务
Protocol$Adpative.java
-> extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry"); //获取到的是注册中心协议
-> extension.export(arg0)
-> ProtocolFilterWrapper.export // 和本地暴露一样,这里不细讲
-> ProtocolListenerWrapper.export// 和本地暴露一样,这里不细讲
RegistryProtocol.java
-> export()
-> final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 启动本地通讯服务
-> ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); //查看是否已经向注册中心注册
-> exporter等于null就创建,exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete),originInvoker);
-> Protocol$Adpative.export()
-> extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");
-> extension.export(arg0)
-> ProtocolFilterWrapper.export // 和本地暴露一样,这里不细讲
-> ProtocolListenerWrapper.export// 和本地暴露一样,这里不细讲
1.netty服务暴露的开始-------
DubboProtocol.java
-> export()
-> DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //创建dubbo的Exporter
-> exporterMap.put(key, exporter); // key等于${group}/${interface}:${version}:${port}
-> openServer(url); // 启动服务
-> createServer(url) //创建Service
-> server = Exchangers.bind(url, requestHandler);//exchaanger是一个信息交换层,注意这里面的requestHandler,就是处理网络请求的,是DubboProtocol实现。通过服务key到protocol的expoerts里面找expoert,里面有invoker
2.信息交换层exchanger开始 -> ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
HeaderExchanger.java
-> bind()
-> return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
3.网络传输层 transporter--------->Transporters.bind()
-> ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
Transporter$Adpative.java
-> extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
-> extension.bind(arg0, arg1)
NettyTransporter.java
-> bind()
-> new NettyServer(url, listener) //下面都是对应构造方法
AbstractPeer //this.url = url; this.handler = handler;
AbstractEndpoint//codec timeout=1000 connectTimeout=3000
AbstractServer //bindAddress accepts=0 idleTimeout=600000
4.打开断开,暴露netty服务-------------> doOpen()
-> 设置NioServerSocketChannelFactory boss worker的线程池 线程个数为3
-> 设置编解码 hander
-> bootstrap.bind(getBindAddress())
-> this.server=NettyServer
-> heartbeat=60000
-> heartbeatTimeout=180000
-> startHeatbeatTimer()//这是一个心跳定时器,采用了线程池,如果断开就心跳重连。
-> URL registryUrl = getRegistryUrl(originInvoker); //获取注册中心URL
-> final Registry registry = getRegistry(originInvoker); // 获取注册中心对象
-> final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //获取服务提供者URL
-> register(registryUrl, registedProviderUrl); // 向注册中心注册
-> final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); //创建监听配置规则URL
-> final OverrideListener overrideSubscribeListener = newOverrideListener(overrideSubscribeUrl, originInvoker); //创建监听器
-> registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //注册监听器
<-------—> returnnew DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); //返回一个Exporter
->exporters.add(exporter); //添加到exporters
否
// 暴露服务但不向注册中心暴露
->Invoker<?> invoker= proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
->DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
->Exporter<?> exporter= protocol.export(wrapperInvoker);
->exporters.add(exporter);
|
->Exporter<?> exporter= protocol.export(wrapperInvoker); //暴露服务
此处 Dubbo SPI 自适应的特性的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,invoker 传入后,根据 invoker.url 自动获得对应 Protocol 拓展实现为 ReigsterProtocol 。RegistryProtocol 会在其 #export(...) 方法中,使用服务提供者的 URL ( 即注册中心的 URL 的 export 参数值),再次调用 Protocol$Adaptive 获取到的是 DubboProtocol 对象,进行服务暴露。
|
RegistryProtocol#export(...) {
// 1. 启动本地服务器
DubboProtocol#export(...);
// 2. 向注册中心注册。
}
|
实际上,Protocol 有两个 Wrapper 拓展实现类: ProtocolFilterWrapper、ProtocolListenerWrapper 。所以,#export(...) 方法的调用顺序是:
-
-
-
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
-
=>
-
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
-
由于ProtocolFilterWrapper、ProtocolListenerWrapper均有判断如果是RegistryProtocol就不处理
-
-
-> URL registryUrl = getRegistryUrl(originInvoker); //获取注册中心URL
originInvoker里面的URL等于。 registry://注册中心ip: 端口/com.alibaba.dubbo.registry.RegistryService?register=zookee & export=服务URL,这里解析成
zookeeper://100.66.178.66:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0 &export=dubbo%3A%2F%2F100.66.151.177%3A20881%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo- provider%26bind.ip%3D100.66.151.177%26bind.port%3D20881%26default.delay%3D-1%26default.retries%3D0%26delay%3D1%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26logger%3Djcl%26methods%3DcallbackParam%2CsayHello%2Csave//%2Cupdate%2Csay03%2Cdelete%2Csay04%2Cdemo%2Csay01%2Csay02%2Cbye%2Chello02%2Csaves%2Chello01%2Chello%26pid%3D865%26qos.port%3D22222%26side%3Dpro//vider%26timestamp%3D1576056813900&logger=jcl&pid=865&qos.port=22222×tamp=1576056813884
该过程是我们在 3* - API 配置(二)之服务提供者的那张图的反向流程,即红线部分 :
-> server = Exchangers.bind(url, requestHandler);//exchaanger是一个信息交换层,注意这里面的requestHandler,就是处理网络请求的,是DubboProtocol实现。通过服务key到protocol的expoerts里面找expoert,里面有invoker
开启服务器,这里简述下 具体细节如果后面理解了再长篇解释。
requestHanlder是DubboProticol里面new出来里的,他有一个处理方法处理远程调用传过了的Invocation(所以通过远程传输的仅仅是Invocation对象序列化后)解析 从exportMap取出对应的invoker,来调用方法对应方法获取返回结果。
requestHanlder会被封在HeaderExchangeHandler->DecodeHandler了里,通过Exchangers-》HeaderExchanger-》Transporters-〉最终给到NettyTransporter,传递到Nettyserver。
当nettyServer收到信息就会交给requestHanlder处理,获取结果返回
这里有两个问题要解决Exchangers、Transporters作为两个门面模式对应Exchanger和Transporter,但到底这两个层有什么区别?Channel和server又有什么区别?
-> final Registry registry = getRegistry(originInvoker); // 获取注册中心对象
-> final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //获取服务提供者URL
-> register(registryUrl, registedProviderUrl); // 向注册中心注册
这几步在12 - 注册中心 Zookeeper中详细讲解
-> final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); //创建监听配置规则URL
-> final OverrideListener overrideSubscribeListener = newOverrideListener(overrideSubscribeUrl, originInvoker); //创建监听器
-> registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //注册监听器
订阅在11 - 注册中心中讲解