一、前言

  在完成了前面的理论学习后,现在可以从源码角度来解析Zookeeper的细节,首先笔者想从序列化入手,因为在网络通信、数据存储中都用到了序列化,下面开始分析。

二、序列化

  序列化主要在zookeeper.jute包中,其中涉及的主要接口如下

    · InputArchive

    · OutputArchive

    · Index

    · Record

  2.1 InputArchive

  其是所有反序列化器都需要实现的接口,其方法如下 

public interface InputArchive {
    // 读取byte类型
    public byte readByte(String tag) throws IOException;
    // 读取boolean类型
    public boolean readBool(String tag) throws IOException;
    // 读取int类型
    public int readInt(String tag) throws IOException;
    // 读取long类型
    public long readLong(String tag) throws IOException;
    // 读取float类型
    public float readFloat(String tag) throws IOException;
    // 读取double类型
    public double readDouble(String tag) throws IOException;
    // 读取String类型
    public String readString(String tag) throws IOException;
    // 通过缓冲方式读取
    public byte[] readBuffer(String tag) throws IOException;
    // 开始读取记录
    public void readRecord(Record r, String tag) throws IOException;
    // 开始读取记录
    public void startRecord(String tag) throws IOException;
    // 结束读取记录
    public void endRecord(String tag) throws IOException;
    // 开始读取向量
    public Index startVector(String tag) throws IOException;
    // 结束读取向量
    public void endVector(String tag) throws IOException;
    // 开始读取Map
    public Index startMap(String tag) throws IOException;
    // 结束读取Map
    public void endMap(String tag) throws IOException;
}

  InputArchive的类结构如下

  【Zookeeper】源码分析之序列化

  1. BinaryInputArchive  

public class BinaryInputArchive implements InputArchive {
    // DataInput接口,用于从二进制流中读取字节
    private DataInput in;
    
    // 静态方法,用于获取Archive
    static public BinaryInputArchive getArchive(InputStream strm) {
        return new BinaryInputArchive(new DataInputStream(strm));
    }
    
    // 内部类,对应BinaryInputArchive索引
    static private class BinaryIndex implements Index {
        private int nelems;
        BinaryIndex(int nelems) {
            this.nelems = nelems;
        }
        public boolean done() {
            return (nelems <= 0);
        }
        public void incr() {
            nelems--;
        }
    }
    /** Creates a new instance of BinaryInputArchive */
    // 构造函数
    public BinaryInputArchive(DataInput in) {
        this.in = in;
    }
    
    // 读取字节
    public byte readByte(String tag) throws IOException {
        return in.readByte();
    }
    
    // 读取boolean类型
    public boolean readBool(String tag) throws IOException {
        return in.readBoolean();
    }
    
    // 读取int类型
    public int readInt(String tag) throws IOException {
        return in.readInt();
    }
    
    // 读取long类型
    public long readLong(String tag) throws IOException {
        return in.readLong();
    }
    
    // 读取float类型
    public float readFloat(String tag) throws IOException {
        return in.readFloat();
    }
    
    // 读取double类型
    public double readDouble(String tag) throws IOException {
        return in.readDouble();
    }
    
    // 读取String类型
    public String readString(String tag) throws IOException {
        // 确定长度
        int len = in.readInt();
        if (len == -1) return null;
        byte b[] = new byte[len];
        // 从输入流中读取一些字节,并将它们存储在缓冲区数组b中
        in.readFully(b);
        return new String(b, "UTF8");
    }
    
    // 最大缓冲值
    static public final int maxBuffer = Integer.getInteger("jute.maxbuffer", 0xfffff);

    // 读取缓冲
    public byte[] readBuffer(String tag) throws IOException {
        // 确定长度
        int len = readInt(tag);
        if (len == -1) return null;
        // Since this is a rough sanity check, add some padding to maxBuffer to
        // make up for extra fields, etc. (otherwise e.g. clients may be able to
        // write buffers larger than we can read from disk!)
        if (len < 0 || len > maxBuffer + 1024) { // 检查长度是否合理
            throw new IOException("Unreasonable length = " + len);
        }
        byte[] arr = new byte[len];
        // 从输入流中读取一些字节,并将它们存储在缓冲区数组arr中
        in.readFully(arr);
        return arr;
    }
    
    // 读取记录
    public void readRecord(Record r, String tag) throws IOException {
        // 反序列化,动态调用
        r.deserialize(this, tag);
    }
    
    // 开始读取记录,实现为空
    public void startRecord(String tag) throws IOException {}
    
    // 结束读取记录,实现为空
    public void endRecord(String tag) throws IOException {}
    
    // 开始读取向量
    public Index startVector(String tag) throws IOException {
        // 确定长度
        int len = readInt(tag);
        if (len == -1) {
            return null;
        }
        // 返回索引
        return new BinaryIndex(len);
    }
    
    // 结束读取向量
    public void endVector(String tag) throws IOException {}
    
    // 开始读取Map
    public Index startMap(String tag) throws IOException {
        // 返回索引
        return new BinaryIndex(readInt(tag));
    }
    
    // 结束读取Map,实现为空
    public void endMap(String tag) throws IOException {}
    
}
View Code

相关文章: