【问题标题】:SMT Timestamp converter throwing NullPointerExceptionSMT 时间戳转换器抛出 NullPointerException
【发布时间】:2019-01-29 03:13:12
【问题描述】:

使用: confluent-5.1.0

SINK 配置:

curl -X POST \
  http://localhost:8083/connectors \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/json' \
  -d '{
  "name": "dbz-sink-connector-1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "dbauditt4",
    "topic.index.map": "our3.platform.business:plat_index",
    "topics.regex":"our3.platform.business",
    "key.ignore": "true",
    "connection.url": "http://localhost:9200",
    "group.id":"plot",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter.schemas.enable":"false",
    "transforms": "timestamp_convertor",
    "transforms.timestamp_convertor.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.timestamp_convertor.target.type": "string",
    "transforms.timestamp_convertor.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.timestamp_convertor.field":"data.ts_ms"
  }
}';

kafka 主题中的示例消息是:

{
  "data": {
    "before": null,
    "after": {
      "Id": 331458,
      "business_id": 532334,
      "sms_opted": 1
    },
    "source": {
      "version": "0.7.5",
      "name": "our3",
      "server_id": 810143323,
      "ts_sec": 1548661255,
      "gtid": null,
      "file": "mysql-bin-changelog.001786",
      "pos": 1719980,
      "row": 0,
      "snapshot": false,
      "thread": 11674162,
      "db": "platform",
      "table": "business"
    },
    "op": "c",
    "ts_ms": 1548661255851
  }
}

连接器抛出空指针异常。

java.lang.Thread.run(Thread.java:748) 引起:java.lang.NullPointerException at org.apache.kafka.connect.transforms.TimestampConverter.inferTimestampType(TimestampConverter.java:422) at

谁能帮帮我。我究竟做错了什么 ?

【问题讨论】:

  • 在 Kafka Connect 中,您不能通过 . 引用嵌套结构字段,您必须使用 ex 提取值。 org.apache.kafka.connect.transforms.Flatten 转型。我认为,如果没有架构,您可能会遇到映射值的问题。

标签: apache-kafka apache-kafka-connect


【解决方案1】:

开箱即用,没有一个 SMT 支持嵌套字段访问,例​​如 data.ts(我认为您的意思是 data.ts_ms

看来您正在使用 Debezium,所以您可以使用 CDC Event Flattening,或者您可以将 Elasticsearch 配置为使用 dynamic mapping for time fields

【讨论】:

  • 非常感谢您让我知道 SMT 不支持嵌套字段访问。 CDC 事件展平对我不起作用,因为它删除了更改事件的“之前”数据。我是弹性新手,我完全不知道如何将“data.source.ts_sec”从纪元转换为 dateTimeFormat。感谢您指导查看动态映射.. 试图理解它。
  • @SahilGupta,如果您使用org.apache.kafka.connect.transforms.Flatten 转换和. 作为分隔符,您将能够引用类似data.ts_ms 的字段。
  • @wardziniak:如果我对 debezium 生成的数据使用“Flatten”转换器,我会得到 nullPointer。
猜你喜欢
  • 1970-01-01
  • 2011-06-06
  • 1970-01-01
  • 2018-11-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-07-20
相关资源
最近更新 更多