先从客户端开始
从FailoverClusterInvoker#doInvoke 得Result result = invoker.invoke(invocation);
会进入InvokerWrapper#invoke
接下去会进入一个链式结构,这是一种责任连模式得体现
ProtocolFilterWrapper#buildInvokerChain
方法中有两个参数,这两个参数记录这当前过滤器和下一个 final Filter filter = filters.get(i); final Invoker<T> next = last;
这里截取consumer端的filter,MyDubboInvoker是自己定义的一个fitler,链式调用一直调用到
AbstractInvoker#invoke
RpcInvocation invocation = (RpcInvocation) inv;这里会封装调用参数
由于这时候的invoke是一个DubboInvoker,因为在服务引用的时候
采用DubboProtocol引用的时候回new一个DubboInvoker
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
我们进入DubboInvoker#doInvoker
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
这里有三种调用模式,第一种是异步有返回值的调用方式,第二种是异步无返回值的调用方式,第三种是同步调用,这个后面说,我们先看同步调用
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
接下去的调用链
HeaderExchangeChannel#request----->AbstractPeer#send--->NettyChannel#send.
ChannelFuture future = channel.write(message);
message中封装了一些入参信息
--------------------------------------------------
以上是客户端的整体调用链,接下去是服务端接收请求
从NettyHandler#messageReceived方法开始
跟踪到AllChannelHandler#received
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
这里会获取一个线程池,
线程池中runnable方法
public void run() {
switch (state) {
。。。。。
case RECEIVED:
。。。。。
handler.received(channel, message);
。。。。。
handler=DecoderHandler,这关系到编解码的问题
DecoderHandler#received
handler.received(channel, message);
handler=HeaderExchangeHandler
HeaderExchangeHandler#received
Response response = handleRequest(exchangeChannel, request);
这一句话是关键
handleRequest中进入
Object result = handler.reply(channel, msg);
此时的handler=DubboProtocol
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
首先获取一个将message封装成Invocation,然后获取一个invoker
.......
DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
} else {
return exporter.getInvoker();
}
z这里看到一个exporterMap,这个在之前服务发布的时候提过,可以查看服务发布那一篇
这里拿到一个DubboExporter,然后从DubboExporter中获取一个invoker
此时的invoker也是包含调用链fitler的,和客户端一样
最终到达AbstractProxyInvoker#invoke方法
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
执行到JavassistProxyFactory的这句话
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
这是一个反射调用,由于被调用的方法是一个代理类,所以这里就直接回调用到具体方法
回到HeaderExchangeHeadler#received方法
channel.send(response);
这里拿到刚刚返回的response重新发送会客户端,具体过程和客户端发送的原理一模一样
--------------------------------------
接下去看客户端接收服务端返回的结果
大部分过程都一样,区别在HeaderExchangerHeadler#received
由于message属于response,所以走这个分支
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} 进入
DefaultFuture.received(channel, response);
这里做了一些动作,是关于异步变同步的问题,这一部分我们下一篇说明
--------------------
本节完