一、前言
在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。
二、序列化
序列化主要在zookeeper.jute包中,其中涉及的主要接口如下
· InputArchive
· OutputArchive
· Index
· Record
2.1 InputArchive
其是所有反序列化器都需要实现的接口,其方法如下
public interface InputArchive { // 读取byte类型 public byte readByte(String tag) throws IOException; // 读取boolean类型 public boolean readBool(String tag) throws IOException; // 读取int类型 public int readInt(String tag) throws IOException; // 读取long类型 public long readLong(String tag) throws IOException; // 读取float类型 public float readFloat(String tag) throws IOException; // 读取double类型 public double readDouble(String tag) throws IOException; // 读取String类型 public String readString(String tag) throws IOException; // 通过缓冲方式读取 public byte[] readBuffer(String tag) throws IOException; // 开始读取记录 public void readRecord(Record r, String tag) throws IOException; // 开始读取记录 public void startRecord(String tag) throws IOException; // 结束读取记录 public void endRecord(String tag) throws IOException; // 开始读取向量 public Index startVector(String tag) throws IOException; // 结束读取向量 public void endVector(String tag) throws IOException; // 开始读取Map public Index startMap(String tag) throws IOException; // 结束读取Map public void endMap(String tag) throws IOException; }
InputArchive的类结构如下
1. BinaryInputArchive
public class BinaryInputArchive implements InputArchive { // DataInput接口,用于从二进制流中读取字节 private DataInput in; // 静态方法,用于获取Archive static public BinaryInputArchive getArchive(InputStream strm) { return new BinaryInputArchive(new DataInputStream(strm)); } // 内部类,对应BinaryInputArchive索引 static private class BinaryIndex implements Index { private int nelems; BinaryIndex(int nelems) { this.nelems = nelems; } public boolean done() { return (nelems <= 0); } public void incr() { nelems--; } } /** Creates a new instance of BinaryInputArchive */ // 构造函数 public BinaryInputArchive(DataInput in) { this.in = in; } // 读取字节 public byte readByte(String tag) throws IOException { return in.readByte(); } // 读取boolean类型 public boolean readBool(String tag) throws IOException { return in.readBoolean(); } // 读取int类型 public int readInt(String tag) throws IOException { return in.readInt(); } // 读取long类型 public long readLong(String tag) throws IOException { return in.readLong(); } // 读取float类型 public float readFloat(String tag) throws IOException { return in.readFloat(); } // 读取double类型 public double readDouble(String tag) throws IOException { return in.readDouble(); } // 读取String类型 public String readString(String tag) throws IOException { // 确定长度 int len = in.readInt(); if (len == -1) return null; byte b[] = new byte[len]; // 从输入流中读取一些字节,并将它们存储在缓冲区数组b中 in.readFully(b); return new String(b, "UTF8"); } // 最大缓冲值 static public final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff); // 读取缓冲 public byte[] readBuffer(String tag) throws IOException { // 确定长度 int len = readInt(tag); if (len == -1) return null; // Since this is a rough sanity check, add some padding to maxBuffer to // make up for extra fields, etc. (otherwise e.g. clients may be able to // write buffers larger than we can read from disk!) if (len < 0 || len > maxBuffer + 1024) { // 检查长度是否合理 throw new IOException("Unreasonable length = " + len); } byte[] arr = new byte[len]; // 从输入流中读取一些字节,并将它们存储在缓冲区数组arr中 in.readFully(arr); return arr; } // 读取记录 public void readRecord(Record r, String tag) throws IOException { // 反序列化,动态调用 r.deserialize(this, tag); } // 开始读取记录,实现为空 public void startRecord(String tag) throws IOException {} // 结束读取记录,实现为空 public void endRecord(String tag) throws IOException {} // 开始读取向量 public Index startVector(String tag) throws IOException { // 确定长度 int len = readInt(tag); if (len == -1) { return null; } // 返回索引 return new BinaryIndex(len); } // 结束读取向量 public void endVector(String tag) throws IOException {} // 开始读取Map public Index startMap(String tag) throws IOException { // 返回索引 return new BinaryIndex(readInt(tag)); } // 结束读取Map,实现为空 public void endMap(String tag) throws IOException {} }