Dubbo服务暴露过程
代理暴露操作都是围绕着invoker展开的,invoker在不同状态的转化
Protocol维护了invoker的生命周期
ServiceImpl
服务提供方实现ServiceAPI的业务细节
获取触发器 - getInvoker
将实现类实例包装成invoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// ServiceImpl的包装类
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
Wrapper - 动态的ServiceImpl的处理类
通过(javassist实现)动态生成一个继承Wrapper的ServiceImpl的动态代理类
相当于是ServiceImpl特制的处理类,这个处理类的实例会缓存在Wrapper声明的一个map中,键为ServiceImpl的Class对象
private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>();
用Wrapper的三个代理方法来操作ServiceImpl实例的属性和方法,而不用显式的调用(控制反转)
/**
* 设置ServiceImpl实例的属性
* @param o serviceImpl instance
* @param n property name
* @param v property value
*/
public void setPropertyValue(Object o, String n, Object v) {
}
/**
* 获取ServiceImpl实例的属性
* @param o serviceImpl instance
* @param n property name
*/
public Object getPropertyValue(Object o, String n) {
}
/**
* 调用ServiceImpl实例的方法
* @param o serviceImpl instance
* @param n method name
* @param p 方法参数类型?
* @param v 方法参数列表
*/
public Object invokeMethod(Object o, String n, Class[] p, Object[] v){
}
ServiceImpl Invoker - 本地业务实现触发器
Invoker的作用
-
保存和维护实现类(proxy)、服务接口(type)和服务地址(url)的对应关系
-
doinvoke不用场景下的不同的实现 SPI指定的默认实现是javassist
-
getInvoke -
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker的子类本地ServiceImpl实例的代理
-
refer -
org.apache.dubbo.rpc.protocol.AbstractInvoker的子类远程服务的本地代理
-
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
private final T proxy;
private final Class<T> type;
// URL在本地的方法中不被用到,会在暴露服务时用到
private final URL url;
// 属性构造器 略
@Override
public Result invoke(Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
try {
Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
if (RpcUtils.isFutureReturnType(invocation)) {
return new AsyncRpcResult((CompletableFuture<Object>) obj);
} else if (rpcContext.isAsyncStarted()) {
return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
} else {
return new RpcResult(obj);
}
} catch (InvocationTargetException e) {
if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}
暴露本地服务 - export
Protocol模块将invoker暴露成网络服务
通过远程协议将服务接口的本地业务实现暴露成网络服务,默认实现协议是dubbo,还可以选择hessian、rest、http、rmi、thrift等
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
step 1:将Invoker对象转化成Exporter
Exporter是对本地实现代理Invoker的封装(要与远端代理invoker区分开)
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<>();
// balabala
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 初始化exporterMap 将invoker封装成Exporter 注册到集合中
String key = serviceKey(url);// key是暴露的网络资源地址
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// balabala
}
DubboExporter
继承AbstractExporter,AbstractExporter实现的功能是对Invoker对象操作的封装
DubboExporter是专用于DubboProtcol:作用是通过key和exporterMap来维护(指代)protocol缓存中的一个exporter
step 2:暴露网络服务
//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<>();
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
// 空值警告
}
} else {
// 注册到服务列表
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 检查serverMap缓存,通过createServer(url)开启网络服务
openServer(url);
optimizeSerialization(url);
}
step 3:处理远端调用
初始化好的ExchangeServer和Exporter是如何调用的?
ExchangeHandlerAdapter 适配处理器
定义在DubboProtocol中的私有属性requestHandler
创建服务时,会将这个handler绑定到这个server上,提供回调
server = Exchangers.bind(url, requestHandler);
对于服务端,当监听到远端调用请求时,会回调requestHandler的reply方法
Invocation中包含服务调用地址信息和服务方法参数
-
根据invocation中的服务调用地址信息找到对应的实现类代理invoker
-
invoker根据invocation中的服务方法参数,执行实现逻辑,得到执行结果Result
-
将result保证成CompletableFuture返回给server,server将执行结果返回给客户端
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
/**
* 服务端应答
* 处理客户端请求数据(Invocation),触发本地方法实现,并异步返回
* @see HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
*/
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 根据调用信息,获取注册表中对应的exporter(getInvoker为宿主DubboProtocol中的实现)
Invoker<?> invoker = getInvoker(channel, inv);
// 检查调用请求的方法 在本地注册表中是否存在 balabala
// 设置RpcContext(线程本地存储)balabala
// 触发本地服务实现 返回方法执行结果 Result是对结果的封装
Result result = invoker.invoke(inv);
// 包装返回值 balabala
}
// 异常处理 balabala
}
// 其他实现 balabala
}
服务暴露实现
URL
包含了服务暴露的所有信息
ExchangeServer
对网络通信服务端操作的封装(包装Server对象,如NettyServer),还有对应的ExchangeClient
通过一顿设计模式的操作,由
HeaderExchanger调用NettyTransporter创建一个NettyServer
Exchanger
负责数据交换和网络通信的组件,封装里数据传输层Transporter
相关设计模式
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
Transporter
创建服务,远程数据传输的实现(默认使用netty)
public class NettyTransporter implements Transporter {
public static final String NAME = "netty3";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}