一、前言
前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。
二、持久化总体框架
持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。
· TxnLog,接口类型,读取事务性日志的接口。
· FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
· Snapshot,接口类型,持久层快照接口。
· FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
· FileTxnSnapLog,封装了TxnLog和SnapShot。
· Util,工具类,提供持久化所需的API。
下面先来分析TxnLog和FileTxnLog的源码。
三、TxnLog源码分析
TxnLog是接口,规定了对日志的响应操作。
public interface TxnLog { /** * roll the current * log being appended to * @throws IOException */ // 回滚日志 void rollLog() throws IOException; /** * Append a request to the transaction log * @param hdr the transaction header * @param r the transaction itself * returns true iff something appended, otw false * @throws IOException */ // 添加一个请求至事务性日志 boolean append(TxnHeader hdr, Record r) throws IOException; /** * Start reading the transaction logs * from a given zxid * @param zxid * @return returns an iterator to read the * next transaction in the logs. * @throws IOException */ // 读取事务性日志 TxnIterator read(long zxid) throws IOException; /** * the last zxid of the logged transactions. * @return the last zxid of the logged transactions. * @throws IOException */ // 事务性操作的最新zxid long getLastLoggedZxid() throws IOException; /** * truncate the log to get in sync with the * leader. * @param zxid the zxid to truncate at. * @throws IOException */ // 清空日志,与Leader保持同步 boolean truncate(long zxid) throws IOException; /** * the dbid for this transaction log. * @return the dbid for this transaction log. * @throws IOException */ // 获取数据库的id long getDbId() throws IOException; /** * commmit the trasaction and make sure * they are persisted * @throws IOException */ // 提交事务并进行确认 void commit() throws IOException; /** * close the transactions logs */ // 关闭事务性日志 void close() throws IOException; /** * an iterating interface for reading * transaction logs. */ // 读取事务日志的迭代器接口 public interface TxnIterator { /** * return the transaction header. * @return return the transaction header. */ // 获取事务头部 TxnHeader getHeader(); /** * return the transaction record. * @return return the transaction record. */ // 获取事务 Record getTxn(); /** * go to the next transaction record. * @throws IOException */ // 下个事务 boolean next() throws IOException; /** * close files and release the * resources * @throws IOException */ // 关闭文件释放资源 void close() throws IOException; } }
其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。
四、FileTxnLog源码分析
对于LogFile而言,其格式可分为如下三部分
LogFile:
FileHeader TxnList ZeroPad
FileHeader格式如下
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList格式如下
TxnList:
Txn || Txn TxnList
Txn格式如下
Txn:
checksum Txnlen TxnHeader Record 0x42
Txnlen格式如下
Txnlen:
len 4bytes
TxnHeader格式如下
TxnHeader: {
sessionid 8bytes
cxid 4bytes
zxid 8bytes
time 8bytes
type 4bytes
}
ZeroPad格式如下
ZeroPad:
0 padded to EOF (filled during preallocation stage)
了解LogFile的格式对于理解源码会有很大的帮助。
4.1 属性
public class FileTxnLog implements TxnLog { private static final Logger LOG; // 预分配大小 64M static long preAllocSize = 65536 * 1024; // 魔术数字,默认为1514884167 public final static int TXNLOG_MAGIC = ByteBuffer.wrap("ZKLG".getBytes()).getInt(); // 版本号 public final static int VERSION = 2; /** Maximum time we allow for elapsed fsync before WARNing */ // 进行同步时,发出warn之前所能等待的最长时间 private final static long fsyncWarningThresholdMS; // 静态属性,确定Logger、预分配空间大小和最长时间 static { LOG = LoggerFactory.getLogger(FileTxnLog.class); String size = System.getProperty("zookeeper.preAllocSize"); if (size != null) { try { preAllocSize = Long.parseLong(size) * 1024; } catch (NumberFormatException e) { LOG.warn(size + " is not a valid value for preAllocSize"); } } fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000); } // 最大(新)的zxid long lastZxidSeen; // 存储数据相关的流 volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; volatile FileOutputStream fos = null; // log目录文件 File logDir; // 是否强制同步 private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");; // 数据库id long dbId; // 流列表 private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>(); // 当前大小 long currentSize; // 写日志文件 File logFileWrite = null; }
4.2. 核心函数
1. append函数
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr != null) { // 事务头部不为空 if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } if (logStream==null) { // 日志流为空 if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: log." + Long.toHexString(hdr.getZxid())); } // logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); // FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); // 序列化 fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. // 刷新到磁盘 logStream.flush(); // 当前通道的大小 currentSize = fos.getChannel().position(); // 添加fos streamsToFlush.add(fos); } // 填充文件 padFile(fos); // Serializes transaction header and transaction data into a byte buffer. // 将事务头和事务数据序列化成Byte Buffer byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { // 为空,抛出异常 throw new IOException("Faulty serialization for header " + "and txn"); } // 生成一个验证算法 Checksum crc = makeChecksumAlgorithm(); // Updates the current checksum with the specified array of bytes // 使用Byte数组来更新当前的Checksum crc.update(buf, 0, buf.length); // 写long类型数据 oa.writeLong(crc.getValue(), "txnEntryCRC"); // Write the serialized transaction record to the output archive. // 将序列化的事务记录写入OutputArchive Util.writeTxnBytes(oa, buf); return true; } return false; }