SpillableMemoryChannel是1.5版本新增的一个channel。这个channel优先将evnet放在内存中,一旦内存达到设定的容量就使用file channel写入磁盘。然后读的时候会按照顺序读取:会通过一个DrainOrderQueue来保证不管是内存中的还是溢出(本文的“溢出”指的是内存channel已满,需要使用file channel存储数据)文件中的顺序。这个Channel是memory channel和file channel的一个折中,虽然在内存中的数据仍然可能因为进程的突然中断而丢失,但是相对于memory channel而言一旦sink处理速度跟不上不至于丢失数据(后者一旦满了爆发异常会丢失后续的数据),提高了数据的可靠性;相对于file channel而言自然是大大提高了速度,但是可靠性较file channel有所降低。

  我们来看一下SpillableMemoryChannel的继承结构:SpillableMemoryChannel extends FileChannel,原来SpillableMemoryChannel是file的子类,天热具有file channel的特性。但是它的BasicTransactionSemantics是自己实现的。接下来我们来分析分析这个channel,这个channel可以看成是两个channel。相关内容传送门:Flume-NG源码阅读之FileChannel 和 flume-ng源码阅读memory-channel(原创) 。

   一、首先来看configure(Context context)方法,这个方法是对这个channel进行配置。一些主要参数介绍:
  (1)Semaphore totalStored,这两个channel【内存channel(并不是flume内置的memory channel,这里是新实现的一个,本文中的“内存channel”若无说明就是新实现的这个)和溢出而使用的file channel】中event数量的总和的信号量,初始为0;
  (2)ArrayDeque<Event> memQueue,这就是这里的内存channel,使用可以改变大小的数组双端队列ArrayDeque,存储event数据;
  (3)int memoryCapacity(对应参数名"memoryCapacity"),内存channel中存储的event的最大数量;
  (4)Semaphore memQueRemaining,内存channel剩余的可存储event的数量的信号量,初始大小为memoryCapacity;
  (5)int overflowTimeout(对应参数名"overflowTimeout"),溢出超时时间,指的是内存channel满了之后,切换到file channel的等待时间,默认是3s;
  (6)double overflowDeactivationThreshold(对应参数名"overflowDeactivationThreshold"),指的是停止溢出的阈值------内存channel剩余内存(这里指可再存储的event数量),默认5%;
  (7)volatile int byteCapacityBufferPercentage(对应参数名"byteCapacityBufferPercentage"),用来限制内存channel使用物理内存量,默认20;
  (8)volatile double avgEventSize()(对应参数名"avgEventSize"),指定每个event的大小,用来计算内存channel可以使用的slot总数量,会把event量化为slot,而不是字节,默认500;
  (9)volatile int byteCapacity(对应参数名"byteCapacity"),slot数量,默认是JVM可使用的最大物理内存(可通过配置"byteCapacity"参数来控制物理内存使用)的80%* (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize得来;
  (10)Semaphore bytesRemaining,内存channel中剩余可使用的slot数量信号量,初始大小是byteCapacity;
  (11)volatile int lastByteCapacity,动态加载配置文件时才会有用,记录上一次的ByteCapacity,用于修改bytesRemaining信号量的大小;
  (12)int overflowCapacity(对应参数名"overflowCapacity"),用于设置file channel的容量,默认是1亿;
  此外,boolean overflowDisabled用来是否禁用溢出,只要overflowCapacity不小于1就不会禁用;boolean overflowActivated表示是否可以使用溢出,默认是false;还会对对file channel的"keep-alive"设置为0;最后会通过super.configure(context)来对file channel进行配置。对于file channel的配置信息可以和SpillableMemoryChannel的配置信息在一起配置。
  二、start()方法,首先会super.start()启动file channel,获取file中溢出的数据量overFlowCount,重置totalStored和DrainOrderQueue对象drainOrder,内存channel的start是不会有数据的。
  三、需要讲一下DrainOrderQueue drainOrder = new DrainOrderQueue()。我们知道SpillableMemoryChannel其实是由两个channel组成,分别是内存channel和file channel,因此数据也会分布在内存和磁盘文件之中,那我们take时,是什么机制呢?换句话说就是什么时候读内存中的数据,什么时候读磁盘上文件的数据?take的顺序怎么样呢?我们希望take的顺序和put的顺序一样,先put的应该先take,所以我们应该给所有的put(包括内存和文件)进行“编号”使得可以有序的take,还要注意的就是需要标示这个take是应该从内存还是file中去读。为此设计了DrainOrderQueue类,来使得有序的put和take。
  这个类设计的狠精巧,是保证take和put正常合理操作的关键。在讲之前先大概说一下原理:这个类的关键属性是ArrayDeque<MutableInteger> queue,这也是一个ArrayDeque,ArrayDeque特性是数组可变且大小不受限制,可在头尾操作,此类很可能在用作堆栈时快于 Stack,在用作队列时快于 LinkedList,但是不是线程安全的不支持多线程并发操作;put操作总是对queue中的最后(尾)一个元素操作,take操作总是对queue中第一个(头)操作;put时,如果是内存channel,在queue增加的就是正数,如果是溢出操作增加的就是负数,内存和溢出分别对应queue中不同的元素(可以分类去读);take时,如果从内存中取数据,就会使得queue第一个元素的值不断缩小(正数)至0,然后删除这个元素,如果是从溢出文件中取数据则会使得queue中第一个元素不断增大(负数)至0,然后删除这个元素;这样就会形成流,使得put不断追加数据到流中,take不断从流中取数据,这个流就是有序的,且流中元素其实就是内存中的evnet个数和溢出文件中event的个数。
  好了,DrainOrderQueue详细代码如下:
 1   public static class DrainOrderQueue {
 2     public ArrayDeque<MutableInteger> queue = new ArrayDeque<MutableInteger>(1000);
 3 
 4     public int totalPuts = 0;  // for debugging only
 5     private long overflowCounter = 0; // # of items in overflow channel
 6 
 7     public  String dump() {
 8       StringBuilder sb = new StringBuilder();
 9 
10       sb.append("  [ ");
11       for (MutableInteger i : queue) {
12         sb.append(i.intValue());
13         sb.append(" ");
14       }
15       sb.append("]");
16       return  sb.toString();
17     }
18 
19     public void putPrimary(Integer eventCount) {
20       totalPuts += eventCount;
21       if (  (queue.peekLast() == null) || queue.getLast().intValue() < 0) {    //获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null
22         queue.addLast(new MutableInteger(eventCount));
23       } else {
24         queue.getLast().add(eventCount);//获取,但不移除此双端队列的第一个元素。
25       }
26     }
27 
28     public void putFirstPrimary(Integer eventCount) {
29       if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {    //获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
30         queue.addFirst(new MutableInteger(eventCount));
31       } else {
32         queue.getFirst().add(eventCount);//获取,但不移除此双端队列的第一个元素。
33       }
34     }
35 
36     public void putOverflow(Integer eventCount) {
37       totalPuts += eventCount;
38       if ( (queue.peekLast() == null) ||  queue.getLast().intValue() > 0) {
39         queue.addLast(new MutableInteger(-eventCount));
40       } else {
41         queue.getLast().add(-eventCount);
42       }
43       overflowCounter += eventCount;
44     }
45 
46     public void putFirstOverflow(Integer eventCount) {
47       if ( (queue.peekFirst() == null) ||  queue.getFirst().intValue() > 0) {
48         queue.addFirst(new MutableInteger(-eventCount));
49       }  else {
50         queue.getFirst().add(-eventCount);
51       }
52       overflowCounter += eventCount;
53     }
54 
55     public int front() {
56       return queue.getFirst().intValue();
57     }
58 
59     public boolean isEmpty() {
60       return queue.isEmpty();
61     }
62 
63     public void takePrimary(int takeCount) {
64       MutableInteger headValue = queue.getFirst();
65 
66       // this condition is optimization to avoid redundant conversions of
67       // int -> Integer -> string in hot path
68       if (headValue.intValue() < takeCount)  {
69         throw new IllegalStateException("Cannot take " + takeCount +
70                 " from " + headValue.intValue() + " in DrainOrder Queue");
71       }
72 
73       headValue.add(-takeCount);
74       if (headValue.intValue() == 0) {
75         queue.removeFirst();
76       }
77     }
78 
79     public void takeOverflow(int takeCount) {
80       MutableInteger headValue = queue.getFirst();
81       if(headValue.intValue() > -takeCount) {
82         throw new IllegalStateException("Cannot take " + takeCount + " from "
83                 + headValue.intValue() + " in DrainOrder Queue head " );
84       }
85 
86       headValue.add(takeCount);
87       if (headValue.intValue() == 0) {
88         queue.removeFirst();    //获取并移除此双端队列第一个元素。
89       }
90       overflowCounter -= takeCount;
91     }
92 
93   }
View Code

  我们一个方法一个方法的来剖析这个类:

  (1)dump(),这个方法比较简单就是获得queue中所有元素的数据量;

  (2)putPrimary(Integer eventCount),这个方法用在put操作的commit时,在commitPutsToPrimary()方法中被调用,表示向内存提交数据。这个方法会尝试获取queue中最后一个元素,如果为空(说明没数据)或者元素数值小于0(说明这个元素是面向溢出文件的),就新建一个元素赋值这个事务的event数量加入queue;否则表示当前是的元素表征的是内存中的event数量,直接累加即可。

  (3)putFirstPrimary(Integer eventCount),在doRollback()回滚的时候被调用,表示将takeList中的数据放回内存memQueue的头。这个方法会尝试获取queue中第一个元素,如果为空(说明没数据)或者元素数值小于0(说明这个元素是面向溢出文件的),就新建一个元素赋值takeList的event数量加入queue;否则表示当前是的元素表征的是内存中的event数量,直接累加即可。

  (4)putOverflow(Integer eventCount),这个方法发生在put操作的commit时,在commitPutsToOverflow_core方法和start()方法中,后者是设置初始量,前者表示内存channel已满要溢出到file channel。这个方法会尝试获取queue中最后一个元素,如果为空(说明没数据)或者元素数值大于0(表示这个元素是面向内存的),就新建一个元素赋值这个事务的event数量加入queue,这里赋值为负数;否则表示当前是的元素表征的是溢出文件中的event数量,直接累加负数即可。

  (5)putFirstOverflow(Integer eventCount),在doRollback()回滚的时候被调用,表示将takeList中event的数量放回溢出文件。这个方法会尝试获取queue中第一个元素,如果为空(说明没数据)或者元素数值大于0(表示这个元素是面向内存的),就新建一个元素赋值这个事务的 event数量加入queue,这里赋值为负数;否则表示当前是的元素表征的是溢出到文件中的event数量,直接累加负数即可。

  (6)front(),返回queue中第一个元素的值

  (7)takePrimary(int takeCount),这个方法在doTake()中被调用,表示take发生之后,要将内存中的event数量减takeCount(这个值一般都是1,即每次取一个)。这个方法会获取第一个元素的值(表示内存channel中有多少event),如果这个值比takeCount小,说明内存中没有足够的数量,这种情况不应该发生,报错;否则将这个元素的值减去takeCount,表示已取出takeCount个。最后如果这个元素的值为0,则从queue中删除这个元素。注意这里虽然是可以取takeCount个,但是源码调用这个参数都是一次取1个而已。

  (8)takeOverflow(int takeCount),这个方法在doTake()中被调用,表示take发生之后,要将溢出文件中的event数量加上takeCount(这个值一般都是1,即每次取一个)。这个 方法会获取第一个元素的值(表示溢出文件中有多少event),如果这个值比takeCount的负值大,说明文件中没有足够的数量,这种情况不应该发生,报错;否则将这个元素的值加上takeCount,表示已取出takeCount个。最后如果这个元素的值为0,则从queue中删除这个元素。注意这里虽然是可以取 takeCount个,但是源码调用这个参数都是一次取1个而已。

相关文章:

  • 2021-06-26
  • 2021-08-28
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-12-24
  • 2021-05-28
猜你喜欢
  • 2021-08-16
  • 2022-02-18
  • 2021-06-17
  • 2021-11-08
  • 2022-12-23
  • 2021-07-23
  • 2021-09-02
相关资源
相似解决方案