【问题标题】:Kafka Connector always works as a single taskKafka 连接器始终作为单个任务工作
【发布时间】:2020-09-08 21:04:10
【问题描述】:

我正在尝试使用 kafka 将数据从 oracleDB 传输到 mongoDB。 所以我像上图一样配置了kafka集群。 我知道调整分区和 tasks.max 允许并行处理。 但是,当我运行连接器时,它始终作为单个任务运行,无法并行处理。 我需要做任何其他设置吗?

这是我的配置。

  1. 主题创建

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092,127.0.0.2:9092,127.0.0.1:9093 --partitions 3 --topic 主题A

  1. 连接器配置

    {
    "name": "rawsumc-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:oracle:thin:@127.0.0.1:1521/orcl",
        "connection.user": "test",
        "connection.password": "test",
        "topic.prefix": "topicA",
        "mode": "bulk",
        "poll.interval.ms": "360000000",
        "numeric.mapping": "best_fit",
        "tasks.max": "10",
        "connection.type": "lz4",
        "query": "select CAST(NO_TT AS NUMBER(10,0)) AS NO_TT,CAST(NO_SEQ AS NUMBER(10,0)) AS NO_SEQ,DNT_CLCT from table_a",
        "name": "rawsumc-source"
    },
    "tasks": [
        {
            "connector": "rawsumc-source",
            "task": 0
        }
    ],
    "type": "source"}
    

【问题讨论】:

    标签: apache-kafka apache-kafka-connect mongodb-kafka-connector


    【解决方案1】:

    根据docs

    tasks.max - 应该为此连接器创建的最大 个任务。如果连接器无法达到这种并行度,它可能会创建更少个任务。

    使用 JdbcSourceConnector limits 的自定义查询来执行单个任务。

    【讨论】:

    • 亚历山大感谢您的回复。但是当我在没有自定义查询的情况下进行测试时,它也可以作为一项任务。连接器集群中有两个worker启动并运行,主题分区为三个。
    • 我只尝试一张桌子。 "table.whitelist": "TA_TEST"
    • 没关系,因为对于表模式,实际任务数是 max.tasks 和表数的最小值。请查看sources
    【解决方案2】:

    我也对这个连接器任务感到困惑。 一开始我以为kafka connect解决了多客户端数据重复的问题。但实际上kafka connect选择了其他方式来避免这个问题。我跟踪了源代码并找到了它的实际实现。

    (我现在无法发布图片,您可以点击链接查看图片。) 首先是 taskConfigs 接口: taskConfigs interface

        /**
     * Returns a set of configurations for Tasks based on the current configuration,
     * producing at most count configurations.
     *
     * @param maxTasks maximum number of configurations to generate
     * @return configurations for Tasks
     */
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);
    

    然后是对这个接口的调用: call for this interface

    org.apache.kafka.connect.runtime.distributed.DistributedHerder:
      final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
            boolean changed = false;
            int currentNumTasks = configState.taskCount(connName);
            if (taskProps.size() != currentNumTasks) {
                log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
                changed = true;
            } 
    

    kafka connect 如何使用任务道具: how kafka connect use task props

    if (changed) {
                List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
                if (isLeader()) {
                    configBackingStore.putTaskConfigs(connName, rawTaskProps);
                    cb.onCompletion(null, null);
                }
    

    这意味着 kafka connect 使用任务配置来创建内部线程。 所以有多少任务运行是由 taskConfigs 方法上的连接器实现决定的。

    让我们看看在 mqtt 源连接器上的实现: implementation on mqtt source connector

       public List<Map<String, String>> taskConfigs(int maxTasks) {
     List<Map<String, String>> result = new ArrayList<>();
     int taskId = 0;
     for (List<String> mqttTopics : (Iterable<List<String>>)ConnectorUtils.groupPartitions(this.config.mqttTopics, maxTasks)) {
       if (mqttTopics.isEmpty())
         continue; 
       Map<String, String> settings = new LinkedHashMap<>(this.settings);
       settings.put("mqtt.topics", Joiner.on(',').join(mqttTopics));
       settings.put("task.id", Integer.toString(taskId++));
       result.add(settings);
     } 
     return result;
    

    } 此任务按 mqtt 主题分组。 实际上我只是声明了一个主题过滤器,所以我总是得到一个 mqtt 源任务。将运行多少任务取决于连接器上的 taskConfigs 方法实现,输入是连接器配置中的最大任务.. 您可以阅读此方法的连接器上的源代码实现。

    【讨论】:

      猜你喜欢
      • 2021-08-16
      • 1970-01-01
      • 1970-01-01
      • 2020-05-08
      • 2020-06-05
      • 2021-11-23
      • 2020-11-01
      • 1970-01-01
      • 2022-09-26
      相关资源
      最近更新 更多