【问题标题】:Error With RowKey Definition on Confluent BigTable Sink ConnectorConfluent BigTable Sink 连接器上的 RowKey 定义错误
【发布时间】:2022-01-28 22:22:03
【问题描述】:

我正在尝试使用 Confluent 中的 BigTable Sink 连接器从 kafka 读取数据并将其写入我的 BigTable 实例,但我收到以下消息错误:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with RowKey definition: Row key definition was defined, but received, deserialized kafka key is not a struct. Unable to construct a row key.
    at io.confluent.connect.bigtable.client.RowKeyExtractor.getRowKey(RowKeyExtractor.java:69)
    at io.confluent.connect.bigtable.client.BufferedWriter.addWriteToBatch(BufferedWriter.java:84)
    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:47)
    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:99)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    ... 10 more

由于某些技术限制,消息生产者将无法生成具有 key 属性的消息,因此,我正在使用一些 Transforms 从有效负载中获取信息并将其设置为关键消息。

这是我的连接器有效负载:

{
  "name" : "DATALAKE.BIGTABLE.SINK.QUEUEING.ZTXXD",
  "config" : {
    "connector.class" : "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "topics" : "APP-DATALAKE-QUEUEING-ZTXXD_DATALAKE-V1",
    "transforms" : "HoistField,AddKeys,ExtractKey",
    "gcp.bigtable.project.id" : "bigtable-project-id",
    "gcp.bigtable.instance.id" : "bigtable-instance-id",
    "gcp.bigtable.credentials.json" : "XXXXX",
    "transforms.ExtractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.HoistField.field" : "raw_data_cf",
    "transforms.ExtractKey.field" : "KEY1,ATT1",
    "transforms.HoistField.type" : "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.AddKeys.type" : "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.AddKeys.fields" : "KEY1,ATT1",
    "row.key.definition" : "KEY1,ATT1",
    "table.name.format" : "raw_ZTXXD_DATALAKE",
    "consumer.override.group.id" : "svc-datalake-KAFKA_2_BIGTABLE",
    "confluent.topic.bootstrap.servers" : "xxxxxx:9092",
    "input.data.format" : "JSON",
    "confluent.topic" : "_dsp-confluent-license",
    "input.key.format" : "STRING",
    "key.converter.schemas.enable" : "false",
    "confluent.topic.security.protocol" : "SASL_SSL",
    "row.key.delimiter" : "/",
    "confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXXXX\" password=\"XXXXXX\";",
    "value.converter.schemas.enable" : "false",
    "auto.create.tables" : "true",
    "auto.create.column.families" : "true",
    "confluent.topic.sasl.mechanism" : "PLAIN"
  }
}

这是我发给 Kafka 的信息:

{
    "MANDT": "110",
    "KEY1": "1",
    "KEY2": null,
    "ATT1": "1M",
    "ATT2": "0000000000",
    "TABLE_NAME": "ZTXXD_DATALAKE",
    "IUUC_OPERATION": "I",
    "CREATETIMESTAMP": "2022-01-24T20:26:45.247Z"
}
  

在我的变换中,我做了三个操作:

  1. HoistField 将我的有效负载放入一个两级结构中(BigTable 的连接文档说 connect 需要一个两级结构以便能够推断出族列

  2. addKey 正在将我认为是关键的列添加到消息键

  3. ExtractKey 是从头中添加的字段中移除键,只留下值本身。

我一直在阅读 Bigtable 的此连接器的文档,但我不清楚该连接器是否适用于 JSON 格式。可以告诉我吗?

【问题讨论】:

  • 这真的是 kafka 中的事件吗?还是来自其他代理?如果是文字记录,你能解释一下你的转换的每一步吗?例如,当值的记录数组中有多个对象时,您希望发生什么?更重要的是,据我所知,没有任何现有转换可以解析数组类型。
  • 嗨@OneCricketeer!感谢您的提问 =) 此消息是由 Kafka Rest 代理发送的,因此最终发送到主题的信息就是值中的内容。在这个代理后面,我有一个 API 可以为 Kafka 主题发送记录,而不是记录数组。无论如何,我更改了原始帖子中的有效负载。

标签: apache-kafka apache-kafka-connect google-cloud-bigtable bigtable


【解决方案1】:

JSON 应该可以工作,但是...

反序列化的 kafka 密钥不是结构

这是因为您在值转换器上设置了schemas.enable=false 属性,这样当您执行ValueToKey 时,它就不是Connect Struct 类型; HoistField 会生成一个 Java Map。

如果您无法使用架构注册表并切换序列化格式,那么您需要尝试找到一种方法让 REST 代理在生成数据之前推断 JSON 消息的架构 (我不认为它可以)。否则,您的记录需要包含schemapayload 字段,并且您需要在转换器上启用模式。 Explained here

另一种选择 - 可能有一个转换项目可以设置记录的架构,但它不是内置的..(它不是 SetSchemaMetadata 的一部分)

【讨论】:

    猜你喜欢
    • 2020-07-14
    • 2021-04-07
    • 2018-12-29
    • 2022-11-10
    • 2020-07-28
    • 1970-01-01
    • 2022-05-31
    • 1970-01-01
    • 2020-01-07
    相关资源
    最近更新 更多