【问题标题】:kafka connect - auditing - triggering an event when tasks finishedkafka connect - 审计 - 任务完成时触发事件
【发布时间】:2017-10-17 22:33:22
【问题描述】:

我们正在使用 kafka 构建异常管理工具。将有源连接器 - 它将从物理文件中提取记录。另一方面,将有 sink connect (mongodb-sinkconnect),它将从主题中提取记录并将其推送到 mongoDb。一切正常。

我们需要在不同的主题中捕获事件(出于审计目的)。 诸如

之类的事件
  1. 源任务(文件轮询任务)启动事件 例如,如果收到文件 A
  2. 源任务(文件轮询任务)结束事件 例如,如果文件 A 已完全处理
  3. Sink Task(推送记录到mongodb任务)启动事件 例如,文件 A 的记录由 mongodb-connect 开始处理
  4. Sink 任务(推送记录到 mongodb 任务)结束事件 示例,文件 A 的记录完全推送到 MongoDB

我有几个问题: 1.我们可以通过在SourceTask中实例化一个KafkaProducer来向不同的主题发送事件,一旦文件处理完毕,我们就发送一个事件

public class FileSourceTask extends SourceTask {
    private Producer<Key, Event> auditProducer;

    public void start(Map<String, String> props) {
       auditProducer = new KafkaProducer<Key, Event>(auditProps);
    }

    public List<SourceRecord> poll() {
        List<SourceRecord> results = this.filePoller.poll();
        if(results.isEmpty() && eventNotSentForCurrentFile) {
          Event event = new Event();
          auditProducer.send(
          new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));

        }
       // futher processing  
     }

上述方法是否正确?

  1. 上述解决方案工作正常 - 因为它运行一个任务 (maxTasks = 1),但对于我们的用例,在接收任务 (mongoDB 连接) 中实现这一点非常困难。由于该主题是分区的,因此将创建许多任务。我们无法跟踪接收任务的开始事件和结束事件

请提出解决此问题的方法。

非常感谢。

【问题讨论】:

    标签: java apache-kafka apache-kafka-connect


    【解决方案1】:

    我认为,您可以围绕 Kafka-connect ReST API 构建一些东西

    https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-status

    但是有了这个,你需要让观察者保持连接器状态,一旦连接器的所有任务都完成了,你就可以采取行动了。

    【讨论】:

    • 这并不能真正回答问题,因为连接器没有“完成”状态,只有“正在运行/暂停/未分配/失败”
    猜你喜欢
    • 2021-05-06
    • 1970-01-01
    • 2018-10-08
    • 2020-09-12
    • 2012-06-06
    • 1970-01-01
    相关资源
    最近更新 更多