基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。

Flume的事务处理原理: 

Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

  • Channel ch = new MemoryChannel();
  • Transaction txn = ch.getTransaction();
  • //事物开始
  • txn.begin();
  • try {
  •    Event eventToStage = EventBuilder.withBody(\"Hello Flume!\",
  •    Charset.forName(\"UTF-8\"));
  •    //往临时缓冲区Put数据
  •    ch.put(eventToStage);
  •    //或者ch.take()
  •    //将这些数据提交到channel中
  •     txn.commit();
  • } catch (Throwable t) {
  •   txn.rollback();
  •   if (t instanceof Error) {
  •     throw (Error)t;
  •   }
  • } finally {
  •   txn.close();
  • }
  • Put事务流程

    Put事务可以分为以下阶段:

    • doPut:将批数据先写入临时缓冲区putList
    • doCommit:检查channel内存队列是否足够合并。
    • doRollback:channel内存队列空间不足,抛弃数据   (这个地方个人理解可能会存在数据丢失)

    我们从Source数据接收到写入Channel这个过程对Put事物进行分析。

    理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 【转】

    ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

  • @Override
  • public Status appendBatch(List<ThriftFlumeEvent> events) throws TException { 
  •       List<Event> flumeEvents = Lists.newArrayList(); 
  •       for(ThriftFlumeEvent event : events) { 
  •         flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); 
  •       } 
  •       //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
  •       getChannelProcessor().processEventBatch(flumeEvents); 
  •         ... 
  •       return Status.OK; 
  •     }

  • 事务逻辑都在processEventBatch这个方法里:

  • public void processEventBatch(List<Event> events) {
  •     ...
  •     //预处理每行数据,有人用来做ETL嘛
  •     events = interceptorChain.intercept(events);
  •     ...
  •     //分类数据,划分不同的channel集合对应的数据
  •     // Process required channels
  •     Transaction tx = reqChannel.getTransaction();
  •     ...
  •         //事务开始,tx即MemoryTransaction类实例
  •         tx.begin();
  •         List<Event> batch = reqChannelQueue.get(reqChannel);
  •         for (Event event : batch) {
  •           // 这个put操作实际调用的是transaction.doPut
  •           reqChannel.put(event);
  •         }
  •         //提交,将数据写入Channel的队列中
  •         tx.commit();
  •       } catch (Throwable t) {
  •         //回滚
  •         tx.rollback();
  •         ...
  •       }
  •     }
  •     ...
  •   }
  •  

    每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.

    那么,事务到底做了什么?
    理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 【转】

    实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

    • putList
    • takeList

    对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。
    channel.put -> transaction.doPut:

  • protected void doPut(Event event) throws InterruptedException {
  •       //计算数据字节大小
  •       int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
  •       //写入临时缓冲区putList
  •       if (!putList.offer(event)) {
  •         throw new ChannelException(
  •           \"Put queue for MemoryTransaction of capacity \" +
  •             putList.size() + \" full, consider committing more frequently, \" +
  •             \"increasing capacity or increasing thread count\");
  •       }
  •       putByteCounter += eventByteSize;
  •     }
  • transaction.commit:

  •     @Override
  •     protected void doCommit() throws InterruptedException {
  •       //检查channel的队列剩余大小是否足够
  •       ...
  •       int puts = putList.size();
  •       ...
  •       synchronized(queueLock) {
  •         if(puts > 0 ) {
  •           while(!putList.isEmpty()) {
  •             //写入到channel的队列
  •             if(!queue.offer(putList.removeFirst())) {
  •               throw new RuntimeException(\"Queue add failed, this shouldn\'t be able to happen\");
  •             }
  •           }
  •         }
  •         //清除临时队列
  •         putList.clear();
  •         ...
  •       }
  •       ...
  •     }
  • 如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

  • @Override
  •     protected void doRollback() {
  •     ...
  •         //抛弃数据,没合并到channel的内存队列
  •         putList.clear();
  •       ...
  •     }



  • Take事务

     

    Take事务分为以下阶段:

    • doTake:先将数据取到临时缓冲区takeList
    • 将数据发送到下一个节点
    • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
    • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

    理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 【转】

    Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

  • public Status process() throws EventDeliveryException {
  •     ...
  •     Transaction transaction = channel.getTransaction();
  •     ...
  •     //事务开始
  •     transaction.begin();
  •     ...
  •       for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  •         //take数据到临时缓冲区,实际调用的是transaction.doTake
  •         Event event = channel.take();
  •         if (event == null) {
  •           break;
  •         }
  •         ...
  •       //写数据到HDFS
  •       bucketWriter.append(event);
  •       ...
  •       // flush all pending buckets before committing the transaction
  •       for (BucketWriter bucketWriter : writers) {
  •         bucketWriter.flush();
  •       }
  •       //commit
  •       transaction.commit();
  •       ...
  •     } catch (IOException eIO) {
  •       transaction.rollback();
  •       ...
  •     } finally {
  •       transaction.close();
  •     }
  •   }
  •  

    大致流程图:
    理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 【转】

    接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

  • protected Event doTake() throws InterruptedException {
  •       ...
  •       //从channel内存队列取数据
  •       synchronized(queueLock) {
  •         event = queue.poll();
  •       }
  •       ...
  •       //将数据放到临时缓冲区
  •       takeList.put(event);
  •       ...
  •       return event;
  •     }
  • 接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

  • protected void doCommit() throws InterruptedException {
  •     ...
  •     takeList.clear();
  •     ...
  • }
  •  

    很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

  • protected void doRollback() {
  •       int takes = takeList.size();
  •       //检查内存队列空间大小,是否足够takeList写回去
  •       synchronized(queueLock) {
  •         Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), \"Not enough space in memory channel \" +
  •             \"queue to rollback takes. This should never happen, please report\");
  •         while(!takeList.isEmpty()) {
  •           queue.addFirst(takeList.removeLast());
  •         }
  •         ...
  •       }
  •       ...
  •     }
  •  

    读完代码可见 

    batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。

    即一次性你可以处理batchSize个event,这个一次性就是指在一个事务中。

    这个参数值越大,每个事务提交的范围就越大,taskList的清空等操作次数会减少,因此性能肯定会提升,但是可能在出错时,回滚的返回也会变大。

    接下来看一下
     

    内存通道中的内部类MemoryTransaction:

     private class MemoryTransaction extends BasicTransactionSemantics {
        private LinkedBlockingDeque takeList;
        private LinkedBlockingDeque putList;
        private final ChannelCounter channelCounter;
        private int putByteCounter = 0;
        private int takeByteCounter = 0;
    
        public MemoryTransaction(int transCapacity, ChannelCounter counter) {
          putList = new LinkedBlockingDeque(transCapacity);
          takeList = new LinkedBlockingDeque(transCapacity);
    
          channelCounter = counter;
        }

    可见transactionCapacity参数其实 

    相关文章:

    • 2021-06-18
    • 2022-12-23
    • 2022-12-23
    • 2022-03-06
    • 2021-06-24
    • 2022-12-23
    • 2021-08-16
    • 2022-12-23
    猜你喜欢
    • 2022-12-23
    • 2022-02-02
    • 2021-09-16
    • 2022-12-23
    • 2022-12-23
    • 2021-04-06
    • 2022-12-23
    相关资源
    相似解决方案