【Question Title】: kafka - S3 sink connector - Didn't find secondary deserializer
【Posted】: 2021-10-04 19:35:30
【Question】:

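I am trying to consume messages from my Kafka topics with the S3 sink connector. The source messages are serialized in Avro format (I use the AWS Glue Schema Registry).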

Connector configuration:

{
        "name": "s3-sink-db01",
        "config": {
                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
                "s3.bucket.name": "de-team",
                "name": "s3-sink-db01",
                "tasks.max": "3",
                "s3.region": "ap-south-1",
                "s3.part.size": "5242880",
                "s3.compression.type": "gzip",
                "timezone": "UTC",
                "locale": "en",
                "flush.size": "10",
                "rotate.interval.ms": "10",
                "topics.regex": "mysql-db01.(.*)",
                "internal.key.converter.schemas.enable": "false",
                "key.converter.schemas.enable": "false",
                "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
                "internal.value.converter.schemas.enable": "false",
                "value.converter.schemas.enable": "false",
                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
                "path.format": "YYYY/MM/dd/HH",
                "partition.duration.ms": "3600000",
                "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
                "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
                "key.converter.region": "ap-south-1",
                "value.converter.region": "ap-south-1",
                "key.converter.schemaAutoRegistrationEnabled": "true",
                "value.converter.schemaAutoRegistrationEnabled": "true",
                "key.converter.avroRecordType": "GENERIC_RECORD",
                "value.converter.avroRecordType": "GENERIC_RECORD",
                "rotate.schedule.interval.ms": "3600000"
        }
}

But when I create the sink connector, it fails with the following error:

ERROR WorkerSinkTask{id=s3-sink-db01-2} Error converting message key in topic 'mysql-db01.devdb.table1' partition 0 at offset 0 and timestamp 1627302045505: Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask:532)

 org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
...
...
 ERROR WorkerSinkTask{id=s3-sink-db01-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
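The `Tolerance exceeded in error handler` line indicates the connector runs with the default `errors.tolerance` of `none`, so the first record whose key fails conversion kills the task. While investigating, one option is Kafka Connect's built-in error handling, which can route such records to a dead letter queue so their raw key bytes can be inspected (for example, to check whether they were written by the AWS Glue serializer at all). A sketch of extra entries for the `"config"` object, assuming a development setup: the topic name `dlq-s3-sink-db01` is hypothetical, and a replication factor of `1` assumes a single-broker cluster. This only skips and records the failing messages; it does not fix the converter.

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "dlq-s3-sink-db01",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true"
}
```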

【Comments】:

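  • 1) The schemas.enable settings only apply to the JSON converter; you don't need them. 2) It's not clear how you updated the Connect worker's classpath to add these classes.
  • 1) I'll try without schemas.enable. 2) I followed this to add it to the worker (github.com/awslabs/…
  • So after mvn clean install you get some JAR files. Instead of updating kafka-run-class or defining CLASSPATH, have you tried setting plugin.path in the Connect worker configuration?
  • Yes, I did both. It works for my source connector (Debezium).
  • Well, I have never used the AWS Schema Registry, but the way to add serializers doesn't really change. This is a classpath or plugin.path problem.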
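Following the first comment, the converter part of the question's config can be trimmed to the entries below. The `*.schemas.enable` settings are read by Kafka's `JsonConverter`, and the `internal.*.converter.*` entries are deprecated worker-level settings, so neither affects `AWSKafkaAvroConverter`. All remaining values are copied unchanged from the question; this is a cleanup sketch and is not expected to resolve the error on its own.

```json
{
  "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
  "key.converter.region": "ap-south-1",
  "key.converter.schemaAutoRegistrationEnabled": "true",
  "key.converter.avroRecordType": "GENERIC_RECORD",
  "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
  "value.converter.region": "ap-south-1",
  "value.converter.schemaAutoRegistrationEnabled": "true",
  "value.converter.avroRecordType": "GENERIC_RECORD"
}
```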

Tags: apache-kafka apache-kafka-connect confluent-platform


【Solution 1】:

The problem was caused by the key/value converters. I fixed it by correcting them; one dependency issue is here, and the solution is here.
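The trace above fails while converting the message *key*. One possible workaround, assuming the keys are not needed in the S3 output (with the settings shown in the question, the S3 sink writes only record values), is to pass keys through as raw bytes so the AWS Glue deserializer is never called on them. A sketch of the changed key setting; the `value.converter*` entries stay as in the question, and the other `key.converter.*` entries can be removed because they no longer apply:

```json
{
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
```

If the keys are needed downstream, this is not a fix; the key converter must then match whatever serializer actually wrote the keys.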
