【Question title】: Implement dockerized kafka sink connector to mongo
【Posted】: 2019-12-23 22:23:53
【Question description】:

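I'm trying to implement a Kafka connection to MongoDB and MySQL using Docker.

What I want is shown in the following figure:

[Figure: Kafka Connect MongoDB]

I've seen the docker-compose file of the official mongodb repository. It has two problems:

  1. It's too complicated for me: it runs multiple mongodb containers and also uses many images that consume a lot of resources.

  2. It has some unresolved issues that eventually make the Kafka-to-MongoDB connection fail. Here you can see my issue.

My implementation of the connection using Debezium in docker-compose.yml is as follows: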

version: '3.2'
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - target: 9094
        published: 9094
        protocol: tcp
        mode: host
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka:/kafka

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/opt/zookeeper-3.4.13

  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017

  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets

volumes:
  kafka:
  zookeeper:

As @cricket_007 said, I shouldn't use Debezium for my purpose, so I switched to the confluentinc/kafka-connect-datagen image. Instead of Debezium, I added the following to the docker-compose.yml file:

connect:
    image: confluentinc/kafka-connect-datagen
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on: 
      - zookeeper
    ports: 
      - 8083:8083
    environment: 
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
    volumes:
      - ../build/confluent/kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb

Dockerfile:

FROM confluentinc/cp-kafka-connect
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen

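Questions:

  1. The kafka-connect-datagen image generates fake data and, as stated in the repository, it is not suitable for production. All I want is to connect Kafka to MongoDB, nothing more and nothing less. To be explicit: how can I use curl to send data from Kafka and save it in a MongoDB collection?

  2. I get the error "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required." As @cricket_007 said, schema-registry is optional. So how can I get rid of that image?

  3. As a last step, I tried to run the repository's docker-compose file as described in its README.md; unfortunately I got another error:

    WARNING: Could not reach configured kafka system on http://localhost:8082 Note: This script requires curl.

  4. Even when I haven't made any changes to the configuration, I get another error:

Kafka Connectors: 

{"error_code":409,"message":"Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"}

Please help me find answers to these questions.

My output: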

Building the MongoDB Kafka Connector

> Task :shadowJar
FatJar: /home/mostafa/Documents/Docker/kafka-mongo/build/libs/kafka-mongo-0.3-SNAPSHOT-all.jar (2.108904 MB)

Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/5.2/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 4h 26m 25s
7 actionable tasks: 7 executed
Unzipping the confluent archive plugin....

Archive:  ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT.zip
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSinkConnector.properties  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSourceConnector.properties  
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/kafka-mongo-0.3-SNAPSHOT-all.jar  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/manifest.json  
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-leaf.png  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-logo.png  
   creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/README.md  
  inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/LICENSE.txt  
Starting docker .
Creating volume "docker_rs2" with default driver
Creating volume "docker_rs3" with default driver
Building connect
Step 1/3 : FROM confluentinc/cp-kafka-connect:5.2.2
 ---> 32bb41f78617
Step 2/3 : ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
 ---> Using cache
 ---> 9e4fd4f10a38
Step 3/3 : RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
 ---> Using cache
 ---> 5f879008bb73

Successfully built 5f879008bb73
Successfully tagged confluentinc/kafka-connect-datagen:latest
Recreating mongo1 ... 
Recreating mongo1        ... done
Creating mongo3          ... done
Starting broker   ... done
Creating mongo2          ... done
Starting schema-registry ... done
Starting connect         ... done
Creating rest-proxy      ... done
Creating ksql-server              ... done
Creating docker_kafka-topics-ui_1 ... done
Creating control-center           ... done
Creating ksql-cli                 ... done


Waiting for the systems to be ready.............
WARNING: Could not reach configured kafka system on http://localhost:8082 
Note: This script requires curl.



SHUTTING DOWN


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    68  100    68    0     0     23      0  0:00:02  0:00:02 --:--:--    23
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    61  100    61    0     0   4066      0 --:--:-- --:--:-- --:--:--  4066
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    63  100    63    0     0   9000      0 --:--:-- --:--:-- --:--:--  9000
MongoDB shell version v4.0.12
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("80ebb904-f81a-4230-b63b-4e62f65fbeb7") }
MongoDB server version: 4.0.12
{
        "ok" : 1,
        "operationTime" : Timestamp(1567235833, 1),
        "$clusterTime" : {
                "clusterTime" : Timestamp(1567235833, 1),
                "signature" : {
                        "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
                        "keyId" : NumberLong(0)
                }
        }
}
Stopping ksql-cli                 ... done
Stopping control-center           ... done
Stopping docker_kafka-topics-ui_1 ... done
Stopping ksql-server              ... done
Stopping rest-proxy               ... done
Stopping mongo1                   ... done
Stopping mongo2                   ... done
Stopping mongo3                   ... done
Stopping connect                  ... done
Stopping broker                   ... done
Stopping zookeeper                ... done
Removing ksql-cli                 ... 
Removing control-center           ... done
Removing docker_kafka-topics-ui_1 ... done
Removing ksql-server              ... done
Removing rest-proxy               ... done
Removing mongo1                   ... done
Removing mongo2                   ... done
Removing mongo3                   ... done
Removing connect                  ... done
Removing schema-registry          ... done
Removing broker                   ... done
Removing zookeeper                ... done
Removing network docker_default
Removing network docker_localnet

WARNING: Could not reach configured kafka system on http://localhost:8082 
Note: This script requires curl.

【Question discussion】:

    Tags: mongodb docker apache-kafka apache-kafka-connect


    【Answer 1】:

    I created the following docker-compose file (see all the files on GitHub):

    version: '3.6'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:5.1.2
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        networks:
          - localnet
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      broker:
        image: confluentinc/cp-enterprise-kafka:5.1.2
        hostname: broker
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - "29092:29092"
          - "9092:9092"
        networks:
          - localnet
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
          CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
          CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
          CONFLUENT_METRICS_ENABLE: 'true'
          CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    
      connect:
        image: confluentinc/cp-kafka-connect:5.1.2
        build:
          context: .
          dockerfile: Dockerfile
        hostname: connect
        container_name: connect
        depends_on:
          - zookeeper
          - broker
        ports:
          - "8083:8083"
        networks:
          - localnet
        environment:
          CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
          CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
          CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
          CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
          CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
          CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
          CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
          CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
        command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
        volumes:
          - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
    
    # MongoDB Replica Set
      mongo1:
        image: "mongo:4.0-xenial"
        container_name: mongo1
        command: --replSet rs0 --smallfiles --oplogSize 128
        volumes:
          - rs1:/data/db
        networks:
          - localnet
        ports:
          - "27017:27017"
        restart: always
      mongo2:
        image: "mongo:4.0-xenial"
        container_name: mongo2
        command: --replSet rs0 --smallfiles --oplogSize 128
        volumes:
          - rs2:/data/db
        networks:
          - localnet
        ports:
          - "27018:27017"
        restart: always
      mongo3:
        image: "mongo:4.0-xenial"
        container_name: mongo3
        command: --replSet rs0 --smallfiles --oplogSize 128
        volumes:
          - rs3:/data/db
        networks:
          - localnet
        ports:
          - "27019:27017"
        restart: always
    
    networks:
      localnet:
        attachable: true
    
    volumes:
      rs1:
      rs2:
      rs3:
    

    After running docker-compose up, you have to configure your MongoDB cluster (initiate the replica set):

    docker-compose exec mongo1 /usr/bin/mongo --eval '''if (rs.status()["ok"] == 0) {
        rsconf = {
          _id : "rs0",
          members: [
            { _id : 0, host : "mongo1:27017", priority: 1.0 },
            { _id : 1, host : "mongo2:27017", priority: 0.5 },
            { _id : 2, host : "mongo3:27017", priority: 0.5 }
          ]
        };
        rs.initiate(rsconf);
    }
    rs.conf();'''
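If you want the same replica-set document for a different list of nodes (for example a single-member set with only mongo1), a minimal Python sketch could generate it instead of editing the JavaScript by hand. The script name rs_init.py and the helper names are hypothetical; the guard and the priorities mirror the command above.

```python
import json


def replica_set_config(set_name, hosts, primary_priority=1.0, secondary_priority=0.5):
    """Build the document passed to rs.initiate(); the first host gets the higher priority."""
    members = []
    for i, host in enumerate(hosts):
        priority = primary_priority if i == 0 else secondary_priority
        members.append({"_id": i, "host": host, "priority": priority})
    return {"_id": set_name, "members": members}


def initiate_script(config):
    """Return mongo-shell JavaScript that initiates the set only if it is not initiated yet."""
    # JSON is a valid JavaScript object literal, so it can be embedded directly.
    return 'if (rs.status()["ok"] == 0) { rs.initiate(%s); } rs.conf();' % json.dumps(config)


if __name__ == "__main__":
    cfg = replica_set_config("rs0", ["mongo1:27017", "mongo2:27017", "mongo3:27017"])
    print(initiate_script(cfg))
```

Assuming Python 3 is available on the host, the output can be passed straight to the shell: docker-compose exec mongo1 /usr/bin/mongo --eval "$(python3 rs_init.py)".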
    

    Make sure your plugins are installed:

    curl localhost:8083/connector-plugins | jq
    
    [
      {
        "class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "type": "sink",
        "version": "0.2"
      },
      {
        "class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "type": "source",
        "version": "0.2"
      },
      {
        "class": "io.confluent.connect.gcs.GcsSinkConnector",
        "type": "sink",
        "version": "5.0.1"
      },
      {
        "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
        "type": "source",
        "version": "2.1.1-cp1"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.1.1-cp1"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.1.1-cp1"
      }
    ]
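The same check can be scripted, which helps when you start the stack from a script and want to fail early if the volume mount did not put the connector on the plugin path. This is a sketch using only the Python standard library; it assumes the Connect REST API is published on localhost:8083 as in the compose file above, and the helper names are hypothetical.

```python
import json
import urllib.request

MONGO_SINK = "com.mongodb.kafka.connect.MongoSinkConnector"


def installed_classes(plugins):
    """Return the set of connector classes from a /connector-plugins response."""
    return {p["class"] for p in plugins}


def fetch_plugins(base_url="http://localhost:8083"):
    """GET /connector-plugins from the Kafka Connect REST API and decode the JSON list."""
    with urllib.request.urlopen(base_url + "/connector-plugins", timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    if MONGO_SINK in installed_classes(fetch_plugins()):
        print("MongoDB sink connector is installed")
    else:
        print("MongoDB sink connector NOT found; check CONNECT_PLUGIN_PATH and the mounted volume")
```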
    

    As you can see above, the MongoDB connector plugins are available. Assuming you have a database named mydb and a collection named products, create a JSON file named sink-connector.json:

    {
        "name": "mongo-sink",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "tasks.max": "1",
            "topics": "product.events",
            "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
            "database": "mydb",
            "collection": "products",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false"
        }
    }
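If you need several sinks (other topics or collections), generating this file avoids copy-paste mistakes. Note that every value in the config map is a string, including tasks.max. The sketch below (stdlib only; the function name is hypothetical) reproduces the JSON above. For the single mongo service from the question, you would presumably pass uri="mongodb://mongo:27017" instead of the replica-set URI.

```python
import json


def mongo_sink_config(topic, database, collection,
                      uri="mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
                      name="mongo-sink"):
    """Return the connector payload expected by POST /connectors (name + config map)."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "connection.uri": uri,
            "database": database,
            "collection": collection,
            # Keys are read as plain strings; values are schemaless JSON documents.
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
        },
    }


if __name__ == "__main__":
    with open("sink-connector.json", "w") as f:
        json.dump(mongo_sink_config("product.events", "mydb", "products"), f, indent=4)
```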
    

    Now create the connector using the Kafka Connect REST API:

    curl -X POST -H "Content-Type: application/json" -d @sink-connector.json http://localhost:8083/connectors | jq
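The 409 "stale configuration" error from the question is typically transient (for example while the Connect worker group is rebalancing), so a common workaround is to retry after a short pause. A variation of the curl call above that is safe to re-run uses PUT /connectors/<name>/config, which creates the connector or updates it if it already exists; a repeated POST instead fails with 409 once the connector exists. A minimal stdlib sketch, assuming Connect on localhost:8083; the retry count and delay are arbitrary choices, and the function names are hypothetical.

```python
import json
import time
import urllib.error
import urllib.request


def put_config_request(name, config, base_url="http://localhost:8083"):
    """Build (but do not send) PUT /connectors/<name>/config, which creates or updates a connector."""
    return urllib.request.Request(
        "%s/connectors/%s/config" % (base_url, name),
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def create_or_update_connector(name, config, base_url="http://localhost:8083",
                               retries=5, delay_s=2.0):
    """Send the request; on HTTP 409 wait and retry, re-raising any other error."""
    for attempt in range(retries):
        try:
            with urllib.request.urlopen(put_config_request(name, config, base_url),
                                        timeout=10) as resp:
                return json.load(resp)
        except urllib.error.HTTPError as err:
            # Give up on non-409 errors, or on the last attempt.
            if err.code != 409 or attempt == retries - 1:
                raise
            time.sleep(delay_s)


if __name__ == "__main__":
    # sink-connector.json is the {"name": ..., "config": {...}} file created earlier.
    with open("sink-connector.json") as f:
        payload = json.load(f)
    print(create_or_update_connector(payload["name"], payload["config"]))
```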
    

    You can check the connector's status:

    curl http://localhost:8083/connectors/mongo-sink/status | jq
    {
      "name": "mongo-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "connect:8083"
        }
      ],
      "type": "sink"
    }
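A connector can report RUNNING while one of its tasks has FAILED, so it is worth checking both levels. For failed tasks, the status response includes a trace field with the Java stack trace, which is usually the fastest way to see why writes to MongoDB are not happening. A sketch (stdlib only, Connect assumed on localhost:8083, helper names hypothetical):

```python
import json
import urllib.request


def all_running(status):
    """True if the connector and every one of its tasks report state RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def failed_traces(status):
    """Collect the 'trace' field (stack trace) of every FAILED task."""
    return [t.get("trace", "") for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status and decode the JSON response."""
    with urllib.request.urlopen("%s/connectors/%s/status" % (base_url, name), timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    status = fetch_status("mongo-sink")
    print("OK" if all_running(status) else ("\n".join(failed_traces(status)) or status))
```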
    

    Now let's create a Kafka topic. First, open a shell in the Kafka container:

    docker-compose exec broker bash
    

    Then create the topic:

    kafka-topics --zookeeper zookeeper:2181 --create --topic product.events --partitions 1 --replication-factor 1
    

    Now produce some products into the topic:

    kafka-console-producer --broker-list localhost:9092 --topic product.events
    >{"Name": "Hat", "Price": 25}
    >{"Name": "Shoe", "Price": 15}
    

    You can see the result in the image.

    I hope this helps.
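Because the sink sets value.converter.schemas.enable to "false", each line typed into kafka-console-producer must be plain single-line JSON. With schemas.enable set to "true", JsonConverter would instead expect a {"schema": ..., "payload": ...} envelope. The sketch below (stdlib only; the script name products.py and the function name are hypothetical) prints either form, and only handles string and integer fields for the envelope.

```python
import json
import sys


def to_line(record, with_schema=False):
    """Serialize one record as a single line for kafka-console-producer.

    with_schema=False -> plain JSON, matching value.converter.schemas.enable=false.
    with_schema=True  -> the schema/payload envelope JsonConverter expects when
                         schemas.enable=true (only str and int fields supported here).
    """
    if not with_schema:
        return json.dumps(record, separators=(",", ":"))
    type_of = {str: "string", int: "int32"}
    fields = [{"field": k, "type": type_of[type(v)], "optional": False} for k, v in record.items()]
    envelope = {"schema": {"type": "struct", "fields": fields, "optional": False},
                "payload": record}
    return json.dumps(envelope, separators=(",", ":"))


if __name__ == "__main__":
    for product in [{"Name": "Hat", "Price": 25}, {"Name": "Shoe", "Price": 15}]:
        sys.stdout.write(to_line(product) + "\n")
```

Instead of typing the records interactively, the output can be piped into the container (-T disables TTY allocation so stdin can be piped): python3 products.py | docker-compose exec -T broker kafka-console-producer --broker-list localhost:9092 --topic product.events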

    【Comments】:

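    • Hi Morteza, thanks for your descriptive answer; I've upvoted it too. What I'm trying to achieve is the most minimal setup possible, and this image sounds too complicated for my use. I just want to connect Kafka to Mongo. As I mentioned, my main problems are the third and fourth questions. Could you please suggest a better approach?
    • Hi Mostafa, I updated the answer so you can clone the GitHub repository. You just need to run 'docker-compose up' and then run the commands in this post in order.
    • Hi Morteza, I tried the steps you described, but there is no Dockerfile in the project's root directory. Whenever I run docker-compose up, it expects to find a Dockerfile in the root directory (the same directory as the docker-compose file), but it can't. Could you please try it yourself? Thanks for your attention :)
    • Hi, all the images in the docker compose file are official. There's no need to build an image from a Dockerfile.
    • Remove the "build" section from the docker-compose file and try again.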
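    【Answer 2】:

    Debezium reads data from Mongo (it is a source connector). If you want a sink connector, you need to use the official one you found, although there are others available on GitHub too.

    Kafka Connect uses a REST API, so you also need to create a JSON payload with all the connection and topic details. The repo you found has a guide for that.

    It runs multiple mongodb containers and also uses many images that consume a lot of resources.

    You don't need KSQL, Control Center, REST Proxy, Topics UI, etc. You only need Kafka, Zookeeper, Connect, Mongo, and optionally Schema Registry, so just remove the other containers from the compose file. You probably don't need multiple Mongo containers either, but then you'll need to reconfigure the environment variables so they point at a single instance.

    How can I use curl to send data from Kafka and save it in a MongoDB collection?

    If you really want to use curl, you'll need to start the REST Proxy container. That will get you past the "Could not reach configured kafka system on http://localhost:8082" error message.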
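With a REST Proxy running, producing over HTTP looks roughly like the sketch below, which uses the REST Proxy v2 endpoint POST /topics/<topic> with the application/vnd.kafka.json.v2+json content type. Assumptions: the rest-proxy container from the official mongo-kafka compose file is up and published on localhost:8082 (the compose file in Answer 1 does not include it), and the function name is hypothetical. Records written to product.events are then picked up by the MongoDB sink like any others.

```python
import json
import urllib.request

# Content type for JSON-encoded values in the REST Proxy v2 API.
KAFKA_JSON_V2 = "application/vnd.kafka.json.v2+json"


def produce_request(topic, values, base_url="http://localhost:8082"):
    """Build (but do not send) a REST Proxy request that writes each value as one record."""
    body = {"records": [{"value": v} for v in values]}
    return urllib.request.Request(
        "%s/topics/%s" % (base_url, topic),
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": KAFKA_JSON_V2},
        method="POST",
    )


if __name__ == "__main__":
    req = produce_request("product.events",
                          [{"Name": "Hat", "Price": 25}, {"Name": "Shoe", "Price": 15}])
    with urllib.request.urlopen(req, timeout=10) as resp:
        # The proxy replies with the partition and offset assigned to each record.
        print(json.load(resp))
```

The equivalent curl call would be: curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{"Name":"Hat","Price":25}}]}' http://localhost:8082/topics/product.events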

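    【Comments】:

    • Hi @cricket_007, I edited my post and ran into some new errors. Could you please answer my questions and add your docker-compose file?
    • 1) Without the REST Proxy or some other web application, you can't use curl to send data to Kafka. You need to write a producer or use the datagen container (Confluent has a separate tutorial). 2) If you use AvroConverter, you need a Schema Registry and must provide its URL. If your data isn't Avro, you can use JsonConverter instead. 3) Port 8082 belongs to the REST Proxy, so you may actually need to run that container.
    • Could you please add a minimal docker-compose.yml file that achieves the goal?
    • I don't have one... I think this plus a Mongo container should be fine: github.com/mongodb/mongo-kafka/blob/master/docker/…
    • Oh no :( I've spent a lot of time on this, but it doesn't work properly! And it's too complicated for my purpose.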