【Question title】: Debezium Postgres sink connector fails to insert values with type DATE
【Posted】: 2020-03-28 11:21:09
【Question】:

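After setting up the source and sink connectors, I ran into a problem with Postgres columns of type DATE.

ERROR: column "foo" is of type date but expression is of type integer

I checked the Avro schema and found that column foo is serialized as io.debezium.time.Date: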

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "io.debezium.time.Date",
            "connect.version": 1,
            "type": "int"
        }
    ]
}

What should I do so that the sink connector inserts this value correctly (as a DATE, not as an INTEGER)?

Full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    ... 12 more

Source config:

{
    "name": "dbz-source-test-1",
    "config": {
        "name":"dbz-source-test-1",
        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname":"some.host",
        "database.port":"5432",
        "database.user":"test_debezium",
        "database.password":"password",
        "database.dbname":"dbname",
        "plugin.name":"wal2json_rds",
        "slot.name":"wal2json_rds",
        "database.server.name":"server_test",
        "table.whitelist":"public.test_table",
        "transforms":"route",
        "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement":"dbz_source_$3",
        "topic.selection.strategy":"topic_per_table",
        "include.unknown.datatypes":true,
        "decimal.handling.mode":"double",
        "snapshot.mode":"never"
    }
}

Sink config:

{
    "name": "dbz-sink-test-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "config.providers" : "file",
        "config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
        "config.providers.file.param.secrets" : "/opt/mysecrets",
        "topics": "dbz_source_test_table",
        "connection.url": "someurl",
        "connection.user": "${file:/opt/mysecrets.properties:user}",
        "connection.password" : "${file:/opt/mysecrets.properties:pass}",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "table.name.format": "dbz_source_",
        "insert.mode": "upsert",
        "pk.field": "id",
        "pk.mode": "record_value"
    }
}

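【Question comments】:

  • Can you share the entire stack trace? Does it happen in Debezium or in the sink connector? Things should also work with the default time precision mode, so there might be a bug here.
  • @Gunnar The exception is in the connector itself; I updated the question with the stack trace and the connector configs.
  • OK, thanks. How did you create the table schema on the sink side? If you created it manually and set the column type to DATE, then this behavior is expected, because the JDBC sink connector doesn't know how to convert the incoming int value to that column type. Using the connect type mapping is then the right approach; alternatively, you could use an SMT to convert the incoming int to the right type yourself.
  • @Gunnar Yes, you're right, I created the schema manually, with the column type set to DATE.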
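
To make the "incoming int" in the comment above concrete: io.debezium.time.Date carries the number of days since the Unix epoch, which is why the failing INSERT in the stack trace contains the bare integer 18577. The following is a minimal Python sketch of that conversion, not part of either connector; the helper name epoch_days_to_date is hypothetical.

```python
from datetime import date, timedelta

# Days-since-epoch values are counted from 1970-01-01.
EPOCH = date(1970, 1, 1)


def epoch_days_to_date(days: int) -> date:
    """Convert a days-since-epoch integer to a calendar date (hypothetical helper)."""
    return EPOCH + timedelta(days=days)


# 18577 is the value bound to "foo" in the failing INSERT from the stack trace.
print(epoch_days_to_date(18577).isoformat())
```

An SMT that performs the conversion mentioned in the comment would need to apply the same arithmetic to the field before the record reaches the JDBC sink.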

Tags: postgresql apache-kafka apache-kafka-connect debezium


【Solution 1】:

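I fixed this by switching the source connector's time.precision.mode config to connect.

When the time.precision.mode configuration property is set to connect, the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values.

After that, the column is serialized with a different type: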

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "org.apache.kafka.connect.data.Date",
            "connect.version": 1,
            "logicalType": "date",
            "type": "int"
        }
    ]
}

The sink connector recognizes the org.apache.kafka.connect.data.Date type and inserts the value correctly.
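
As a rough illustration of the change described in this answer, the sketch below adds time.precision.mode to a source connector definition. It is written in Python purely for illustration; the helper name with_connect_time_precision is hypothetical, and the source dict is an abbreviated copy of the config from the question rather than the full definition.

```python
import json


def with_connect_time_precision(connector: dict) -> dict:
    """Return a copy of a connector definition with time.precision.mode=connect.

    Hypothetical helper; the input dict is left unmodified.
    """
    config = dict(connector["config"])
    config["time.precision.mode"] = "connect"
    return {"name": connector["name"], "config": config}


# Abbreviated copy of the source connector definition from the question.
source = {
    "name": "dbz-source-test-1",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "table.whitelist": "public.test_table",
    },
}

updated = with_connect_time_precision(source)
print(json.dumps(updated["config"], indent=4))
```

The resulting config map could then be submitted to the Kafka Connect REST API, for example with a PUT to /connectors/dbz-source-test-1/config, assuming the worker's REST endpoint is reachable.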

【Comments】:

  • Wow, this took me a long time to find, well done!