我也对这个连接器任务感到困惑。
一开始我以为kafka connect解决了多客户端数据重复的问题。但实际上kafka connect选择了其他方式来避免这个问题。我跟踪了源代码并找到了它的实际实现。
(我现在无法发布图片,您可以点击链接查看图片。)
首先是 taskConfigs 接口:
taskConfigs interface
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
*
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
然后是对这个接口的调用:
call for this interface
org.apache.kafka.connect.runtime.distributed.DistributedHerder:
final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
boolean changed = false;
int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) {
log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
changed = true;
}
kafka connect 如何使用任务道具:
how kafka connect use task props
if (changed) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
configBackingStore.putTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
}
这意味着 kafka connect 使用任务配置来创建内部线程。
所以有多少任务运行是由 taskConfigs 方法上的连接器实现决定的。
让我们看看在 mqtt 源连接器上的实现:
implementation on mqtt source connector
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<>();
int taskId = 0;
for (List<String> mqttTopics : (Iterable<List<String>>)ConnectorUtils.groupPartitions(this.config.mqttTopics, maxTasks)) {
if (mqttTopics.isEmpty())
continue;
Map<String, String> settings = new LinkedHashMap<>(this.settings);
settings.put("mqtt.topics", Joiner.on(',').join(mqttTopics));
settings.put("task.id", Integer.toString(taskId++));
result.add(settings);
}
return result;
}
此任务按 mqtt 主题分组。
实际上我只是声明了一个主题过滤器,所以我总是得到一个 mqtt 源任务。将运行多少任务取决于连接器上的 taskConfigs 方法实现,输入是连接器配置中的最大任务..
您可以阅读此方法的连接器上的源代码实现。