【发布时间】:2020-11-11 17:53:39
【问题描述】:
我正在使用 docker-compose 在本地部署中测试 Debezium 平台。这是我的测试用例:
- 运行 postgres、kafka、zookeeper 和 debezium/connect:1.3 的 3 个副本
- 使用以下配置在其中一个副本中配置连接器:
{
"name": "database-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "wal2json",
"slot.name": "database",
"database.hostname": "debezium_postgis_1",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "database",
"database.server.name": "database",
"heartbeat.interval.ms": 5000,
"table.whitelist": "public.outbox",
"transforms.outbox.table.field.event.id": "event_uuid",
"transforms.outbox.table.field.event.key": "event_name",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.payload.id": "event_uuid",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"transforms.outbox.route.by.field": "topic",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"max.batch.size": 1,
"offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy",
"binary.handling.mode": "bytes"
}
}
- 通过从另一个类调用此方法来运行一个脚本,该脚本在发件箱表中执行 2000 次插入
@Transactional
public void write(String eventName, String topic, byte[] payload) {
Outbox newRecord = new Outbox(eventName, topic, payload);
repository.save(newRecord);
repository.delete(newRecord);
}
- 几秒钟后(当我在 Kafka 上看到第一条消息时),我杀死了正在处理流的副本。假设它成功发送了 200 条关于正确主题的消息。
- 我从 debezium 存储偏移量的主题得到最后一条偏移量消息:
{
"transaction_id": null,
"lsn_proc": 24360992,
"lsn": 24495808,
"txId": 560,
"ts_usec": 1595337502556806
}
- 然后我打开一个 db shell 并运行以下命令
SELECT slot_name, restart_lsn - pg_lsn('0/0') as restart_lsn, confirmed_flush_lsn - pg_lsn('0/0') as confirmed_flush_lsn FROM pg_replication_slots;和 postgres 回复:
[
{
"slot_name": "database",
"restart_lsn": 24360856,
"confirmed_flush_lsn": 24360992
}
]
- 我杀死副本 5 分钟后,Kafka 重新平衡连接器,并在其中一个活动副本上部署新的运行任务。
- 新的连接器开始处理流,但它似乎从头开始,因为完成后我在 Kafka 上发现了 2200 条消息。
使用该配置(
max.batch.size: 1和AlwaysCommitPolicy)我希望看到最多 2001 条消息。 我哪里错了?
【问题讨论】:
标签: debezium