一、前言

  前面学习了请求处理链的RequestProcessor父类,接着学习PrepRequestProcessor,其通常是请求处理链的第一个处理器。

二、PrepRequestProcessor源码分析

  2.1 类的继承关系  

public class PrepRequestProcessor extends Thread implements RequestProcessor {}

  说明:可以看到PrepRequestProcessor继承了Thread类并实现了RequestProcessor接口,表示其可以作为线程使用。

  2.2 类的属性

public class PrepRequestProcessor extends Thread implements RequestProcessor {
    // 日志记录器
    private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessor.class);

    // 是否跳过ACL,需查看系统配置
    static boolean skipACL;
    static {
        skipACL = System.getProperty("zookeeper.skipACL", "no").equals("yes");
        if (skipACL) {
            LOG.info("zookeeper.skipACL==\"yes\", ACL checks will be skipped");
        }
    }

    /**
     * this is only for testing purposes.
     * should never be useed otherwise
     */
    // 仅用作测试使用
    private static  boolean failCreate = false;

    // 已提交请求队列
    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

    // 下个处理器
    RequestProcessor nextProcessor;

    // Zookeeper服务器
    ZooKeeperServer zks;
}

  说明:类的核心属性有submittedRequests和nextProcessor,前者表示已经提交的请求,而后者表示提交的下个处理器。

  2.3 类的构造函数  

    public PrepRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        // 调用父类Thread构造函数
        super("ProcessThread(sid:" + zks.getServerId()
                + " cport:" + zks.getClientPort() + "):");
        // 类属性赋值
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }

  说明:该构造函数首先会调用父类Thread的构造函数,然后利用构造函数参数给nextProcessor和zks赋值。

  2.4 核心函数分析

  1. run函数 

    public void run() {
        try {
            while (true) { // 无限循环
                // 从队列中取出一个请求
                Request request = submittedRequests.take();
                // 
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) { // 请求类型为PING
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) { // 是否可追踪
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) { // 在关闭处理器之后,会添加requestOfDeath,表示关闭后不再处理请求
                    break;
                }
                // 调用pRequest函数
                pRequest(request);
            }
        } catch (InterruptedException e) { // 中断异常
            LOG.error("Unexpected interruption", e);
        } catch (RequestProcessorException e) { // 请求处理异常
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            LOG.error("Unexpected exception", e);
        } catch (Exception e) { // 其他异常
            LOG.error("Unexpected exception", e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }

  说明:run函数是对Thread类run函数的重写,其核心逻辑相对简单,即不断从队列中取出request进行处理,其会调用pRequest函数,其源码如下  

    protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        // 将请求的hdr和txn设置为null
        request.hdr = null;
        request.txn = null;
        
        try {
            switch (request.type) { // 确定请求类型
                case OpCode.create: // 创建节点请求
                // 新生创建节点请求
                CreateRequest createRequest = new CreateRequest();
                // 处理请求
                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
                break;
            case OpCode.delete: // 删除节点请求
                // 新生删除节点请求
                DeleteRequest deleteRequest = new DeleteRequest();               
                // 处理请求
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                break;
            case OpCode.setData: // 设置数据请求
                // 新生设置数据请求
                SetDataRequest setDataRequest = new SetDataRequest();                
                // 处理请求
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
            case OpCode.setACL: // 设置ACL请求
                // 新生设置ACL请求
                SetACLRequest setAclRequest = new SetACLRequest();                
                // 处理请求
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                break;
            case OpCode.check: // 检查版本请求
                // 新生检查版本请求
                CheckVersionRequest checkRequest = new CheckVersionRequest();   
                // 处理请求
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                break;
            case OpCode.multi: // 多重请求
                // 新生多重请求
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {
                    // 将ByteBuffer转化为Record
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {
                   // 出现异常则重新生成Txn头
                   request.hdr =  new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            zks.getTime(), OpCode.multi);
                   throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                // 存储当前挂起的更改记录,以防我们需要回滚
                HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                int index = 0;
                for(Op op: multiRequest) { // 遍历请求
                    Record subrequest = op.toRequestRecord() ;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) { // 发生了异常
                        request.hdr.setType(OpCode.error);
                        request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    } 
                    
                    /* Prep the request and convert to a Txn */
                    else { // 未发生异常
                        try {
                            // 将Request转化为Txn
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                        } catch (KeeperException e) { // 转化发生异常
                            if (ke == null) {
                                ke = e;
                            }
                            // 设置请求头的类型
                            request.hdr.setType(OpCode.error);
                            // 设置请求的Txn
                            request.txn = new ErrorTxn(e.code().intValue());
                            LOG.info("Got user-level KeeperException when processing "
                                    + request.toString() + " aborting remaining multi ops."
                                    + " Error Path:" + e.getPath()
                                    + " Error:" + e.getMessage());
                            // 设置异常
                            request.setException(e);
 
                            /* Rollback change records from failed multi-op */
                            // 从多重操作中回滚更改记录
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm 
                    //       not sure how else to get the txn stored into our list.
                    // 序列化
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    request.txn.serialize(boa, "request") ;
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(request.hdr.getType(), bb.array()));
                    index++;
                }
                
                // 给请求头赋值
                request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
                // 设置请求的Txn
                request.txn = new MultiTxn(txns);
                
                break;

            //create/close session don't require request record
            case OpCode.createSession: // 创建会话请求
            case OpCode.closeSession: // 关闭会话请求
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                break;
 
            //All the rest don't need to create a Txn - just verify session
            // 所有以下请求只需验证会话即可
            case OpCode.sync: 
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches: 
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;
            }
        } catch (KeeperException e) { // 发生KeeperException异常
            if (request.hdr != null) {
                request.hdr.setType(OpCode.error);
                request.txn = new ErrorTxn(e.code().intValue());
            }
            LOG.info("Got user-level KeeperException when processing "
                    + request.toString()
                    + " Error Path:" + e.getPath()
                    + " Error:" + e.getMessage());
            request.setException(e);
        } catch (Exception e) { // 其他异常
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process " + request, e);

            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            if(bb != null){
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
            } else {
                sb.append("request buffer is null");
            }

            LOG.error("Dumping request buffer: 0x" + sb.toString());
            if (request.hdr != null) {
                request.hdr.setType(OpCode.error);
                request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
            }
        }
        // 给请求的zxid赋值
        request.zxid = zks.getZxid();
        // 传递给下个处理器进行处理
        nextProcessor.processRequest(request);
    }
View Code

相关文章:

  • 2021-12-11
  • 2021-11-18
  • 2021-09-01
  • 2022-02-04
  • 2022-12-23
猜你喜欢
  • 2022-02-16
  • 2021-06-15
  • 2021-10-01
  • 2021-07-31
  • 2022-12-23
  • 2021-07-17
  • 2021-09-17
相关资源
相似解决方案