【发布时间】:2016-03-15 17:00:20
【问题描述】:
TL;DR
当 Flume 源无法将事务推送到管道中的下一个通道时,它是否始终保留事件实例以供下次尝试使用?
一般来说,有一个 有状态 Flume 拦截器是否安全,其中事件的处理取决于先前处理的事件?
完整的问题描述:
我正在考虑利用 Apache Kafka 提供的关于主题分区在消费者组中的消费者之间分布方式的保证的可能性,以在现有的基于 Flume 的日志整合架构中执行流式重复数据删除。
使用 Flume 的 Kafka Source 和自定义路由到 Kafka 主题分区,我可以确保应该进入相同逻辑“重复数据删除队列”的每个事件都将由集群中的单个 Flume 代理处理(只要集群内没有代理停止/启动)。我使用定制的 Flume 拦截器进行了以下设置:
[带有去重拦截器的KafkaSource]-->()MemoryChannel)-->[HDFSSink]
似乎当 Flume Kafka 源运行器无法将一批事件推送到内存通道时,属于该批的事件实例再次传递给我的拦截器的 intercept() 方法。在这种情况下,很容易将标签(以 Flume 事件标头的形式)添加到已处理的事件中,以区分实际重复的事件与重新处理的失败批次中的事件。
但是,我想知道是否有任何明确保证失败事务中的事件实例被保留以供下次尝试,或者是否有可能从实际源(在本例中为 Kafka)再次读取事件并从零开始重建。在这种情况下,我的拦截器会认为这些事件是重复的并丢弃它们,即使它们从未传递到通道。
编辑
这就是我的拦截器如何区分已经处理的事件实例和未处理的事件:
public Event intercept(Event event) {
Map<String,String> headers = event.getHeaders();
// tagHeaderName is the name of the header used to tag events, never null
if( !tagHeaderName.isEmpty() ) {
// Don't look further if event was already processed...
if( headers.get(tagHeaderName)!=null )
return event;
// Mark it as processed otherwise...
else
headers.put(tagHeaderName, "");
}
// Continue processing of event...
}
【问题讨论】:
标签: apache-kafka flume flume-ng