【问题标题】:How to deploy kafka sink connection with multiple topics and table destination如何部署具有多个主题和表目标的 kafka sink 连接
【发布时间】:2020-07-27 22:25:49
【问题描述】:

通过我的previous question,我决定更多地同意消费者部署与 Kafka 分布式数据库实时同步。相同的情况;我有数百个表要从 PostgreSQL 拉到 SQL Server。从 PostgreSQL 到 Kafka,我使用了带有 wal2json 插件的 Debezium 连接器。从 Kafka 到 SQL Server,我使用 JDBC 连接器。我有三个相同的设置经纪人(不同的地址):

broker.id=0
broker.rack=1
port=9093
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dir=/home/admin/kafka/tmp/kafka_log1
offsets.topic.num.partition=1
offsets.topic.replication.factor=3
min.isnyc.replicas=2
default.replication.factor=3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=7200000
delete.topic.enable=true
message.max.bytes=50497182 
replica.fetch.max.bytes=50497182
group.max.session.timeout.ms=7200000

我已经尝试了一些可能的解决方案,例如:

  1. 将主题设置为使用 1 个分区和 3 个副本。由于我的表名称中有_,因此我会收到警告。
kafka-topics.sh -create --bootstrap-server localhost:9093,localhost:9094,localhost:9095  --replication-factor 3 --partitions 1 --topic $topic_name --config retention.ms=5400000
  1. 我将 debezium 和 jdbc 连接器与不同的工作人员分开。我有两个配置相同的工作人员(除了主机端口,debezium 为 8085,sink 为 8084),如下所示:
bootstrap.servers=localhost:9093,localhost:9094,localhost:9095
group.id=debezium-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets-debezium
offset.storage.replication.factor=3
config.storage.topic=connect-configs-debezium
status.storage.topic=connect-status-debezium
producer.buffer.memory=29999999
producer.max.buffered.records=19999999
producer.max.request.size=51497182 
producer.retries=100
producer.max.in.flight.requests.per.connection=1
producer.request.timeout.ms=20000
producer.enable.idempotence=true
producer.retry.backoff.ms=500
producer.send.buffer.bytes=50497182
producer.receive.buffer.bytes=50497182
producer.ack=1
offset.flush.timeout.ms=300000
producer.buffer.memory=51497182
consumer.enable.auto.commit=true
consumer.retries=100
consumer.auto.commit.interval.ms=100000
consumer.max.partition.fetch.bytes=50497182
consumer.max.poll.records=10000
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=50000
consumer.session.timeout.ms=50000
consumer.auto.offset.reset=latest
consumer.isolation.level=read_committed
consumer.max.poll.interval.ms=5400000
fetch_max_bytes=50497182
rest.port=8085
plugin.path=/home/admin/kafka/connectors
  1. 环路接收器连接器一一无:
#!/bin/bash
CSV_LIST="/home/admin/kafka/main/config/tables/table_lists.csv"
DATA=${CSV_LIST}

while IFS=',' read table pk mode; do
topic_name=${table} 
curl -X POST http://localhost:8084/connectors -H 'Content-Type:application/json' -d '{"name" :"sqlservercon_'$topic_name'",
    "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics":"'$table'",
            "connection.url":"jdbc:sqlserver://-:1433",
            "connection.user":"-",
            "connection.password":"-",
            "transforms":"unwrap",
            "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones":"false",
            "auto.create":"true",
            "insert.mode":"'$mode'",
            "pk.fields":" '$pk'",
            "pk.mode":"record_value",
            "destination.table.format":"db.dbo.'$table'"
}}' | jq
done < ${DATA}

我如何部署它:

  1. 启动zookeeper和kafka服务器
  2. 创建主题
  3. 为 Debezium 源启动 kafka 工作者
  4. 添加 debezium 连接器(因为 1 db 只需要一个连接器)
  5. 为 sink 启动 kafka worker
  6. 循环添加 jdbc 连接器

不幸的是,由于几个死锁情况和消费者不知情,将所有数据移动到新的 SQL Server 数据库仍然不满意。我想知道是否有关于最佳消费者部署的好建议。我需要为每个连接器添加一个工作人员还是在每个主题之间进行切换。

【问题讨论】:

  • 我已经和我的主管讨论过这个问题。由于我使用insert.mode=upsert,sql server 中的合并操作似乎不太快处理 kafka 连接流。我认为这在加载超过数百万的数据时会出现问题。这仍然是一个初步调查,所以我仍然愿意听取专家的建议。我仍然不考虑使用 Confluent 包,因为我需要计算我的业务案例所需的要求。

标签: sql-server apache-kafka kafka-consumer-api apache-kafka-connect


【解决方案1】:

我已经检查我认为由于 Kafka 连接 jdbc 使用 batch.record 来组织应该发送到 SQL 服务器的记录数,当我使用大记录的 upsert 时似乎有问题。我认为我必须在源和接收器中将批次减少到 1。这仍然是初步的答案。此外,如果有人知道如何显示用于在 Kafka 连接 JDBC 中插入的 SQL 查询,这将有助于我了解有关 JDBC 行为的机制以及如何解决死锁。

根据我的经验,如果目标数据库存在但内部没有表,则最佳实践是优先考虑必须首先插入哪个表,然后等到它完成而不使用插入。小于100000行的表可以归为一组,但大维度表必须单独拉取。

【讨论】:

    猜你喜欢
    • 2018-08-03
    • 2020-12-20
    • 2020-12-11
    • 2019-11-12
    • 2019-11-24
    • 2018-11-04
    • 1970-01-01
    • 1970-01-01
    • 2022-10-21
    相关资源
    最近更新 更多