先从客户端开始

从FailoverClusterInvoker#doInvoke 得Result result = invoker.invoke(invocation);

会进入InvokerWrapper#invoke

接下去会进入一个链式结构,这是一种责任连模式得体现

ProtocolFilterWrapper#buildInvokerChain
方法中有两个参数,这两个参数记录这当前过滤器和下一个
final Filter filter = filters.get(i);
final Invoker<T> next = last;

dubbo源码解析(10)dubbo网络通信

这里截取consumer端的filter,MyDubboInvoker是自己定义的一个fitler,链式调用一直调用到

AbstractInvoker#invoke
RpcInvocation invocation = (RpcInvocation) inv;这里会封装调用参数

由于这时候的invoke是一个DubboInvoker,因为在服务引用的时候

采用DubboProtocol引用的时候回new一个DubboInvoker

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

我们进入DubboInvoker#doInvoker

if (isOneway) {
   boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    currentClient.send(inv, isSent);
    RpcContext.getContext().setFuture(null);
    return new RpcResult();
} else if (isAsync) {
   ResponseFuture future = currentClient.request(inv, timeout) ;
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
} else {
   RpcContext.getContext().setFuture(null);
    return (Result) currentClient.request(inv, timeout).get();
}

这里有三种调用模式,第一种是异步有返回值的调用方式,第二种是异步无返回值的调用方式,第三种是同步调用,这个后面说,我们先看同步调用

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        channel.send(req);
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

接下去的调用链

HeaderExchangeChannel#request----->AbstractPeer#send--->NettyChannel#send.

ChannelFuture future = channel.write(message);

message中封装了一些入参信息

--------------------------------------------------

以上是客户端的整体调用链,接下去是服务端接收请求

从NettyHandler#messageReceived方法开始

跟踪到AllChannelHandler#received

 ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

这里会获取一个线程池,

线程池中runnable方法

public void run() {
    switch (state) {
        。。。。。
        case RECEIVED:
         。。。。。
                handler.received(channel, message);

。。。。。

handler=DecoderHandler,这关系到编解码的问题

DecoderHandler#received

handler.received(channel, message);

handler=HeaderExchangeHandler

HeaderExchangeHandler#received

Response response = handleRequest(exchangeChannel, request);

这一句话是关键

handleRequest中进入

Object result = handler.reply(channel, msg);

此时的handler=DubboProtocol

if (message instanceof Invocation) {
    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    //如果是callback 需要处理高版本调用低版本的问题
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || methodsStr.indexOf(",") == -1){
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods){
                if (inv.getMethodName().equals(method)){
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod){
            logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    return invoker.invoke(inv);

首先获取一个将message封装成Invocation,然后获取一个invoker

.......
DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
if (exporter == null) {
    throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
} else {
    return exporter.getInvoker();
}

z这里看到一个exporterMap,这个在之前服务发布的时候提过,可以查看服务发布那一篇

这里拿到一个DubboExporter,然后从DubboExporter中获取一个invoker

此时的invoker也是包含调用链fitler的,和客户端一样

dubbo源码解析(10)dubbo网络通信

最终到达AbstractProxyInvoker#invoke方法

 

return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));

执行到JavassistProxyFactory的这句话

return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

这是一个反射调用,由于被调用的方法是一个代理类,所以这里就直接回调用到具体方法

回到HeaderExchangeHeadler#received方法

channel.send(response);

这里拿到刚刚返回的response重新发送会客户端,具体过程和客户端发送的原理一模一样

--------------------------------------

接下去看客户端接收服务端返回的结果

大部分过程都一样,区别在HeaderExchangerHeadler#received

由于message属于response,所以走这个分支
else if (message instanceof Response) {
    handleResponse(channel, (Response) message);
} 进入
DefaultFuture.received(channel, response);

这里做了一些动作,是关于异步变同步的问题,这一部分我们下一篇说明

--------------------

本节完

 

相关文章: