SpillableMemoryChannel是1.5版本新增的一个channel。这个channel优先将evnet放在内存中,一旦内存达到设定的容量就使用file channel写入磁盘。然后读的时候会按照顺序读取:会通过一个DrainOrderQueue来保证不管是内存中的还是溢出(本文的“溢出”指的是内存channel已满,需要使用file channel存储数据)文件中的顺序。这个Channel是memory channel和file channel的一个折中,虽然在内存中的数据仍然可能因为进程的突然中断而丢失,但是相对于memory channel而言一旦sink处理速度跟不上不至于丢失数据(后者一旦满了爆发异常会丢失后续的数据),提高了数据的可靠性;相对于file channel而言自然是大大提高了速度,但是可靠性较file channel有所降低。
我们来看一下SpillableMemoryChannel的继承结构:SpillableMemoryChannel extends FileChannel,原来SpillableMemoryChannel是file的子类,天热具有file channel的特性。但是它的BasicTransactionSemantics是自己实现的。接下来我们来分析分析这个channel,这个channel可以看成是两个channel。相关内容传送门:Flume-NG源码阅读之FileChannel 和 flume-ng源码阅读memory-channel(原创) 。
Stack,在用作队列时快于 LinkedList,但是不是线程安全的不支持多线程并发操作;put操作总是对queue中的最后(尾)一个元素操作,take操作总是对queue中第一个(头)操作;put时,如果是内存channel,在queue增加的就是正数,如果是溢出操作增加的就是负数,内存和溢出分别对应queue中不同的元素(可以分类去读);take时,如果从内存中取数据,就会使得queue第一个元素的值不断缩小(正数)至0,然后删除这个元素,如果是从溢出文件中取数据则会使得queue中第一个元素不断增大(负数)至0,然后删除这个元素;这样就会形成流,使得put不断追加数据到流中,take不断从流中取数据,这个流就是有序的,且流中元素其实就是内存中的evnet个数和溢出文件中event的个数。
1 public static class DrainOrderQueue { 2 public ArrayDeque<MutableInteger> queue = new ArrayDeque<MutableInteger>(1000); 3 4 public int totalPuts = 0; // for debugging only 5 private long overflowCounter = 0; // # of items in overflow channel 6 7 public String dump() { 8 StringBuilder sb = new StringBuilder(); 9 10 sb.append(" [ "); 11 for (MutableInteger i : queue) { 12 sb.append(i.intValue()); 13 sb.append(" "); 14 } 15 sb.append("]"); 16 return sb.toString(); 17 } 18 19 public void putPrimary(Integer eventCount) { 20 totalPuts += eventCount; 21 if ( (queue.peekLast() == null) || queue.getLast().intValue() < 0) { //获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null 22 queue.addLast(new MutableInteger(eventCount)); 23 } else { 24 queue.getLast().add(eventCount);//获取,但不移除此双端队列的第一个元素。 25 } 26 } 27 28 public void putFirstPrimary(Integer eventCount) { 29 if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) { //获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。 30 queue.addFirst(new MutableInteger(eventCount)); 31 } else { 32 queue.getFirst().add(eventCount);//获取,但不移除此双端队列的第一个元素。 33 } 34 } 35 36 public void putOverflow(Integer eventCount) { 37 totalPuts += eventCount; 38 if ( (queue.peekLast() == null) || queue.getLast().intValue() > 0) { 39 queue.addLast(new MutableInteger(-eventCount)); 40 } else { 41 queue.getLast().add(-eventCount); 42 } 43 overflowCounter += eventCount; 44 } 45 46 public void putFirstOverflow(Integer eventCount) { 47 if ( (queue.peekFirst() == null) || queue.getFirst().intValue() > 0) { 48 queue.addFirst(new MutableInteger(-eventCount)); 49 } else { 50 queue.getFirst().add(-eventCount); 51 } 52 overflowCounter += eventCount; 53 } 54 55 public int front() { 56 return queue.getFirst().intValue(); 57 } 58 59 public boolean isEmpty() { 60 return queue.isEmpty(); 61 } 62 63 public void takePrimary(int takeCount) { 64 MutableInteger headValue = queue.getFirst(); 65 66 // this condition is optimization to avoid redundant conversions of 67 // int -> Integer -> string in hot path 68 if (headValue.intValue() < takeCount) { 69 throw new IllegalStateException("Cannot take " + takeCount + 70 " from " + headValue.intValue() + " in DrainOrder Queue"); 71 } 72 73 headValue.add(-takeCount); 74 if (headValue.intValue() == 0) { 75 queue.removeFirst(); 76 } 77 } 78 79 public void takeOverflow(int takeCount) { 80 MutableInteger headValue = queue.getFirst(); 81 if(headValue.intValue() > -takeCount) { 82 throw new IllegalStateException("Cannot take " + takeCount + " from " 83 + headValue.intValue() + " in DrainOrder Queue head " ); 84 } 85 86 headValue.add(takeCount); 87 if (headValue.intValue() == 0) { 88 queue.removeFirst(); //获取并移除此双端队列第一个元素。 89 } 90 overflowCounter -= takeCount; 91 } 92 93 }
我们一个方法一个方法的来剖析这个类:
(1)dump(),这个方法比较简单就是获得queue中所有元素的数据量;
(2)putPrimary(Integer eventCount),这个方法用在put操作的commit时,在commitPutsToPrimary()方法中被调用,表示向内存提交数据。这个方法会尝试获取queue中最后一个元素,如果为空(说明没数据)或者元素数值小于0(说明这个元素是面向溢出文件的),就新建一个元素赋值这个事务的event数量加入queue;否则表示当前是的元素表征的是内存中的event数量,直接累加即可。
(3)putFirstPrimary(Integer eventCount),在doRollback()回滚的时候被调用,表示将takeList中的数据放回内存memQueue的头。这个方法会尝试获取queue中第一个元素,如果为空(说明没数据)或者元素数值小于0(说明这个元素是面向溢出文件的),就新建一个元素赋值takeList的event数量加入queue;否则表示当前是的元素表征的是内存中的event数量,直接累加即可。
(4)putOverflow(Integer eventCount),这个方法发生在put操作的commit时,在commitPutsToOverflow_core方法和start()方法中,后者是设置初始量,前者表示内存channel已满要溢出到file channel。这个方法会尝试获取queue中最后一个元素,如果为空(说明没数据)或者元素数值大于0(表示这个元素是面向内存的),就新建一个元素赋值这个事务的event数量加入queue,这里赋值为负数;否则表示当前是的元素表征的是溢出文件中的event数量,直接累加负数即可。
(5)putFirstOverflow(Integer eventCount),在doRollback()回滚的时候被调用,表示将takeList中event的数量放回溢出文件。这个方法会尝试获取queue中第一个元素,如果为空(说明没数据)或者元素数值大于0(表示这个元素是面向内存的),就新建一个元素赋值这个事务的 event数量加入queue,这里赋值为负数;否则表示当前是的元素表征的是溢出到文件中的event数量,直接累加负数即可。
(6)front(),返回queue中第一个元素的值
(7)takePrimary(int takeCount),这个方法在doTake()中被调用,表示take发生之后,要将内存中的event数量减takeCount(这个值一般都是1,即每次取一个)。这个方法会获取第一个元素的值(表示内存channel中有多少event),如果这个值比takeCount小,说明内存中没有足够的数量,这种情况不应该发生,报错;否则将这个元素的值减去takeCount,表示已取出takeCount个。最后如果这个元素的值为0,则从queue中删除这个元素。注意这里虽然是可以取takeCount个,但是源码调用这个参数都是一次取1个而已。
(8)takeOverflow(int takeCount),这个方法在doTake()中被调用,表示take发生之后,要将溢出文件中的event数量加上takeCount(这个值一般都是1,即每次取一个)。这个 方法会获取第一个元素的值(表示溢出文件中有多少event),如果这个值比takeCount的负值大,说明文件中没有足够的数量,这种情况不应该发生,报错;否则将这个元素的值加上takeCount,表示已取出takeCount个。最后如果这个元素的值为0,则从queue中删除这个元素。注意这里虽然是可以取 takeCount个,但是源码调用这个参数都是一次取1个而已。