一、前言
前面学习了请求处理链的RequestProcessor父类,接着学习PrepRequestProcessor,其通常是请求处理链的第一个处理器。
二、PrepRequestProcessor源码分析
2.1 类的继承关系
public class PrepRequestProcessor extends Thread implements RequestProcessor {}
说明:可以看到PrepRequestProcessor继承了Thread类并实现了RequestProcessor接口,表示其可以作为线程使用。
2.2 类的属性
public class PrepRequestProcessor extends Thread implements RequestProcessor { // 日志记录器 private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessor.class); // 是否跳过ACL,需查看系统配置 static boolean skipACL; static { skipACL = System.getProperty("zookeeper.skipACL", "no").equals("yes"); if (skipACL) { LOG.info("zookeeper.skipACL==\"yes\", ACL checks will be skipped"); } } /** * this is only for testing purposes. * should never be useed otherwise */ // 仅用作测试使用 private static boolean failCreate = false; // 已提交请求队列 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); // 下个处理器 RequestProcessor nextProcessor; // Zookeeper服务器 ZooKeeperServer zks; }
说明:类的核心属性有submittedRequests和nextProcessor,前者表示已经提交的请求,而后者表示提交的下个处理器。
2.3 类的构造函数
public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { // 调用父类Thread构造函数 super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):"); // 类属性赋值 this.nextProcessor = nextProcessor; this.zks = zks; }
说明:该构造函数首先会调用父类Thread的构造函数,然后利用构造函数参数给nextProcessor和zks赋值。
2.4 核心函数分析
1. run函数
public void run() { try { while (true) { // 无限循环 // 从队列中取出一个请求 Request request = submittedRequests.take(); // long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { // 请求类型为PING traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { // 是否可追踪 ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { // 在关闭处理器之后,会添加requestOfDeath,表示关闭后不再处理请求 break; } // 调用pRequest函数 pRequest(request); } } catch (InterruptedException e) { // 中断异常 LOG.error("Unexpected interruption", e); } catch (RequestProcessorException e) { // 请求处理异常 if (e.getCause() instanceof XidRolloverException) { LOG.info(e.getCause().getMessage()); } LOG.error("Unexpected exception", e); } catch (Exception e) { // 其他异常 LOG.error("Unexpected exception", e); } LOG.info("PrepRequestProcessor exited loop!"); }
说明:run函数是对Thread类run函数的重写,其核心逻辑相对简单,即不断从队列中取出request进行处理,其会调用pRequest函数,其源码如下
protected void pRequest(Request request) throws RequestProcessorException { // LOG.info("Prep>>> cxid = " + request.cxid + " type = " + // request.type + " id = 0x" + Long.toHexString(request.sessionId)); // 将请求的hdr和txn设置为null request.hdr = null; request.txn = null; try { switch (request.type) { // 确定请求类型 case OpCode.create: // 创建节点请求 // 新生创建节点请求 CreateRequest createRequest = new CreateRequest(); // 处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true); break; case OpCode.delete: // 删除节点请求 // 新生删除节点请求 DeleteRequest deleteRequest = new DeleteRequest(); // 处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); break; case OpCode.setData: // 设置数据请求 // 新生设置数据请求 SetDataRequest setDataRequest = new SetDataRequest(); // 处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); break; case OpCode.setACL: // 设置ACL请求 // 新生设置ACL请求 SetACLRequest setAclRequest = new SetACLRequest(); // 处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true); break; case OpCode.check: // 检查版本请求 // 新生检查版本请求 CheckVersionRequest checkRequest = new CheckVersionRequest(); // 处理请求 pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true); break; case OpCode.multi: // 多重请求 // 新生多重请求 MultiTransactionRecord multiRequest = new MultiTransactionRecord(); try { // 将ByteBuffer转化为Record ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest); } catch(IOException e) { // 出现异常则重新生成Txn头 request.hdr = new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), zks.getTime(), OpCode.multi); throw e; } List<Txn> txns = new ArrayList<Txn>(); //Each op in a multi-op must have the same zxid! long zxid = zks.getNextZxid(); KeeperException ke = null; //Store off current pending change records in case we need to rollback // 存储当前挂起的更改记录,以防我们需要回滚 HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest); int index = 0; for(Op op: multiRequest) { // 遍历请求 Record subrequest = op.toRequestRecord() ; /* If we've already failed one of the ops, don't bother * trying the rest as we know it's going to fail and it * would be confusing in the logfiles. */ if (ke != null) { // 发生了异常 request.hdr.setType(OpCode.error); request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue()); } /* Prep the request and convert to a Txn */ else { // 未发生异常 try { // 将Request转化为Txn pRequest2Txn(op.getType(), zxid, request, subrequest, false); } catch (KeeperException e) { // 转化发生异常 if (ke == null) { ke = e; } // 设置请求头的类型 request.hdr.setType(OpCode.error); // 设置请求的Txn request.txn = new ErrorTxn(e.code().intValue()); LOG.info("Got user-level KeeperException when processing " + request.toString() + " aborting remaining multi ops." + " Error Path:" + e.getPath() + " Error:" + e.getMessage()); // 设置异常 request.setException(e); /* Rollback change records from failed multi-op */ // 从多重操作中回滚更改记录 rollbackPendingChanges(zxid, pendingChanges); } } //FIXME: I don't want to have to serialize it here and then // immediately deserialize in next processor. But I'm // not sure how else to get the txn stored into our list. // 序列化 ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); request.txn.serialize(boa, "request") ; ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); txns.add(new Txn(request.hdr.getType(), bb.array())); index++; } // 给请求头赋值 request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type); // 设置请求的Txn request.txn = new MultiTxn(txns); break; //create/close session don't require request record case OpCode.createSession: // 创建会话请求 case OpCode.closeSession: // 关闭会话请求 pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); break; //All the rest don't need to create a Txn - just verify session // 所有以下请求只需验证会话即可 case OpCode.sync: case OpCode.exists: case OpCode.getData: case OpCode.getACL: case OpCode.getChildren: case OpCode.getChildren2: case OpCode.ping: case OpCode.setWatches: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); break; } } catch (KeeperException e) { // 发生KeeperException异常 if (request.hdr != null) { request.hdr.setType(OpCode.error); request.txn = new ErrorTxn(e.code().intValue()); } LOG.info("Got user-level KeeperException when processing " + request.toString() + " Error Path:" + e.getPath() + " Error:" + e.getMessage()); request.setException(e); } catch (Exception e) { // 其他异常 // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process " + request, e); StringBuilder sb = new StringBuilder(); ByteBuffer bb = request.request; if(bb != null){ bb.rewind(); while (bb.hasRemaining()) { sb.append(Integer.toHexString(bb.get() & 0xff)); } } else { sb.append("request buffer is null"); } LOG.error("Dumping request buffer: 0x" + sb.toString()); if (request.hdr != null) { request.hdr.setType(OpCode.error); request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue()); } } // 给请求的zxid赋值 request.zxid = zks.getZxid(); // 传递给下个处理器进行处理 nextProcessor.processRequest(request); }