【问题标题】:Kafka connect JDBC source connector not workingKafka连接JDBC源连接器不起作用
【发布时间】:2019-11-17 18:56:41
【问题描述】:

大家好, 我正在使用用于 postgres 的 Kafka JDBC 源连接器。以下是我的连接器配置。有些如何它没有带来任何数据。这个配置有什么问题?

{
    "name": "test-connection",
    "config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
       "mode": "timestamp",
       "timestamp.column.name": "TEST_DT",
       "topic.prefix": "test",
       "connection.password": "xxxxxx",
       "validate.non.null": "false",
       "connection.user": "xxxxxx",
       "table.whitelist": "test.test",
       "connection.url": "jdbc:postgresql://xxxx:5432/xxxx?ssl=true&stringtype=unspecified",
       "name": "test-connection"
},
  "tasks": [],
  "type": "source"
}

我需要创建主题还是自动生成主题?

我希望根据示例数据正在流动,但数据没有流动。以下是我在 kafka 连接中看到的日志。但是,没有数据流入。

日志

[2019-07-07 20:52:37,465] INFO WorkerSourceTask{id=test-connection-0} 提交偏移量 (org.apache.kafka.connect.runtime.WorkerSourceTask) [2019-07-07 20:52:37,465] INFO WorkerSourceTask{id=test-connection-0} 刷新 0 条未完成的偏移提交消息 (org.apache.kafka.connect.runtime.WorkerSourceTask)

【问题讨论】:

    标签: postgresql jdbc apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    我需要创建主题还是自动生成主题?

    它会使用您在"topic.prefix": "test" 中设置的“test”前缀自动生成

    所以您的主题称为“testtest-connection”或“testtest.test”

    您可能正在使用 Avro 模式,如果是这样,您必须使用 Avro 消费者来使用主题。

    【讨论】:

    • 感谢您的回复。但是,我检查了主题,但没有一个主题被创建。另外,我没有使用任何架构。
    • 我也面临同样的问题。连接器工作并有一个任务,但没有分配给它的主题。
    【解决方案2】:

    我遇到了完全相同的问题,日志中没有错误,尽管我在 postgres 中添加/修改记录并且它没有发送任何消息。正如您提到的那样,在 INFO 模式下获得相同的日志消息。在这里,我解决了它,可能其中一个或所有这些都可能导致此问题。所以请检查你最后的问题是什么。如果它解决了您的问题,请接受此作为答案。

    1. "table.whitelist" : "public.abcd"

    2. 通常当我们运行数据库时(例如通过 Docker),时区处于 UTC 模式,如果您所在的时区超过该时区,那么在内部查询数据时,它会将过滤条件设置为过滤您的数据出去。克服最好的方法是您的时间戳列应该是“带有时区的时间戳”,这解决了我的问题。我所做的另一个变化是我插入数据并将该列的值指定为“now() -interval '3 days'”以确保数据是旧的并立即流向主题。好吧,最好是给出带有时区的时间戳,而不是这个 hack。

    3. 最后,另一种可能的解决方案可能是在给连接器配置时告诉您 postgres db 是哪个时区。您可以谷歌搜索该物业。由于第 2 点解决了我的问题,所以我没有尝试这个。

      创建表 public.abcd ( id 序列主键, 标题 VARCHAR(100) 不为 NULL, update_ts TIMESTAMP 与时区默认 now() 不为空 );

    我的配置有效。以备不时之需。

    {
      "name": "jdbc_source_connector_postgresql_004",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://192.168.99.116:30000/mydb",
        "connection.user": "sachin",
        "connection.password": "123456",
        "topic.prefix": "thesach004_",
        "poll.interval.ms" : 1000,
        "mode":"timestamp",
        "table.whitelist" : "public.abcd",
        "timestamp.column.name": "update_ts",
        "validate.non.null": false,
        "transforms":"createKey,extractInt",
        "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields":"id",
        "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field":"id"
      }
    }
    

    -$achin

    【讨论】:

      猜你喜欢
      • 2017-10-12
      • 2018-02-06
      • 2018-05-01
      • 2020-01-15
      • 2021-08-18
      • 2020-06-02
      • 2021-05-07
      • 2018-08-07
      • 2019-06-17
      相关资源
      最近更新 更多