代理暴露操作都是围绕着invoker展开的,invoker在不同状态的转化

Protocol维护了invoker的生命周期

Dubbo服务暴露过程
下图引用地址
Dubbo服务暴露过程

ServiceImpl

服务提供方实现ServiceAPI的业务细节

获取触发器 - getInvoker

将实现类实例包装成invoker

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // ServiceImpl的包装类
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Wrapper - 动态的ServiceImpl的处理类

通过(javassist实现)动态生成一个继承Wrapper的ServiceImpl的动态代理类

相当于是ServiceImpl特制的处理类,这个处理类的实例会缓存在Wrapper声明的一个map中,键为ServiceImpl的Class对象

private static final Map<Class<?>, Wrapper> WRAPPER_MAP = new ConcurrentHashMap<Class<?>, Wrapper>(); 

用Wrapper的三个代理方法来操作ServiceImpl实例的属性和方法,而不用显式的调用(控制反转)

/**
  * 设置ServiceImpl实例的属性
  * @param o serviceImpl instance
  * @param n property name
  * @param v property value
  */
public void setPropertyValue(Object o, String n, Object v) {
    
}

/**
  * 获取ServiceImpl实例的属性
  * @param o serviceImpl instance
  * @param n property name
  */
public Object getPropertyValue(Object o, String n) {
    
}
/**
  * 调用ServiceImpl实例的方法
  * @param o serviceImpl instance
  * @param n method name
  * @param p 方法参数类型?
  * @param v 方法参数列表
  */
public Object invokeMethod(Object o, String n, Class[] p, Object[] v){
    
}

ServiceImpl Invoker - 本地业务实现触发器

Invoker的作用

  • 保存和维护实现类(proxy)、服务接口(type)和服务地址(url)的对应关系

  • doinvoke不用场景下的不同的实现 SPI指定的默认实现是javassist

    • getInvoke - org.apache.dubbo.rpc.proxy.AbstractProxyInvoker的子类

      本地ServiceImpl实例的代理

    • refer - org.apache.dubbo.rpc.protocol.AbstractInvoker的子类

      远程服务的本地代理

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    private final T proxy;

    private final Class<T> type;

    // URL在本地的方法中不被用到,会在暴露服务时用到
    private final URL url;

	// 属性构造器 略
    
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        try {
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            if (RpcUtils.isFutureReturnType(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            } else if (rpcContext.isAsyncStarted()) { 
                return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
            } else {
                return new RpcResult(obj);
            }
        } catch (InvocationTargetException e) {
            if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

暴露本地服务 - export

Protocol模块将invoker暴露成网络服务

通过远程协议将服务接口的本地业务实现暴露成网络服务,默认实现协议是dubbo,还可以选择hessian、rest、http、rmi、thrift等

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

step 1:将Invoker对象转化成Exporter

Exporter是对本地实现代理Invoker的封装(要与远端代理invoker区分开)

protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<>();
// balabala
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 初始化exporterMap 将invoker封装成Exporter 注册到集合中
    String key = serviceKey(url);// key是暴露的网络资源地址
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    // balabala
}

DubboExporter

继承AbstractExporterAbstractExporter实现的功能是对Invoker对象操作的封装

DubboExporter是专用于DubboProtcol:作用是通过key和exporterMap来维护(指代)protocol缓存中的一个exporter

step 2:暴露网络服务

//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<>();

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            // 空值警告
            }
        } else {
        	// 注册到服务列表
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
	// 检查serverMap缓存,通过createServer(url)开启网络服务
    openServer(url);
    optimizeSerialization(url);    
}

step 3:处理远端调用

初始化好的ExchangeServer和Exporter是如何调用的?

ExchangeHandlerAdapter 适配处理器

定义在DubboProtocol中的私有属性requestHandler

创建服务时,会将这个handler绑定到这个server上,提供回调

server = Exchangers.bind(url, requestHandler);

对于服务端,当监听到远端调用请求时,会回调requestHandler的reply方法

Invocation中包含服务调用地址信息和服务方法参数

  • 根据invocation中的服务调用地址信息找到对应的实现类代理invoker

  • invoker根据invocation中的服务方法参数,执行实现逻辑,得到执行结果Result

  • 将result保证成CompletableFuture返回给server,server将执行结果返回给客户端

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    /**
     * 服务端应答
     * 处理客户端请求数据(Invocation),触发本地方法实现,并异步返回
     * @see HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
     */
    @Override
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            // 根据调用信息,获取注册表中对应的exporter(getInvoker为宿主DubboProtocol中的实现)
            Invoker<?> invoker = getInvoker(channel, inv);
			// 检查调用请求的方法 在本地注册表中是否存在 balabala
            
            // 设置RpcContext(线程本地存储)balabala
            
            // 触发本地服务实现 返回方法执行结果 Result是对结果的封装
            Result result = invoker.invoke(inv);
			// 包装返回值 balabala
        }
        // 异常处理 balabala
    }
    // 其他实现 balabala
}

服务暴露实现

URL

包含了服务暴露的所有信息

ExchangeServer

对网络通信服务端操作的封装(包装Server对象,如NettyServer),还有对应的ExchangeClient

通过一顿设计模式的操作,由HeaderExchanger调用NettyTransporter创建一个NettyServer

Exchanger

负责数据交换和网络通信的组件,封装里数据传输层Transporter

相关设计模式

http://en.wikipedia.org/wiki/Message_Exchange_Pattern

http://en.wikipedia.org/wiki/Request-response

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

Transporter

创建服务,远程数据传输的实现(默认使用netty)

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty3";
    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

相关文章: