【发布时间】:2019-01-29 15:12:41
【问题描述】:
我正在 Kafka Connect 中编写 SinkConnector 并遇到问题。这个连接器有这样的配置:
{
"connector.class" : "a.b.ExampleFileSinkConnector",
"tasks.max" : '1',
"topics" : "mytopic",
"maxFileSize" : "50"
}
我这样定义连接器的配置:
@Override public ConfigDef config()
{
ConfigDef result = new ConfigDef();
result.define("maxFileSize", Type.STRING, "10", Importance.HIGH, "size of file");
return result;
}
在连接器中,我这样启动任务:
@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<Map<String,String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("connectorName", connectorName);
taskConfig.put("taskNumber", Integer.toString(i));
taskConfig.put("maxFileSize", maxFileSize);
result.add(taskConfig);
}
return result;
}
一切顺利。
但是,当启动任务时(在 taskConfigs() 中),如果我添加这个:
taskConfig.put("epoch", "123");
这会破坏整个基础架构:所有连接器都会在无限循环中停止并重新启动。
连接日志文件中没有任何异常或错误可以提供帮助。
使其工作的唯一方法是在连接器配置中添加“epoch”,我不想这样做,因为它是连接器必须发送给任务的内部参数。它不打算暴露给连接器的用户。
我注意到的另一点是,除了将其设置为默认值之外,无法更新任何连接器配置参数的值。更改参数并将其发送到任务会产生相同的行为。
非常感谢您在此问题上的任何帮助。
编辑:这里是 SinkTask::start() 的代码
@Override public void start(Map<String, String> taskConfig) {
try {
connectorName = taskConfig.get("connectorName");
log.info("{} -- Task.start()", connectorName);
fileNamePattern = taskConfig.get("fileNamePattern");
rootDir = taskConfig.get("rootDir");
fileExtension = taskConfig.get("fileExtension");
maxFileSize = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxFileSize"));
maxTimeMinutes = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxTimeMinutes"));
maxNumRecords = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxNumRecords"));
taskNumber = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("taskNumber"));
epochStart = SimpleFileSinkConnector.parseLongConfig(taskConfig.get("epochStart"));
log.info("{} -- fileNamePattern: {}, rootDir: {}, fileExtension: {}, maxFileSize: {}, maxTimeMinutes: {}, maxNumRecords: {}, taskNumber: {}, epochStart : {}",
connectorName, fileNamePattern, rootDir, fileExtension, maxFileSize, maxTimeMinutes, maxNumRecords, taskNumber, epochStart);
if (taskNumber == 0) {
checkTempFilesForPromotion();
}
computeInitialFilename();
log.info("{} -- Task.start() END", connectorName);
} catch (Exception e) {
log.info("{} -- Task.start() EXCEPTION : {}", connectorName, e.getLocalizedMessage());
}
}
【问题讨论】:
-
你能添加你的
SinkTask::start(...)源代码吗? -
您曾写道,“没有异常或错误”,但“这会破坏整个基础架构”。可以用“无限循环”添加部分日志吗?
标签: apache-kafka apache-kafka-connect