【问题标题】:custom Flume interceptor: intercept() method called multiple times for the same Event自定义 Flume 拦截器:为同一个 Event 多次调用 intercept() 方法
【发布时间】:2016-03-15 17:00:20
【问题描述】:

TL;DR

当 Flume 源无法将事务推送到管道中的下一个通道时,它是否始终保留事件实例以供下次尝试使用?

一般来说,有一个 有状态 Flume 拦截器是否安全,其中事件的处理取决于先前处理的事件?

完整的问题描述:

我正在考虑利用 Apache Kafka 提供的关于主题分区在消费者组中的消费者之间分布方式的保证的可能性,以在现有的基于 Flume 的日志整合架构中执行流式重复数据删除。

使用 Flume 的 Kafka Source 和自定义路由到 Kafka 主题分区,我可以确保应该进入相同逻辑“重复数据删除队列”的每个事件都将由集群中的单个 Flume 代理处理(只要集群内没有代理停止/启动)。我使用定制的 Flume 拦截器进行了以下设置:

[带有去重拦截器的KafkaSource]-->()MemoryChannel)-->[HDFSSink]

似乎当 Flume Kafka 源运行器无法将一批事件推送到内存通道时,属于该批的事件实例再次传递给我的拦截器的 intercept() 方法。在这种情况下,很容易将标签(以 Flume 事件标头的形式)添加到已处理的事件中,以区分实际重复的事件与重新处理的失败批次中的事件。

但是,我想知道是否有任何明确保证失败事务中的事件实例被保留以供下次尝试,或者是否有可能从实际源(在本例中为 Kafka)再次读取事件并从零开始重建。在这种情况下,我的拦截器会认为这些事件是重复的并丢弃它们,即使它们从未传递到通道。

编辑

这就是我的拦截器如何区分已经处理的事件实例和未处理的事件:

public Event intercept(Event event) {
  Map<String,String> headers = event.getHeaders();
  // tagHeaderName is the name of the header used to tag events, never null
  if( !tagHeaderName.isEmpty() ) {
    // Don't look further if event was already processed...
    if( headers.get(tagHeaderName)!=null )
      return event;
    // Mark it as processed otherwise...
    else
      headers.put(tagHeaderName, "");
  }
  // Continue processing of event...
}

【问题讨论】:

    标签: apache-kafka flume flume-ng


    【解决方案1】:

    我遇到了类似的问题:

    当 sink 写入失败时,Kafka Source 仍然持有已经被拦截器处理的数据。在下一次尝试中,这些数据将发送到拦截器,并被一次又一次地处理。通过阅读KafkaSource的代码,我相信它是错误的。

    我的拦截器将从原始消息中删除一些信息,并修改原始消息。由于此错误,重试机制将永远无法按预期工作。

    到目前为止,这不是一个简单的解决方案。

    【讨论】:

    • 同一个Kafka事件在与通道的事务失败时被拦截器再次处理,这本身不是问题;毕竟,事件无法传递,因此需要重试。我想知道的是,是否可以保证 JVM 中的相同事件 instance 以及拦截器所做的任何修改都会再次使用,或者是否有可能(或者在我的情况下, 风险),该事件将从它来自的 Kafka 主题中再次读取。
    • 请注意,如果您只是想在拦截器的代码中区分已处理的事件实例与另一个事件实例,您可以在事件中添加 tag 标头并跳过处理带有此 tag 标头的事件数。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-18
    • 1970-01-01
    • 2011-10-21
    • 1970-01-01
    • 1970-01-01
    • 2015-08-14
    相关资源
    最近更新 更多