分析对象: 
hadoop版本:hadoop 0.20.203.0

必备技术点: 
1. 动态代理(参考 :http://www.cnblogs.com/sh425/p/6893662.html )
2. Java NIO(参考 :http://www.cnblogs.com/sh425/p/6893501.html )
3. Java网络编程

目录: 
一.RPC协议
二.ipc.RPC源码分析
三.ipc.Client源码分析
四.ipc.Server源码分析 

分析: 

一.RPC协议 

在分析协议之前,我觉得我们很有必要先搞清楚协议是什么。下面我就谈一点自己的认识吧。如果你学过java的网络编程,你一定知道:当客户端发送一个字节给服务端时,服务端必须也要有一个读字节的方法在阻塞等待;反之亦然。 这种我把它称为底层的通信协议。可是对于一个大型的网络通信系统来说,很显然这种说法的协议粒度太小,不方便我们理解整个网络通信的流程及架构,所以我造了个说法:架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度来说,架构层次协议的说法就可以成立了(注:如果从架构层次的协议来分析系统,我们就先不要太在意方法的具体实现,呵呵,我相信你懂得~)。

Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口。如图:

 

 

下面就几个重点的协议介绍一下吧:

 

VersionedProtocol :它是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()

(1)HDFS相关 
ClientDatanodeProtocol :一个客户端和datanode之间的协议接口,用于数据块恢复
ClientProtocol :client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
DatanodeProtocol : Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol :SecondaryNode与Namenode交互的接口。

(2)Mapreduce相关 
InterDatanodeProtocol :Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol :TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
JobSubmissionProtocol :JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
TaskUmbilicalProtocol :Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。

 

一下子罗列了这么多的协议,有些人可能要问了,hadoop是怎么使用它们的呢?呵呵,不要着急哦,其实本篇博客所分析的是hadoop的RPC机制底层的具体实现,而这些协议却是应用层上的东西,比如hadoop是怎么样保持“心跳”的啊。所以在我的下一篇博客:源码级分析hadoop的心跳机制中会详细说明以上协议是怎样被使用的。尽请期待哦~。现在就开始我们的RPC源码之旅吧•••

二.ipc.RPC源码分析 

ipc.RPC类中有一些内部类,为了大家对RPC类有个初步的印象,就先罗列几个我们感兴趣的分析一下吧:

 

Invocation :用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server :是ipc.Server的实现类。

 

从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。现在就只剩下Invoker类了。如果你对动态代理(参考:http://weixiaolu.iteye.com/blog/1477774 )比较了解的话,你一下就会想到,我们接下来去研究的就是RPC.Invoker类中的invoke()方法了。代码如下:

代码一:

  • public Object invoke(Object proxy, Method method, Object[] args)  
  •   throws Throwable {  
  •   •••  
  •   ObjectWritable value = (ObjectWritable)  
  •     client.call(new Invocation(method, args), remoteId);  
  •   •••  
  •   return value.get();  
  • }  
  •  

    呵呵,如果你发现这个invoke()方法实现的有些奇怪的话,那你就对了。一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg);  这句代码。而上面代码中却没有,这是为什么呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:

    代码二:

  • ObjectWritable value = (ObjectWritable)  
  • client.call(new Invocation(method, args), remoteId);  
  •  

    Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信只是调用了Client类的call()方法。那我们接下来分析一下ipc.Client源码吧。不过在分析ipc.Client源码之前,为了不让我们像盲目的苍蝇一样乱撞,我想先确定一下我们分析的目的是什么,我总结出了三点需要解决的问题:


    1. 客户端和服务端的连接是怎样建立的?
    2. 客户端是怎样给服务端发送数据的?
    3. 客户端是怎样获取服务端的返回数据的?


    基于以上三个问题,我们开始吧!!!

    三.ipc.Client源码分析 

    同样,为了对Client类有个初步的了解,我们也先罗列几个我们感兴趣的内部类:

     

    Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
    Connection :用以处理远程连接对象。继承了Thread
    ConnectionId :唯一确定一个连接

     

    问题1:客户端和服务端的连接是怎样建立的? 

    下面我们来看看Client类中的cal()方法吧:

    代码三:

  • public Writable call(Writable param, ConnectionId remoteId)    
  •                        throws InterruptedException, IOException {  
  •     Call call = new Call(param);       //将传入的数据封装成call对象  
  •     Connection connection = getConnection(remoteId, call);   //获得一个连接  
  •     connection.sendParam(call);     // 向服务端发送call对象  
  •     boolean interrupted = false;  
  •     synchronized (call) {  
  •       while (!call.done) {  
  •         try {  
  •           call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程  
  •         } catch (InterruptedException ie) {  
  •           // 因中断异常而终止,设置标志interrupted为true  
  •           interrupted = true;  
  •         }  
  •       }  
  •       if (interrupted) {  
  •         Thread.currentThread().interrupt();  
  •       }  
  •   
  •       if (call.error != null) {  
  •         if (call.error instanceof RemoteException) {  
  •           call.error.fillInStackTrace();  
  •           throw call.error;  
  •         } else { // 本地异常  
  •           throw wrapException(remoteId.getAddress(), call.error);  
  •         }  
  •       } else {  
  •         return call.value; //返回结果数据  
  •       }  
  •     }  
  •   }  
  •  

    具体代码的作用我已做了注释,所以这里不再赘述。但到目前为止,你依然不知道RPC机制底层的网络连接是怎么建立的。呵呵,那我们只好再去深究了,分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:

    代码四:

  • Connection connection = getConnection(remoteId, call);   //获得一个连接  
  • connection.sendParam(call);      // 向服务端发送call对象  
  •  

    先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。

    代码五:

  • private Connection getConnection(ConnectionId remoteId,  
  •                                    Call call)  
  •                                    throws IOException, InterruptedException {  
  •     if (!running.get()) {  
  •       // 如果client关闭了  
  •       throw new IOException("The client is stopped");  
  •     }  
  •     Connection connection;  
  • //如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。  
  • //但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。  
  •     do {  
  •       synchronized (connections) {  
  •         connection = connections.get(remoteId);  
  •         if (connection == null) {  
  •           connection = new Connection(remoteId);  
  •           connections.put(remoteId, connection);  
  •         }  
  •       }  
  •     } while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了  
  •    //这句代码才是真正的完成了和服务端建立连接哦~  
  •     connection.setupIOstreams();  
  •     return connection;  
  •   }  
  •  

    如果你还有兴趣继续分析下去,那我们就一探建立连接的过程吧,下面贴出Client.Connection类中的setupIOstreams()方法:

    代码六:

  • private synchronized void setupIOstreams() throws InterruptedException {  
  •  •••  
  •     try {  
  •      •••  
  •       while (true) {  
  •         setupConnection();  //建立连接  
  •         InputStream inStream = NetUtils.getInputStream(socket);     //获得输入流  
  •         OutputStream outStream = NetUtils.getOutputStream(socket);  //获得输出流  
  •         writeRpcHeader(outStream);  
  •         •••  
  •         this.in = new DataInputStream(new BufferedInputStream  
  •             (new PingInputStream(inStream)));   //将输入流装饰成DataInputStream  
  •         this.out = new DataOutputStream  
  •         (new BufferedOutputStream(outStream));   //将输出流装饰成DataOutputStream  
  •         writeHeader();  
  •         // 跟新活动时间  
  •         touch();  
  •         //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread  
  •         start();  
  •         return;  
  •       }  
  •     } catch (IOException e) {  
  •       markClosed(e);  
  •       close();  
  •     }  
  •   }  
  •  

    再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法:

    代码七:

  • private synchronized void setupConnection() throws IOException {  
  •     short ioFailures = 0;  
  •     short timeoutFailures = 0;  
  •     while (true) {  
  •       try {  
  •         this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了  
  •         this.socket.setTcpNoDelay(tcpNoDelay);  
  •        •••  
  •         // 设置连接超时为20s  
  •         NetUtils.connect(this.socket, remoteId.getAddress(), 20000);  
  •         this.socket.setSoTimeout(pingInterval);  
  •         return;  
  •       } catch (SocketTimeoutException toe) {  
  •         /* 设置最多连接重试为45次。 
  •          * 总共有20s*45 = 15 分钟的重试时间。 
  •          */  
  •         handleConnectionFailure(timeoutFailures++, 45, toe);  
  •       } catch (IOException ie) {  
  •         handleConnectionFailure(ioFailures++, maxRetries, ie);  
  •       }  
  •     }  
  •   }  
  •  

    终于,我们知道了客户端的连接是怎样建立的了,其实就是创建一个普通的socket进行通信。呵呵,那服务端是不是也是创建一个ServerSocket进行通信的呢?呵呵,先不要急,到这里我们只解决了客户端的第一个问题,下面还有两个问题没有解决呢,我们一个一个地来解决吧。

    问题2:客户端是怎样给服务端发送数据的? 

    我们回顾一下代码四吧。第一句为了完成连接的建立,我们已经分析完毕;而第二句是为了发送数据,呵呵,分析下去,看能不能解决我们的问题呢。下面贴出Client.Connection类的sendParam()方法吧:

    代码八:

  • public void sendParam(Call call) {  
  •       if (shouldCloseConnection.get()) {  
  •         return;  
  •       }  
  •       DataOutputBuffer d=null;  
  •       try {  
  •         synchronized (this.out) {  
  •           if (LOG.isDebugEnabled())  
  •             LOG.debug(getName() + " sending #" + call.id);  
  •           //创建一个缓冲区  
  •           d = new DataOutputBuffer();  
  •           d.writeInt(call.id);  
  •           call.param.write(d);  
  •           byte[] data = d.getData();  
  •           int dataLength = d.getLength();  
  •           out.writeInt(dataLength);        //首先写出数据的长度  
  •           out.write(data, 0, dataLength); //向服务端写数据  
  •           out.flush();  
  •         }  
  •       } catch(IOException e) {  
  •         markClosed(e);  
  •       } finally {  
  •         IOUtils.closeStream(d);  
  •       }  
  •     }    
  •  

    其实这就是java io的socket发送数据的一般过程哦,没有什么特别之处。到这里问题二也解决了,来看看问题三吧。

    问题3:客户端是怎样获取服务端的返回数据的? 

    我们再回顾一下代码六吧。代码六中,当连接建立时会启动一个线程用于处理服务端返回的数据,我们看看这个处理线程是怎么实现的吧,下面贴出Client.Connection类和Client.Call类中的相关方法吧:

    代码九:

  • 方法一:    
  •   public void run() {  
  •       •••  
  •       while (waitForWork()) {  
  •         receiveResponse();  //具体的处理方法  
  •       }  
  •       close();  
  •      •••  
  • }  
  •   
  • 方法二:  
  • private void receiveResponse() {  
  •       if (shouldCloseConnection.get()) {  
  •         return;  
  •       }  
  •       touch();  
  •       try {  
  •         int id = in.readInt();                    // 阻塞读取id  
  •         if (LOG.isDebugEnabled())  
  •           LOG.debug(getName() + " got value #" + id);  
  •           Call call = calls.get(id);    //在calls池中找到发送时的那个对象  
  •         int state = in.readInt();     // 阻塞读取call对象的状态  
  •         if (state == Status.SUCCESS.state) {  
  •           Writable value = ReflectionUtils.newInstance(valueClass, conf);  
  •           value.readFields(in);           // 读取数据  
  •         //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三  
  •           call.setValue(value);                
  •           calls.remove(id);               //删除已处理的call      
  •         } else if (state == Status.ERROR.state) {  
  •         •••  
  •         } else if (state == Status.FATAL.state) {  
  •         •••  
  •         }  
  •       } catch (IOException e) {  
  •         markClosed(e);  
  •       }  
  • }  
  •   
  • 方法三:  
  • public synchronized void setValue(Writable value) {  
  •       this.value = value;  
  •       callComplete();   //具体实现  
  • }  
  • protected synchronized void callComplete() {  
  •       this.done = true;  
  •       notify();         // 唤醒client等待线程  
  •     }  
  •  

    代码九完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面我们来分析Server端的源码吧。

    四.ipc.Server源码分析 

    同样,为了让大家对ipc.Server有个初步的了解,我们先分析一下它的几个内部类吧:

     

    Call :用于存储客户端发来的请求
    Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
    Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
    Connection :连接类,真正的客户端请求读取逻辑在这个类中。
    Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

     

    如果你看过ipc.Server的源码,你会发现其实ipc.Server是一个abstract修饰的抽象类。那随之而来的问题就是:hadoop是怎样初始化RPC的Server端的呢?这个问题着实也让我想了好长时间。不过后来我想到Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码吧:

    1. 初始化Server


    代码十:

  • private void initialize(Configuration conf) throws IOException {  
  •    •••  
  •     // 创建 rpc server  
  •     InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);  
  •     if (dnSocketAddr != null) {  
  •       int serviceHandlerCount =  
  •         conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,  
  •                     DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);  
  •       //获得serviceRpcServer  
  •       this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),   
  •           dnSocketAddr.getPort(), serviceHandlerCount,  
  •           false, conf, namesystem.getDelegationTokenSecretManager());  
  •       this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();  
  •       setRpcServiceServerAddress(conf);  
  • }  
  • //获得server  
  •     this.server = RPC.getServer(this, socAddr.getHostName(),  
  •         socAddr.getPort(), handlerCount, false, conf, namesystem  
  •         .getDelegationTokenSecretManager());  
  •   
  •    •••  
  •     this.server.start();  //启动 RPC server   Clients只允许连接该server  
  •     if (serviceRpcServer != null) {  
  •       serviceRpcServer.start();  //启动 RPC serviceRpcServer 为HDFS服务的server  
  •     }  
  •     startTrashEmptier(conf);  
  •   }  
  •  

    查看Namenode初始化源码得知:RPC的server对象是通过ipc.RPC类的getServer()方法获得的。下面咱们去看看ipc.RPC类中的getServer()源码吧:

    代码十一:

  • public static Server getServer(final Object instance, final String bindAddress, final int port,  
  •                                  final int numHandlers,  
  •                                  final boolean verbose, Configuration conf,  
  •                                  SecretManager<? extends TokenIdentifier> secretManager)   
  •     throws IOException {  
  •     return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);  
  •   }  
  •  

    这时我们发现getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。哈哈,现在你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数还是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。

    2. 运行Server 
    如代码十所示,初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:

    代码十二:

  • /** 启动服务 */  
  • public synchronized void start() {  
  •   responder.start();  //启动responder  
  •   listener.start();   //启动listener  
  •   handlers = new Handler[handlerCount];  
  •     
  •   for (int i = 0; i < handlerCount; i++) {  
  •     handlers[i] = new Handler(i);  
  •     handlers[i].start();   //逐个启动Handler  
  •   }  
  • }  
  •  

    3. Server处理请求 

    1)建立连接 
    分析过ipc.Client源码后,我们知道Client端的底层通信直接采用了阻塞式IO编程,当时我们曾做出猜测:Server端是不是也采用了阻塞式IO。现在我们仔细地分析一下吧,如果Server端也采用阻塞式IO,当连接进来的Client端很多时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,所以他们采用了java  NIO来实现Server端,java NIO可参考博客:http://weixiaolu.iteye.com/blog/1479656  。那Server端采用java NIO是怎么建立连接的呢?分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数吧:

    代码十三:

  • public Listener() throws IOException {  
  •   address = new InetSocketAddress(bindAddress, port);  
  •   // 创建ServerSocketChannel,并设置成非阻塞式  
  •   acceptChannel = ServerSocketChannel.open();  
  •   acceptChannel.configureBlocking(false);  
  •   
  •   // 将server socket绑定到本地端口  
  •   bind(acceptChannel.socket(), address, backlogLength);  
  •   port = acceptChannel.socket().getLocalPort();   
  •   // 获得一个selector  
  •   selector= Selector.open();  
  •   readers = new Reader[readThreads];  
  •   readPool = Executors.newFixedThreadPool(readThreads);  
  •   //启动多个reader线程,为了防止请求多时服务端响应延时的问题  
  •   for (int i = 0; i < readThreads; i++) {         
  •     Selector readSelector = Selector.open();  
  •     Reader reader = new Reader(readSelector);  
  •     readers[i] = reader;  
  •     readPool.execute(reader);  
  •   }  
  •   // 注册连接事件  
  •   acceptChannel.register(selector, SelectionKey.OP_ACCEPT);  
  •   this.setName("IPC Server listener on " + port);  
  •   this.setDaemon(true);  
  • }  
  •  

    在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法:

    代码十四:

  • public void run() {  
  •    •••  
  •     while (running) {  
  •       SelectionKey key = null;  
  •       try {  
  •         selector.select();  
  •         Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  
  •         while (iter.hasNext()) {  
  •           key = iter.next();  
  •           iter.remove();  
  •           try {  
  •             if (key.isValid()) {  
  •               if (key.isAcceptable())  
  •                 doAccept(key);     //具体的连接方法  
  •             }  
  •           } catch (IOException e) {  
  •           }  
  •           key = null;  
  •         }  
  •       } catch (OutOfMemoryError e) {  
  •      •••           
  •   }  
  •  

    下面贴出Server.Listener类中doAccept ()方法中的关键源码吧:

    代码十五:

  •     void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {  
  •       Connection c = null;  
  •       ServerSocketChannel server = (ServerSocketChannel) key.channel();  
  •       SocketChannel channel;  
  •       while ((channel = server.accept()) != null) { //建立连接  
  •         channel.configureBlocking(false);  
  •         channel.socket().setTcpNoDelay(tcpNoDelay);  
  •         Reader reader = getReader();  //从readers池中获得一个reader  
  •         try {  
  •           reader.startAdd(); // 激活readSelector,设置adding为true  
  •           SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件  
  •           c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象  
  •           readKey.attach(c);   //将connection对象注入readKey  
  •           synchronized (connectionList) {  
  •             connectionList.add(numConnections, c);  
  •             numConnections++;  
  •           }  
  •         •••   
  •         } finally {  
  • //设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使  
  • //用了wait()方法等待。因篇幅有限,就不贴出源码了。  
  •           reader.finishAdd();  
  •         }  
  •       }  
  •     }  
  •  

    当reader被唤醒,reader接着执行doRead()方法。

    2)接收请求 
    下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:

    代码十六:

  • 方法一:     
  •  void doRead(SelectionKey key) throws InterruptedException {  
  •       int count = 0;  
  •       Connection c = (Connection)key.attachment();  //获得connection对象  
  •       if (c == null) {  
  •         return;    
  •       }  
  •       c.setLastContact(System.currentTimeMillis());  
  •       try {  
  •         count = c.readAndProcess();    // 接受并处理请求    
  •       } catch (InterruptedException ieo) {  
  •        •••  
  •       }  
  •      •••      
  • }  
  •   
  • 方法二:  
  • public int readAndProcess() throws IOException, InterruptedException {  
  •       while (true) {  
  •         •••  
  •         if (!rpcHeaderRead) {  
  •           if (rpcHeaderBuffer == null) {  
  •             rpcHeaderBuffer = ByteBuffer.allocate(2);  
  •           }  
  •          //读取请求头  
  •           count = channelRead(channel, rpcHeaderBuffer);  
  •           if (count < 0 || rpcHeaderBuffer.remaining() > 0) {  
  •             return count;  
  •           }  
  •         // 读取请求版本号    
  •           int version = rpcHeaderBuffer.get(0);  
  •           byte[] method = new byte[] {rpcHeaderBuffer.get(1)};  
  •         •••    
  •          
  •           data = ByteBuffer.allocate(dataLength);  
  •         }  
  •         // 读取请求    
  •         count = channelRead(channel, data);  
  •           
  •         if (data.remaining() == 0) {  
  •          •••  
  •           if (useSasl) {  
  •          •••  
  •           } else {  
  •             processOneRpc(data.array());//处理请求  
  •           }  
  •         •••  
  •           }  
  •         }   
  •         return count;  
  •       }  
  •     }  
  •  

    3)获得call对象 
    下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。

    代码十七:

  • 方法一:     
  •  private void processOneRpc(byte[] buf) throws IOException,  
  •         InterruptedException {  
  •       if (headerRead) {  
  •         processData(buf);  
  •       } else {  
  •         processHeader(buf);  
  •         headerRead = true;  
  •         if (!authorizeConnection()) {  
  •           throw new AccessControlException("Connection from " + this  
  •               + " for protocol " + header.getProtocol()  
  •               + " is unauthorized for user " + user);  
  •         }  
  •       }  
  • }  
  • 方法二:  
  •     private void processData(byte[] buf) throws  IOException, InterruptedException {  
  •       DataInputStream dis =  
  •         new DataInputStream(new ByteArrayInputStream(buf));  
  •       int id = dis.readInt();      // 尝试读取id  
  •       Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数  
  •       param.readFields(dis);          
  •           
  •       Call call = new Call(id, param, this);  //封装成call  
  •       callQueue.put(call);   // 将call存入callQueue  
  •       incRpcCount();  // 增加rpc请求的计数  
  •     }  
  •  

    4)处理call对象 
    你还记得Server类中还有个Handler内部类吗?呵呵,对call对象的处理就是它干的。下面贴出Server.Handler类中run()方法中的关键代码:

    代码十八:

  • while (running) {  
  •       try {  
  •         final Call call = callQueue.take(); //弹出call,可能会阻塞  
  •         •••  
  •         //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中  
  •         value = call(call.connection.protocol, call.param, call.timestamp);  
  •         synchronized (call.connection.responseQueue) {  
  •           setupResponse(buf, call,   
  •                       (error == null) ? Status.SUCCESS : Status.ERROR,   
  •                       value, errorClass, error);  
  •            •••  
  •           //给客户端响应请求  
  •           responder.doRespond(call);  
  •         }  
  • }  
  •  

     

    5)返回请求 
    下面贴出Server.Responder类中的doRespond()方法源码: 

    代码十九:

  • 方法一:     
  •  void doRespond(Call call) throws IOException {  
  •       synchronized (call.connection.responseQueue) {  
  •         call.connection.responseQueue.addLast(call);  
  •         if (call.connection.responseQueue.size() == 1) {  
  •            // 返回响应结果,并激活writeSelector  
  •           processResponse(call.connection.responseQueue, true);  
  •         }  
  •       }  
  • }  
  •  

    小结:


    到这里,hadoop RPC机制的源码分析就结束了。

     

    转自:http://www.cnblogs.com/hd-zg/p/6010824.html

    相关文章: