1.dubbo spi机制
以dubbo源码的dubbo-demo-provider为例说明
dubbo-demo-provider.xml内容如下
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol name="dubbo" port="20880" server="netty4" transporter="netty4" />
<bean />
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
</beans>
spring容器启动,会去解析自定义的标签,dubbo的标签定义都在DubboNamespaceHandler内,具体解析就不说了,这个看过spring源码的都知道,不知道也没关系,不影响理解dubbo,下面说下每个dubbo标签的功能
dubbo:application 创建ApplicationConfig,保存配置上的值到ApplicationConfig属性上。
<dubbo: registry> 创建RegistryConfig,保存配置上的值到RegistryConfig属性上。
dubbo:protocol 创建ProtocolConfig,保存配置上的值到ProtocolConfig属性上
dubbo:service 创建ServiceBean,保存配置上的值到ServiceBean属性上,该类和spring紧密联系,它实现了spring bean的初始化接口InitializingBean,销毁接口DisposableBean,感知接口ApplicationContextAware、BeanNameAware,监听器接口ApplicationListener,这里不讲spring的启动流程,看我其它spring启动流程图,就可以看出这些spring接口的功能。
这些都是bean,都是单例对象存在于spring IOC容器内,那么既然是bean,就有bean的初始化、创建了。
对于ServiceBean,由于实现了InitializingBean、ApplicationContextAware、BeanNameAware、ApplicationListener,那么在spring对该bean启动过程会有一些特殊操作,按照该bean的创建执行顺序分别是:
step1:ServiceBean.setBeanName(String) 设置bean名称,实现了BeanNameAware可以自定义名称,这里名称就是interface属性值,在DubboBeanDefinitionParser解析的时候获取的。
step2:ServiceBean.setApplicationContext(ApplicationContext),吧IOC容器对象保存到ServiceBean属性内,同时该方法也把监听器ServiceBean添加到IOC容器的监听器集合中。
step3:ServiceBean.afterPropertiesSet()初始化步骤,服务暴露在该方法内执行。
step4:ServiceBean.onApplicationEvent(ContextRefreshedEvent),spring IOC启动后执行监听器调用这里,如果dubbo服务没有被暴露,则暴露服务。
加载ServiceBean肯定是在创建ServiceBean之前进行的,该类有2个重要属性,分别是父类ServiceConfig中的
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
搞清楚这两个属性,那么对于dubbo的启动就比较容易明白了。
先看private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();属性在加载的时候赋值过程,这个执行还有点绕,当时看的时候不下画图了,结果被绕晕了,贴个该属性创建的流程图,跟着图看一圈代码,就很容易明白:
题外话,这个属性跟cglib的Enhancer.KEY_FACTORY属性很类似,都是这种写法,创建都是执行了一大堆复杂东西。
核心是一个创建自适应类,在com.alibaba.dubbo.common.extension.ExtensionLoader.createAdaptiveExtensionClassCode()内,
几个注解说明下
@SPI,用于注解在类、接口、枚举类上,作用是标记该接口是一个dubbo spi接口,即一个扩展点,运行时需要通过配置找到具体的实现类。比如Protocol就是个dubbo spi扩展点,用@SPI注解。
@Adaptive,自适应注解,标记在类、接口、枚举类、方法上,标记在类上,说明该类是个自适应类,标记在方法上,说明可以通过该方法,动态的从参数(比如URL)中来确定要使用哪个方法,这个是策略模式的升级版。比如Protocol的export、refer方法就被@Adaptive注解,生成的自适应类Protocol$Adaptive,根据url的protocol不同,使用不同的协议。
@Activate,自动激活注解,标记在类、接口、枚举类、方法上,使用在有多个扩展点实现,需要根据不同条件被激活的场景中,比如filter需要同时被激活多个。
ExtensionLoader类属性我加了注释,帮助阅读源码理解。
public class ExtensionLoader<T> {
private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
//全局变量,缓存已经创建的ExtensionLoader[XXXX]
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();//{interface com.alibaba.dubbo.common.extension.ExtensionFactory=com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.common.extension.ExtensionFactory], interface com.alibaba.dubbo.rpc.Protocol=com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]}
//全局变量,缓存创建的ExtensionLoader[XXXX].type文件内的clazz实例对象
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();//{class com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory=com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory@916eb0, class com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory=com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory@10dbc7a}
// ==============================
/*
* 构造器赋值,以该clazz作为名称的文件是要被加载的
*/
private final Class<?> type;
/*
* 构造器赋值
* 除了ExtensionLoader[ExtensionFactory]该属性是null,其他ExtensionLoader[XXXX]该属性是AdaptiveExtensionFactory
* 该属性的功能是用来在injectExtension(T instance)进行ioc注入(通过setter注入)
*/
private final ExtensionFactory objectFactory;//AdaptiveExtensionFactory,该属性就是用于给创建的XXX$Adaptive注入属性
/*
* 缓存加载/META-INF/dubbo/internal/、/META-INF/dubbo/、/META-INF/services/目录下的ExtensionLoader.type文件内
* 未被@Adaptive注解且构造器的参数非ExtensionFactory.type,则把clazz=>name保存该属性。其中clazz和name就是ExtensionLoader.type文件内的配置,
* 比如class SpringExtensionFactory=spring
* 缓存普通扩展类clazz
*/
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
/*
* 缓存加载/META-INF/dubbo/internal/、/META-INF/dubbo/、/META-INF/services/目录下的ExtensionLoader.type文件内的
* 未被@Adaptive注解且构造器的参数非ExtensionFactory.type的clazz集合
* 缓存普通扩展类clazz和被@Active注解的扩展类,不包括自适应类和warpper类
*/
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();//{registry=class com.alibaba.dubbo.registry.integration.RegistryProtocol, injvm=class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol, dubbo=class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol, mock=class com.alibaba.dubbo.rpc.support.MockProtocol}
/*
* 缓存加载/META-INF/dubbo/internal/、/META-INF/dubbo/、/META-INF/services/目录下的ExtensionLoader.type文件内
* 未被@Adaptive注解且clazz的构造器参数非ExtensionLoader.type的clazz(dubbo称为wrapper类)且clazz被@Activate注解的name和Activate.value
*/
private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();//如果被加载的类被@Activate注解,则存放其name和Activate.value
/*
* 保存的是ExtensionLoader.type文件内的name和clazz实例对象,在createExtension()内创建
*/
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();//{spring=com.alibaba.dubbo.common.utils.Holder@1282ed8, spi=com.alibaba.dubbo.common.utils.Holder@f57048}
/*
* 自适应对象,
* 如果ExtensionLoader.type文件内的类有被@Adaptive注解,则保存的是该类,否则通常由createAdaptiveExtension()创建
* 比如ExtensionLoader[ExtensionFactory]该值是AdaptiveExtensionFactory
* 比如Protocol是Protocol$Adaptive,是由createAdaptiveExtension()创建
* 缓存实例化后的自适应扩展类,只能有一个
*/
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();//value=AdaptiveExtensionFactory
/*
* 缓存/META-INF/dubbo/internal/、/META-INF/dubbo/、/META-INF/services/目录下的ExtensionLoader.type文件内被@Adaptive注解的clazz,
* ExtensionLoader.type文件内最多只能有一个被@Adaptive注解的clazz,超过一个则抛异常
* 如果没有,则执行createAdaptiveExtensionClass()创建
*/
private volatile Class<?> cachedAdaptiveClass = null;//如果要加载的类被Adaptive注解,则赋值为该class。对于ExtensionFactory来说,其三个子类AdaptiveExtensionFactory、SpiExtensionFactory、SpringExtensionFactory只有AdaptiveExtensionFactory被注解了,因此就是AdaptiveExtensionFactory
/*
* 存放的是@SPI注解的value值,比如com.alibaba.dubbo.rpc.Protocol被@SPI("dubbo")注解,则该值就是dubbo
* 在ExtensionLoader.loadExtensionClasses()赋值
*/
private String cachedDefaultName;
private volatile Throwable createAdaptiveInstanceError;
/*
* 缓存/META-INF/dubbo/internal/、/META-INF/dubbo/、/META-INF/services/目录下的ExtensionLoader.type文件内
* 未被@Adaptive注解且clazz的构造器参数是ExtensionLoader.type的clazz,就是包装类
* 缓存wrapper类
*/
private Set<Class<?>> cachedWrapperClasses;
}
核心方法有四个,下面一个个说明
1.1.ExtensionLoader.getExtensionLoader(Class)
创建接口clazz对应的ExtensionLoader对象。该方法功能就是从全局缓存EXTENSION_LOADERS获取一个ExtensionLoader[type]对象,不存在则创建ExtensionLoader[type]对象放入全局缓存EXTENSION_LOADERS,最后返回ExtensionLoader[type]对象。
/*
* 传入参数是clazz,即该clazz是个文件存放在META-INF/dubbo/internal/、META-INF/dubbo/、META-INF/services/,跟jdk spi是类似的
* 从全局缓存EXTENSION_LOADERS获取type对应的ExtensionLoader[type]对象,不存在则创建
*/
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {//非接口,抛出异常
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {//class未被@SPI注解,抛出异常,只有被@SPI注解说明才是一个dubbo扩展接口
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);//从全局缓存获取
if (loader == null) {//不存在则创建
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));//创建ExtensionLoader[type]对象,属性type存放的就是扩展接口clazz
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
1.2.ExtensionLoader.getAdaptiveExtension()
功能:获取ExtensionLoader.type接口对应的自适应对象,不存在则创建
//获取接口clazz(即ExtensionLoader.type)的自适应扩展类对象,如果不存在,则创建
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();//从当前对象查找
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();//创建自适应对象
cachedAdaptiveInstance.set(instance);//保存到当前对象
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;//返回自适应对象
}
/*
* 创建自适应扩展并注入属性
*/
@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
try {
/*
* step1 getAdaptiveExtensionClass 获取自适应对象的clazz
* step2 通过clazz.newInstance()反射创建自适应对象
* step3 进行setter注入属性,这个是dubbo spi ioc功能
*/
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
/*
* 获取自适应clazz,形式是XXX$Adaptive
*/
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();//如果缓存不存在则先加载
if (cachedAdaptiveClass != null) {//说明ExtensionFactory.type文件内的clazz有被@Adaptive注解的类
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();//创建自适应对象保存到this.cachedAdaptiveClass
}
/*
* 生成自适应类的java代码,然后使用javaassit编译自适应类java代码为class
*/
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();//生成的是XXX$Adaptive这个java文件内容
//如果需要查看生成的代码,则把变量code保存到文件内即可
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();//AdaptiveCompiler
return compiler.compile(code, classLoader);//默认使用JavassistCompiler编译java代码
}
/*
* 生成ExtensionLoader.type接口的实现类,并对接口内被@Adaptive注解的方法生成具体实现,
* 未被@Adaptive注解的方法抛出UnsupportedOperationException,
* 生成的类名是类名是ExtensionLoader.type$Adaptive
*/
private String createAdaptiveExtensionClassCode() {
//代码较多,省略。
//功能就是生成ExtensionLoader.type接口实现类的java代码,最后返的是个java代码对象
}
上步代码展示省略了编译java代码的步骤,这个编译我也没完全看懂,通常也不需要看懂的。
1.3.ExtensionLoader.getExtension(String)
获取name对应的扩展类,比如对于com.alibaba.dubbo.rpc.Protocol,那么name=dubbo的获取的是DubboProtocol,但是由于有wrapper类,因此获取到的对象是ProtocolFilterWrapper,结构是
为什么要使用wrapper呢?wrapper是对目标对象进行了一层装饰,为了增强目标对象,但是又不对目标对象增加额外功能,保持目标对象的功能共用性和单一性。
/*
* 功能:获取普通扩展类对象XXX$Adaptive
* 在ExtensionLoader[XXXX].getExtension(String name)获取ExtensionLoader.type文件内name对应的clazz实例(即自适应类)
* 参数name是url上的属性值(或者是dubbo默认值)
*/
@SuppressWarnings("unchecked")
public T getExtension(String name) {//该方法功能就是获取ExtensionLoader.type文件内名称是name的对象实例,并放入缓存
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);//从缓存获取对应的clazz
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
/*
* 创建ExtensionLoader.type文件内name对应的clazz实例
* dubbo扩展类的4个特性:自动包装,自动加载,自适应,自动激活
* 这里是自动包装,就是目标扩展类自动会用wrapper类包装下
*/
@SuppressWarnings("unchecked")
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);//缓存获取
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());//创建clzz实例
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);//setter注入
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
//spi的wrapper对象就是该类只有一个构造器且构造器的参数是ExtensionLoader.type,比如ProtocolFilterWrapper、ProtocolListenerWrapper就是DubboProtocol的wrapper类
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {//使用wrapper类装饰,这样作用是为了增强功能,而且是类功能分开
for (Class<?> wrapperClass : wrapperClasses) {
/*
* step1: wrapper类使用instance实例化
* step2: wrapper类属性注入
*/
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));//wrapper实例化并注入
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
1.4.ExtensionLoader.getActivateExtension(URL, String[], String)
/*
* 参数values是url上属性值集合,比如filter属性,配置了多个filter,那么就是filter1、filter2、filter3
* 参数group是url上属性group的value值
* 功能:获取所有自动激活扩展点
*/
public List<T> getActivateExtension(URL url, String[] values, String group) {//遍历cachedActivates集合,返回@Activate.value匹配group名称的实例集合
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
//step0:加入默认的激活
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {//-default表示禁用所有默认
//step1:检查缓存,如缓存没有,则加载到缓存
getExtensionClasses();
/*
* step2:遍历ExtensionLoader.type文件内所有被@Activate注解的类,比如com.alibaba.dubbo.rpc.Filter文件内所有被@Activate注解的类
* 根据传入的url匹配条件(匹配group name),得到所有符合激活条件的扩展类实现。然后根据@Activate中配置的before after order排序
*/
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {//
String name = entry.getKey();//获取名称,比如com.alibaba.dubbo.rpc.Filter 文件内future
Activate activate = entry.getValue();//获取类上的Activate注解
if (isMatchGroup(group, activate.group())) {//group匹配,参数group为null 或者 被@Activate.group值集合包含 则匹配
T ext = getExtension(name);//获取名称为name对应的扩展点
if (!names.contains(name)//默认激活是不在url配置
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)//-表示禁止
&& isActive(activate, url)) {//处于激活状态
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);//按照@Activate中配置的before after order排序
}
/*
* step3:遍历所有用户自定义扩展类名称,根据用户url配置的顺序,调整扩展点激活顺序(按照用户在url中配置的顺序)
*/
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)//-表示禁用
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
//这个理解稍微麻烦些,比如test://localhost/text?ext=order1,default,order2,那么扩展点ext的激活顺序是按照order1->default->order2,这个default是表示前面的默认激活扩展点
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);//把usrs添加到exts的头部,即default扩展点的前面
usrs.clear();
}
} else {
T ext = getExtension(name);//获取名称name对应的扩展
usrs.add(ext);
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;//最后返回的激活扩展点集合是顺序先默认激活+用户自定义激活
}
那么判断是处于激活状态呢?如何判断的在isActive方法内
/*
* 分2种情况
* case1:被@Activate注解的扩展点的value集合是空,返回true,说明处于自动激活状态
* case2:@Activate注解的扩展点的value集合和url上配置的激活key每个比较,如果相同 或者 url上的k是以Activate上的.key结尾 且url上的k的value非空 说明匹配
*/
private boolean isActive(Activate activate, URL url) {
String[] keys = activate.value();
if (keys.length == 0) {
return true;
}
//双层for循环,如果url上的k==Activate上的key || url上的k是以Activate上的.key结尾 且url上的k的value非空 说明匹配
for (String key : keys) {
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {//遍历url上的参数,
String k = entry.getKey();
String v = entry.getValue();
if ((k.equals(key) || k.endsWith("." + key))//k==key || k是以.key结尾,说明匹配
&& ConfigUtils.isNotEmpty(v)) {
return true;
}
}
}
return false;
}
下面给出个自己分析的时候,生成的自适应类,供参考
链接:https://share.weiyun.com/5qDR7qt
2.provider启动流程
按照自己习惯,先贴总体流程图
图片备份地址 https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220143631.jpg
总结如下:
invoker对象本质就是对目标对象的代理,在dubbo中对通过代理对目标对象增强了功能,增加了filter,那么最终暴露的是exporter对象,需要把invoker对象包装为Exporter对象,Exporter对象也通过了装饰增加了功能,最终暴露到zk的服务对象的结构是这样的:
总结dubbo启动流程大体是这样的,生成registry协议的url,生成dubbo协议的url,把dubbo url作为属性保存到registry协议url上,属性的export,接着生成目标对象的代理对象invoker,给该invoker增强(增加filter处理链),把invoker包装为protocol协议的export(默认是DubboExporter),启动netty监听端口,增加netty channelHandler处理链,在zk上创建服务节点,接着订阅本节点,zk创建configurators节点,zk watch机制监听该节点变化,有变化,则重新暴露服务,最后把export对象暴露(缓存起来),exporter对象也是经过了多层装饰。
启动过程大量用到了spi机制,根据url上的参数配置spi找到对应的具体实现对象,完全进行了解耦,理解了这个,很容易对自己的项目进行特定扩展,比如最常见增加filter。
3.consumer启动流程
consumer的配置如下
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
don't set it same as provider -->
<dubbo:application name="demo-consumer"/>
<!-- use multicast registry center to discover service -->
<!-- <dubbo:registry address="multicast://224.5.6.7:1234"/> -->
<dubbo:registry address="zookeeper://47.98.54.117:2181" />
<!-- generate proxy for the remote service, then demoService can be used in the same way as the
local regular interface -->
<dubbo:reference />
</beans>
dubbo:reference对应的是ReferenceBean,同ServiceBean一样,也是实现了spring提供的扩展接口,核心入口是ReferenceConfig.createProxy(Map<String,String>),看流程图:
图片备份地址 https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220143748.jpg
消费端消费provider服务,但是实际都是存在多个provider的,那就为每个provider生成一个对应的invoker对象(默认dubboinvoker),但是最终得表现为一个invoker,那就包装下,把invoker集合包装为一个对象就是RegistryDirectory(实际保存在urlInvokerMap属性上),接着又包装为FailoverClusterInvoker(集群容错)、MockClusterInvoker,最终表现为对外的invoker对象就是MockClusterInvoker,那么该invoker对象结构就是:
MockClusterInvoker
FailoverClusterInvoker
RegistryDirectory.methodInvokerMap集合内存放是proviver的Invoker,表示provider集群
RegistryDirectory$InvokerDelegate
ProtocolFilterWrapper$1[filterchain: ConsumerContextFilter -> FutureFilter -> MonitorFilter]
DubboInvoker
包装为了invoker,但是类型不是所需要的目标类型com.alibaba.dubbo.demo.DemoService,那就对invoker生成代理类,为代理类增加目标接口,这样生成的proxy类实现了目标接口又包装了invoker对象,然后调用的时候proxy类直接调用invoker对象,这个和jdk的动态代理模式思想是一样的,只是使用的javaassit生成的代理类。
为什么要生成代理类呢?因为要远程调用,要连接服务端,涉及到编解码,这些步骤都是公共部分,那么就包装起来,因此invoker类就是包装了公共部分功能。
消费端订阅了zk上的providers节点、configurators、routers节点,在这些节点发发生变化的时候,zk向消费端发送通知,然后消费端从zk拉取到节点变化内容,执行对应的变化,比如重新生成invoker对象。 这里的执行流程是
4.dubbo调用流程
以dubbo源码的例子来说,String hello = demoService.sayHello("world");,该步骤是实际的调用,表面看就是IOC容器内的bean执行方法,但是方法的具体实现是在provider的,使用的是netty,下图是dubbo consume调provider总体流程图
图片备份地址 https://gitee.com/yulewo123/mdpicture/raw/master/docs/20201220143849.jpg
大体功能是:consume发起请求,把要执行的目标方法、参数包装为RpcInvocation,根据集群策略查找到consume启动的时候保存的所有该方法对应的invoker集合,然后经过router筛选后,返回符合的invoker集合,再通过负载均衡策略获取一个invoker,然后执行该invoker的调用,invoker是个装饰对象,执行其内部被装饰的一个个invoker,那么经过invoker和filter处理,最终调用到netty client发送数据,创建com.alibaba.dubbo.remoting.exchange.Request,该对象包装了RpcInvocation,通过Netty进行网络发送,经过netty ChannelHandler处理链的编码写入到缓冲区,刷新缓冲区,从而把数据发送到provider发送出去,因为netty是异步非阻塞的(发送线程和业务线程不是同一个),如果是同步发送方式,ResponseFuture进行同步等待结果,异步的话不需要等待结果。
provider端netty io线程不停轮询selector,如果有可读事件,则触发ChannelHandler链的channelRead()操作,那么由netty读取到consume请求,进行解码,接收处理,根据url获取到provider暴露的服务,从暴露的服务获取到对应的invoker对象,那么执行invoker对象,invoker对象是个装饰链对象,一层层执行内部Invoker,再经过filter链处理,最终执行到目标方法。然后把目标方法进行编码通过netty传输返回到consume。
FIXME 这个图应该用管道图更加清晰,后续改为管道图。