Question title: Can there be 2 Debezium connectors running in one Kafka cluster?
Posted: 2023-02-26 04:00:53
Question:

A diagram on the official Debezium page shows multiple Debezium connectors connecting to the same Kafka cluster.

So I have 2 databases, 2 Debezium instances and 1 Kafka broker running in docker-compose, but only 1 Debezium instance seems to send updates to Kafka (as observed in Kafdrop).

Here is my docker-compose file:

version: '3.6'
services:

  hero_db:
    image: postgres:14
    restart: always
    environment:
      POSTGRES_PASSWORD: postgrespassword
    ports:
      - '5432:5432'
    expose:
      - '5432'
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - hero_db_data:/var/lib/postgresql/data

  villian_db:
    image: postgres:14
    restart: always
    environment:
      POSTGRES_PASSWORD: postgrespassword
    ports:
      - '2345:2345'
    expose:
      - '2345'
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - villian_db_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:5.3.1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - kafka

  hero_debezium:
    image: debezium/connect:1.9
    ports:
      - 8083:8083
    expose:
      - '8083'
    environment:
      CONFIG_STORAGE_TOPIC: hero_configs
      OFFSET_STORAGE_TOPIC: hero_offsets
      STATUS_STORAGE_TOPIC: hero_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on: [ zookeeper, kafka, hero_db ]

  villian_debezium:
    image: debezium/connect:1.9
    ports:
      - 8084:8083
    expose:
      - '8084'
    environment:
      CONFIG_STORAGE_TOPIC: villian_configs
      OFFSET_STORAGE_TOPIC: villian_offsets
      STATUS_STORAGE_TOPIC: villian_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on: [ zookeeper, kafka, villian_db ]

volumes:
  hero_db_data:
  villian_db_data:

Here are the Debezium connector configs (JSON) for hero_dbz and villian_dbz:
hero_dbz.json

{
    "name": "hero-postgresql-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "hero_db",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgrespassword",
        "database.dbname": "postgres",
        "database.server.name": "hero_server",
        "table.include.list": "public.heroes",
        "table.whitelist": "public.heroes",
        "topic.prefix": "topic_heroes"
    }
}

villian_dbz.json

{
    "name": "villian-postgresql-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "villian_db",
        "database.port": "2345",
        "database.user": "postgres",
        "database.password": "postgrespassword",
        "database.dbname": "postgres",
        "database.server.name": "villian_server",
        "table.include.list": "public.villians",
        "table.whitelist": "public.villians",
        "topic.prefix": "topic_villian"
    }
}

I registered hero_dbz and villian_dbz with these commands:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@hero_dbz.json"
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8084/connectors/ --data "@villian_dbz.json"
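To check whether both connectors were actually created and are running, each worker's Kafka Connect REST API can be queried at `GET /connectors/<name>/status`. A minimal Python sketch, assuming the workers are reachable on 127.0.0.1:8083 and 127.0.0.1:8084 as in the curl commands above; `summarize_status` and `fetch_status` are hypothetical helper names:

```python
import json
import urllib.request


def summarize_status(status: dict) -> str:
    """Condense a Kafka Connect /connectors/<name>/status response into one line."""
    connector_state = status["connector"]["state"]
    task_states = [f"task {t['id']}={t['state']}" for t in status.get("tasks", [])]
    tasks = ", ".join(task_states) if task_states else "no tasks assigned"
    return f"{status['name']}: connector={connector_state}; {tasks}"


def fetch_status(base_url: str, connector: str) -> dict:
    """GET the status document for one connector from a Connect worker's REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        return json.load(resp)
```

For example, `print(summarize_status(fetch_status("http://127.0.0.1:8084", "villian-postgresql-connector")))`. A connector with no tasks, or an HTTP 404, points at a worker-level problem rather than a database one.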

In Kafdrop, only data from hero_db (topic hero_server.public.heroes) shows up; nothing from villian_db appears.

Discussion:

    Tags: docker apache-kafka debezium debezium-engine


    Answer 1:

    After I fixed villian_db to run on port 2345, I connected both the hero and villian Debezium workers to Kafka by registering their JSON files.
    The villian_dbz worker then keeps logging these messages:

    2023-02-25 20:43:15 2023-02-25 13:43:15,210 INFO   ||  [Worker clientId=connect-1, groupId=1] Rebalance started   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
    2023-02-25 20:43:15 2023-02-25 13:43:15,210 INFO   ||  [Worker clientId=connect-1, groupId=1] (Re-)joining group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
    2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=6, memberId='connect-1-a2647c32-4c10-48be-b832-a38a37a11cd0', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
    2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=6, memberId='connect-1-a2647c32-4c10-48be-b832-a38a37a11cd0', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
    2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Joined group at generation 6 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-07842920-d640-4c15-ae17-ad4900077143', leaderUrl='http://172.19.0.8:8083/', offset=5, connectorIds=[hero-postgresql-connector], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    2023-02-25 20:43:15 2023-02-25 13:43:15,211 WARN   ||  [Worker clientId=connect-1, groupId=1] Catching up to assignment's config offset.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Current config state offset -1 is behind group assignment 5, reading to end of config log   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    2023-02-25 20:43:15 2023-02-25 13:43:15,212 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished reading to end of log and updated config snapshot, new config log offset: -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    2023-02-25 20:43:15 2023-02-25 13:43:15,212 INFO   ||  [Worker clientId=connect-1, groupId=1] Current config state offset -1 does not match group assignment 5. Forcing rebalance.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
    

    This error does not appear if only one of the Debezium workers (either hero or villian) is connected to Kafka.
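The log excerpt above already hints at the cause: the villian worker reports `groupId=1`, receives an assignment containing `hero-postgresql-connector` from a leader at port 8083, and then forces a rebalance because its own config topic (offset -1) never reaches the assignment's offset 5. A small Python sketch that extracts those fields from such lines; the regular expressions are written against the lines quoted here and are an assumption about the log format, not a stable Kafka Connect interface, and `diagnose` is a hypothetical name:

```python
import re
from collections import Counter

# Patterns based on the log lines in the question (assumed format).
GROUP_RE = re.compile(r"groupId=(\S+?)\]")
ASSIGN_RE = re.compile(r"leaderUrl='([^']*)'.*?connectorIds=\[([^\]]*)\]")
FORCED_RE = re.compile(
    r"Current config state offset (-?\d+) does not match group assignment (\d+)\. Forcing rebalance"
)


def diagnose(lines):
    """Summarize group ids, leaders, assigned connectors and forced rebalances in worker logs."""
    groups = Counter()
    leaders, connectors, forced = set(), set(), []
    for line in lines:
        if m := GROUP_RE.search(line):
            groups[m.group(1)] += 1
        if m := ASSIGN_RE.search(line):
            leaders.add(m.group(1))
            connectors.update(c.strip() for c in m.group(2).split(",") if c.strip())
        if m := FORCED_RE.search(line):
            # (local config offset, offset the group leader expects)
            forced.append((int(m.group(1)), int(m.group(2))))
    return {"group_ids": dict(groups), "leaders": leaders,
            "assigned_connectors": connectors, "forced_rebalances": forced}
```

If the villian worker's output lists the hero connector and a leader on the other worker, the two workers have joined one Connect group.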

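    Comments:

    • I have updated my answer, please take a look.
    Answer 2:

    Running multiple Kafka Connect servers should not be a problem. There is probably an issue with your configuration or setup; check the logs of the villian connector.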

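    IMHO the internal port of your villian db is wrong. PostgreSQL listens on 5432 inside the container, so the host port must map to 5432: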

    villian_db:
        image: postgres:14
        restart: always
        environment:
          POSTGRES_PASSWORD: postgrespassword
        ports:
          - '2345:5432'
        expose:
          - '2345'
        command: [ "postgres", "-c", "wal_level=logical" ]
        volumes:
          - villian_db_data:/var/lib/postgresql/data
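In Compose's short `ports` syntax, `HOST:CONTAINER` means the left number is only used from the host machine, while other containers on the Compose network (such as the Debezium workers) connect to the service name on the container port. Since PostgreSQL listens on 5432 inside the container, `2345:2345` forwards to a port nothing listens on. A Python sketch of that rule; `parse_mapping`, `resolve_port` and `mapping_hits_listener` are hypothetical helpers that handle only the `HOST:CONTAINER` and `CONTAINER` forms, not IP bindings or ranges:

```python
from typing import Optional, Tuple


def parse_mapping(spec: str) -> Tuple[Optional[int], int]:
    """Split a short-syntax port spec like '2345:5432' into (host_port, container_port)."""
    parts = spec.split("/")[0].split(":")  # drop an optional '/tcp' or '/udp' suffix
    if len(parts) == 1:  # '5432': container port only, Docker picks the host port
        return None, int(parts[0])
    if len(parts) == 2:  # 'HOST:CONTAINER'
        return int(parts[0]), int(parts[1])
    raise ValueError(f"unsupported port spec (IP bindings/ranges not handled): {spec!r}")


def resolve_port(spec: str, from_host: bool) -> Optional[int]:
    """Port a client should dial: the host port from the host, the container port from another container."""
    host_port, container_port = parse_mapping(spec)
    return host_port if from_host else container_port


def mapping_hits_listener(spec: str, listening_port: int = 5432) -> bool:
    """True if the mapping forwards to the port the server actually listens on in the container."""
    return parse_mapping(spec)[1] == listening_port
```

With `2345:5432`, psql on the host uses 2345 while the Debezium container uses `villian_db:5432`.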
    

    Update:

    Please give each Connect cluster its own GROUP_ID environment variable. Background:

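    This environment variable is required when running the Kafka Connect service. Set it to an ID that uniquely identifies the Kafka Connect cluster the service and its workers belong to.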
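In distributed mode, workers that share a group id join the same Connect cluster and are expected to share the same config, offset and status topics. The question's two workers set different storage topics but no GROUP_ID, and the villian worker logs `groupId=1`, which fits the rebalance loop in the asker's follow-up. A hedged Python sketch that checks worker environments for that mismatch; the dicts mirror the Compose `environment` blocks, `find_group_conflicts` is a hypothetical name, and treating a missing GROUP_ID as `"1"` is an assumption based on the logged `groupId=1`:

```python
from collections import defaultdict

STORAGE_KEYS = ("CONFIG_STORAGE_TOPIC", "OFFSET_STORAGE_TOPIC", "STATUS_STORAGE_TOPIC")
ASSUMED_DEFAULT_GROUP_ID = "1"  # assumption: matches groupId=1 in the worker log


def find_group_conflicts(workers: dict) -> list:
    """Return (group_id, [workers]) for groups whose members disagree on storage topics."""
    by_group = defaultdict(list)
    for name, env in workers.items():
        group_id = str(env.get("GROUP_ID", ASSUMED_DEFAULT_GROUP_ID))
        by_group[group_id].append((name, tuple(env.get(k) for k in STORAGE_KEYS)))
    conflicts = []
    for group_id, members in by_group.items():
        # More than one distinct topic triple inside one group means the workers
        # read different config logs and can never agree on an assignment.
        if len({topics for _, topics in members}) > 1:
            conflicts.append((group_id, sorted(name for name, _ in members)))
    return sorted(conflicts)
```

Giving each worker its own GROUP_ID (or running both connectors on one worker group that shares one set of topics) makes the check return an empty list.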

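    For example, GROUP_ID: 3 for one worker and GROUP_ID: 2 for the other. Your JSON should also point to 5432, the internal port of PostgreSQL, because the Debezium workers reach the databases over the Docker network; so both .json files should use 5432.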
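Both registration payloads can then use the container port. A Python sketch that builds them with `database.port` set to 5432 and POSTs them to a worker's `/connectors` endpoint; `connector_payload` and `register` are hypothetical helpers, the other keys come from the question's JSON files, and the redundant `table.whitelist` and `topic.prefix` entries are left out (with debezium/connect:1.9 the topic names come from `database.server.name`, as the `hero_server.public.heroes` topic in Kafdrop shows):

```python
import json
import urllib.request

POSTGRES_INTERNAL_PORT = "5432"  # port PostgreSQL listens on inside each container


def connector_payload(name: str, host: str, server_name: str, table: str) -> dict:
    """Build a Debezium Postgres connector registration body (keys taken from the question)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "database.hostname": host,                # Compose service name
            "database.port": POSTGRES_INTERNAL_PORT,  # container port, not the host mapping
            "database.user": "postgres",
            "database.password": "postgrespassword",
            "database.dbname": "postgres",
            "database.server.name": server_name,      # topic prefix in Debezium 1.x
            "table.include.list": table,
        },
    }


def register(worker_url: str, payload: dict) -> int:
    """POST the payload to a Connect worker's /connectors endpoint and return the HTTP status."""
    req = urllib.request.Request(
        f"{worker_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status


hero = connector_payload("hero-postgresql-connector", "hero_db", "hero_server", "public.heroes")
villian = connector_payload("villian-postgresql-connector", "villian_db", "villian_server", "public.villians")
```

From the host, `register("http://127.0.0.1:8083", hero)` and `register("http://127.0.0.1:8084", villian)` replace the two curl calls once the workers have distinct GROUP_IDs.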

    Your Zookeeper configuration is wrong, please fix it:

    ....
    ports:
          - 2181:2181
    
