【问题标题】:Why I receive a lot of duplicates with debezium?为什么我会收到很多带有 debezium 的重复项?
【发布时间】:2020-11-11 17:53:39
【问题描述】:

我正在使用 docker-compose 在本地部署中测试 Debezium 平台。这是我的测试用例:

  1. 运行 postgres、kafka、zookeeper 和 debezium/connect:1.3 的 3 个副本
  2. 使用以下配置在其中一个副本中配置连接器:
{
  "name": "database-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "wal2json",
    "slot.name": "database",
    "database.hostname": "debezium_postgis_1", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "database", 
    "database.server.name": "database",
    "heartbeat.interval.ms": 5000,
    "table.whitelist": "public.outbox",
    "transforms.outbox.table.field.event.id": "event_uuid",
    "transforms.outbox.table.field.event.key": "event_name",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.payload.id": "event_uuid",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "transforms.outbox.route.by.field": "topic",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "max.batch.size": 1,
    "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy",
    "binary.handling.mode": "bytes"
  }
}
  1. 通过从另一个类调用此方法来运行一个脚本,该脚本在发件箱表中执行 2000 次插入
    @Transactional
    public void write(String eventName, String topic, byte[] payload) {
        Outbox newRecord = new Outbox(eventName, topic, payload);
        repository.save(newRecord);
        repository.delete(newRecord);
    }
  1. 几秒钟后(当我在 Kafka 上看到第一条消息时),我杀死了正在处理流的副本。假设它成功发送了 200 条关于正确主题的消息。
  2. 我从 debezium 存储偏移量的主题得到最后一条偏移量消息:
{
   "transaction_id": null,
   "lsn_proc": 24360992,
   "lsn": 24495808,
   "txId": 560,
   "ts_usec": 1595337502556806
}
  1. 然后我打开一个 db shell 并运行以下命令 SELECT slot_name, restart_lsn - pg_lsn('0/0') as restart_lsn, confirmed_flush_lsn - pg_lsn('0/0') as confirmed_flush_lsn FROM pg_replication_slots; 和 postgres 回复:
[
  {
    "slot_name": "database",
    "restart_lsn": 24360856,
    "confirmed_flush_lsn": 24360992
  }
]
  1. 我杀死副本 5 分钟后,Kafka 重新平衡连接器,并在其中一个活动副本上部署新的运行任务。
  2. 新的连接器开始处理流,但它似乎从头开始,因为完成后我在 Kafka 上发现了 2200 条消息。 使用该配置(max.batch.size: 1AlwaysCommitPolicy)我希望看到最多 2001 条消息。 我哪里错了?

【问题讨论】:

    标签: debezium


    【解决方案1】:

    我在我的配置中发现了问题: "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy" 仅适用于嵌入式 API。

    此外,debezium/connect:1.3 docker 映像的 OFFSET_FLUSH_INTERVAL_MS 的默认值为 1 分钟。因此,如果我在容器的前 1 分钟内停止容器,则不会在 kafka 上存储任何偏移量

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-08-10
      • 1970-01-01
      • 2016-03-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-20
      相关资源
      最近更新 更多