【Posted】: 2021-10-04 19:35:30
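【Question】:
I am trying to consume messages from my Kafka cluster. The source messages are serialized in Avro format (I use the AWS Schema Registry).
Connector configuration: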
{
  "name": "s3-sink-db01",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "de-team",
    "name": "s3-sink-db01",
    "tasks.max": "3",
    "s3.region": "ap-south-1",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "locale": "en",
    "flush.size": "10",
    "rotate.interval.ms": "10",
    "topics.regex": "mysql-db01.(.*)",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "path.format": "YYYY/MM/dd/HH",
    "partition.duration.ms": "3600000",
    "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "key.converter.region": "ap-south-1",
    "value.converter.region": "ap-south-1",
    "key.converter.schemaAutoRegistrationEnabled": "true",
    "value.converter.schemaAutoRegistrationEnabled": "true",
    "key.converter.avroRecordType": "GENERIC_RECORD",
    "value.converter.avroRecordType": "GENERIC_RECORD",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "rotate.schedule.interval.ms": "3600000"
  }
}
However, when I try to configure the sink connector, it fails with the following error:
ERROR WorkerSinkTask{id=s3-sink-db01-2} Error converting message key in topic 'mysql-db01.devdb.table1' partition 0 at offset 0 and timestamp 1627302045505: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:532)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
...
...
ERROR WorkerSinkTask{id=s3-sink-db01-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
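A side note on the connector configuration above: it lists several keys twice (for example key.converter.schemas.enable), and schemas.enable is a JsonConverter setting rather than an Avro one. The following is only a minimal sketch for spotting such entries before submitting a config. The function name find_config_issues and the sample payload are made up for illustration, and it does not address the deserializer error itself.

```python
import json


def find_config_issues(connector_json: str) -> dict:
    """Report repeated keys and schemas.enable keys in a connector config."""
    duplicates = []

    def track_pairs(pairs):
        # json.loads calls this hook once per JSON object with its (key, value)
        # pairs in document order, so repeated keys are still visible here.
        seen = set()
        for key, _ in pairs:
            if key in seen:
                duplicates.append(key)
            seen.add(key)
        # Build the dict as json.loads normally would (last occurrence wins).
        return dict(pairs)

    parsed = json.loads(connector_json, object_pairs_hook=track_pairs)
    config = parsed.get("config", {})
    schemas_enable = sorted(k for k in config if k.endswith("schemas.enable"))
    return {
        "duplicates": sorted(set(duplicates)),
        "schemas_enable": schemas_enable,
    }


if __name__ == "__main__":
    # Hypothetical, shortened payload in the same shape as the config above.
    sample = """
    {
      "name": "s3-sink-db01",
      "config": {
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false"
      }
    }
    """
    print(find_config_issues(sample))
```

Keys reported under "duplicates" can be collapsed to a single entry. Keys reported under "schemas_enable" are candidates for removal when the converter is not the JSON one.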
【Comments】:
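- 1) The schemas.enable settings only apply to JSON; you don't need them. 2) It is unclear how you updated the Connect worker's classpath to add these classes.
- 1) I will try without schemas.enable. 2) I followed this to add it to the worker: (github.com/awslabs/…)
- So, after mvn clean install, you get some JAR files. Rather than updating kafka-run-class or defining CLASSPATH, have you tried modifying plugin.path in the Connect configuration?
- Yes, I did both. It works for my source connector (Debezium).
- Well, I have never used the AWS Schema Registry, but the way you add a serializer doesn't really change. This is a classpath or plugin path issue.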
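If this really is a classpath or plugin path issue, one quick thing to rule out is whether the converter class is present at all in the directories listed in the worker's plugin.path. Below is a rough sketch that scans a directory for JAR files containing the AWSKafkaAvroConverter class named in the question. The helper name jars_containing_class and the directory /usr/share/java/plugins are assumptions for illustration only. Finding the class does not prove the worker loaded it, only that the JAR is where Connect would look.

```python
import zipfile
from pathlib import Path

# Converter class name taken from the connector configuration in the question.
CONVERTER_CLASS = (
    "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
)


def jars_containing_class(plugin_dir: str, class_name: str = CONVERTER_CLASS) -> list:
    """Return the JAR files under plugin_dir that contain class_name."""
    # Inside a JAR, a class is stored as a path ending in ".class".
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(plugin_dir).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(str(jar))
        except zipfile.BadZipFile:
            # Skip files that have a .jar suffix but are not valid archives.
            continue
    return matches


if __name__ == "__main__":
    # Hypothetical directory; substitute an entry from the worker's plugin.path.
    for path in jars_containing_class("/usr/share/java/plugins"):
        print(path)
```

An empty result for every plugin.path entry would suggest the JARs produced by mvn clean install never reached the sink worker.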
Tags: apache-kafka apache-kafka-connect confluent-platform