【问题标题】:Modify connector config in KafkaConnect before sending to task在发送到任务之前修改 KafkaConnect 中的连接器配置
【发布时间】:2019-01-29 15:12:41
【问题描述】:

我正在 Kafka Connect 中编写 SinkConnector 并遇到问题。这个连接器有这样的配置:

{
    "connector.class" : "a.b.ExampleFileSinkConnector",
    "tasks.max" : '1',
    "topics" : "mytopic",
    "maxFileSize" : "50"
}

我这样定义连接器的配置:

@Override public ConfigDef config()
  {
    ConfigDef result = new ConfigDef();
    result.define("maxFileSize", Type.STRING, "10", Importance.HIGH, "size of file");
    return result;
  }

在连接器中,我这样启动任务:

@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
  List<Map<String, String>> result = new ArrayList<Map<String,String>>();
  for (int i = 0; i < maxTasks; i++) {
    Map<String, String> taskConfig = new HashMap<>();
    taskConfig.put("connectorName",   connectorName);
    taskConfig.put("taskNumber",      Integer.toString(i));
    taskConfig.put("maxFileSize",     maxFileSize);
    result.add(taskConfig);
  }
  return result;
}

一切顺利。

但是,当启动任务时(在 taskConfigs() 中),如果我添加这个:

taskConfig.put("epoch", "123");

这会破坏整个基础架构:所有连接器都会在无限循环中停止并重新启动。

连接日志文件中没有任何异常或错误可以提供帮助。

使其工作的唯一方法是在连接器配置中添加“epoch”,我不想这样做,因为它是连接器必须发送给任务的内部参数。它不打算暴露给连接器的用户。

我注意到的另一点是,除了将其设置为默认值之外,无法更新任何连接器配置参数的值。更改参数并将其发送到任务会产生相同的行为。

非常感谢您在此问题上的任何帮助。

编辑:这里是 SinkTask::start() 的代码

@Override public void start(Map<String, String> taskConfig) {
  try {
    connectorName   = taskConfig.get("connectorName");
    log.info("{} -- Task.start()", connectorName);
    fileNamePattern = taskConfig.get("fileNamePattern");
    rootDir         = taskConfig.get("rootDir");
    fileExtension   = taskConfig.get("fileExtension");
    maxFileSize     = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxFileSize"));
    maxTimeMinutes  = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxTimeMinutes"));
    maxNumRecords   = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxNumRecords"));
    taskNumber      = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("taskNumber"));
    epochStart      = SimpleFileSinkConnector.parseLongConfig(taskConfig.get("epochStart"));
    log.info("{} -- fileNamePattern: {}, rootDir: {}, fileExtension: {}, maxFileSize: {}, maxTimeMinutes: {}, maxNumRecords: {}, taskNumber: {}, epochStart : {}",
            connectorName, fileNamePattern, rootDir, fileExtension, maxFileSize, maxTimeMinutes, maxNumRecords, taskNumber, epochStart);
    if (taskNumber == 0) {
      checkTempFilesForPromotion();
    }
    computeInitialFilename();
    log.info("{} -- Task.start() END", connectorName);
  } catch (Exception e) {
    log.info("{} -- Task.start() EXCEPTION : {}", connectorName, e.getLocalizedMessage());
  }
}

【问题讨论】:

  • 你能添加你的SinkTask::start(...)源代码吗?
  • 您曾写道,“没有异常或错误”,但“这会破坏整个基础架构”。可以用“无限循环”添加部分日志吗?

标签: apache-kafka apache-kafka-connect


【解决方案1】:

我们找到了问题的根本原因。 Kafka Connect 框架实际上按设计运行 - 问题与我们如何尝试使用 taskConfigs 配置框架有关。

问题

在我们的设计中,FileSinkConnector 在它的 start() 生命周期方法中设置了一个 epoch,并且这个 epoch 通过 taskConfigs() 生命周期方法传递给它的任务。所以每次连接器的 start() 生命周期方法运行时,都会为任务生成不同的配置——这就是问题所在。

每次生成不同的配置是不行的。事实证明,Connect Framework 检测到配置差异,并将在检测到时重新启动/重新平衡 - 停止和重新启动连接器/任务。重新启动将调用连接器的 stop() 和 start() 方法......这(当然)会产生另一个配置更改(因为新的时代),并且恶性循环开始了!

这是一个有趣且出乎意料的问题...由于 Connect 中的一种行为,我们并不欣赏。这是我们第一次尝试生成不是连接器配置的简单功能的任务配置。

请注意,Connect 中的这种行为是有意为之的,它解决了动态更改配置的实际问题 - 就像 JDBC Sink 连接器在检测到它想要接收的新数据库表时自发更新其配置。

感谢帮助我们的人!

【讨论】:

    猜你喜欢
    • 2011-05-11
    • 1970-01-01
    • 2014-02-06
    • 2012-03-15
    • 1970-01-01
    • 2015-08-16
    • 1970-01-01
    • 2020-10-26
    • 1970-01-01
    相关资源
    最近更新 更多