一、前言

  前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

二、持久化总体框架

  持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。

  【Zookeeper】源码分析之持久化(一)之FileTxnLog

  · TxnLog,接口类型,读取事务性日志的接口。

  · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。

  · Snapshot,接口类型,持久层快照接口。

  · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。

  · FileTxnSnapLog,封装了TxnLog和SnapShot。

  · Util,工具类,提供持久化所需的API。

  下面先来分析TxnLog和FileTxnLog的源码。

三、TxnLog源码分析

  TxnLog是接口,规定了对日志的响应操作。  

public interface TxnLog {
    
    /**
     * roll the current
     * log being appended to
     * @throws IOException 
     */
    // 回滚日志
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false 
     * @throws IOException
     */
    // 添加一个请求至事务性日志
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the 
     * next transaction in the logs.
     * @throws IOException
     */
    // 读取事务性日志
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // 事务性操作的最新zxid
    long getLastLoggedZxid() throws IOException;
    
    /**
     * truncate the log to get in sync with the 
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException 
     */
    // 清空日志,与Leader保持同步
    boolean truncate(long zxid) throws IOException;
    
    /**
     * the dbid for this transaction log. 
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // 获取数据库的id
    long getDbId() throws IOException;
    
    /**
     * commmit the trasaction and make sure
     * they are persisted
     * @throws IOException
     */
    // 提交事务并进行确认
    void commit() throws IOException;
   
    /** 
     * close the transactions logs
     */
    // 关闭事务性日志
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    // 读取事务日志的迭代器接口
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        // 获取事务头部
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        // 获取事务
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        // 下个事务
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        // 关闭文件释放资源
        void close() throws IOException;
    }
}

  其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。

四、FileTxnLog源码分析

  对于LogFile而言,其格式可分为如下三部分

  LogFile:

    FileHeader TxnList ZeroPad

  FileHeader格式如下  

  FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
  }

  TxnList格式如下

  TxnList:

    Txn || Txn TxnList

  Txn格式如下

  Txn:

    checksum Txnlen TxnHeader Record 0x42

  Txnlen格式如下

  Txnlen:
    len 4bytes

  TxnHeader格式如下

  TxnHeader: {

    sessionid 8bytes
    cxid 4bytes
      zxid 8bytes
    time 8bytes
    type 4bytes
  }

  ZeroPad格式如下

  ZeroPad:
    0 padded to EOF (filled during preallocation stage)

  了解LogFile的格式对于理解源码会有很大的帮助。

  4.1 属性  

public class FileTxnLog implements TxnLog {
    private static final Logger LOG;
    
    // 预分配大小 64M
    static long preAllocSize =  65536 * 1024;
    
    // 魔术数字,默认为1514884167
    public final static int TXNLOG_MAGIC =
        ByteBuffer.wrap("ZKLG".getBytes()).getInt();

    // 版本号
    public final static int VERSION = 2;

    /** Maximum time we allow for elapsed fsync before WARNing */
    // 进行同步时,发出warn之前所能等待的最长时间
    private final static long fsyncWarningThresholdMS;

    // 静态属性,确定Logger、预分配空间大小和最长时间
    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);

        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
    }
    
    // 最大(新)的zxid
    long lastZxidSeen;
    // 存储数据相关的流
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;

    // log目录文件
    File logDir;
    
    // 是否强制同步
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
    
    // 数据库id
    long dbId;
    
    // 流列表
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();
    
    // 当前大小
    long currentSize;
    // 写日志文件
    File logFileWrite = null;
}

  4.2. 核心函数 

  1. append函数

    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr != null) { // 事务头部不为空
            if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
                LOG.warn("Current zxid " + hdr.getZxid()
                        + " is <= " + lastZxidSeen + " for "
                        + hdr.getType());
            }
            if (logStream==null) { // 日志流为空
               if(LOG.isInfoEnabled()){
                    LOG.info("Creating new log file: log." +  
                            Long.toHexString(hdr.getZxid()));
               }
               
               // 
               logFileWrite = new File(logDir, ("log." + 
                       Long.toHexString(hdr.getZxid())));
               fos = new FileOutputStream(logFileWrite);
               logStream=new BufferedOutputStream(fos);
               oa = BinaryOutputArchive.getArchive(logStream);
               // 
               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
               // 序列化
               fhdr.serialize(oa, "fileheader");
               // Make sure that the magic number is written before padding.
               // 刷新到磁盘
               logStream.flush();
               
               // 当前通道的大小
               currentSize = fos.getChannel().position();
               // 添加fos
               streamsToFlush.add(fos);
            }
            
            // 填充文件
            padFile(fos);
            
            // Serializes transaction header and transaction data into a byte buffer.
            // 将事务头和事务数据序列化成Byte Buffer
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            if (buf == null || buf.length == 0) { // 为空,抛出异常
                throw new IOException("Faulty serialization for header " +
                        "and txn");
            }
            // 生成一个验证算法
            Checksum crc = makeChecksumAlgorithm();
            // Updates the current checksum with the specified array of bytes
            // 使用Byte数组来更新当前的Checksum
            crc.update(buf, 0, buf.length);
            // 写long类型数据
            oa.writeLong(crc.getValue(), "txnEntryCRC");
            // Write the serialized transaction record to the output archive.
            // 将序列化的事务记录写入OutputArchive
            Util.writeTxnBytes(oa, buf);
            
            return true;
        }
        return false;
    }
View Code

相关文章:

  • 2021-08-10
  • 2022-12-23
  • 2021-09-02
  • 2021-10-08
  • 2021-10-25
  • 2021-06-24
  • 2021-12-20
  • 2021-11-30
猜你喜欢
  • 2022-01-14
  • 2021-05-29
  • 2021-05-16
  • 2021-08-23
  • 2021-07-16
  • 2021-10-16
  • 2021-07-19
相关资源
相似解决方案