【问题标题】:Confluent Kafka Streams - Could not find class io.confluent.connect.avro.ConnectDefaultConfluent Kafka Streams - 找不到类 io.confluent.connect.avro.ConnectDefault
【发布时间】:2018-04-15 08:30:52
【问题描述】:

我正在使用带有查询模式的 jdbc 源连接器,并且似乎没有指定表名,在模式注册表中为记录键和记录值注册的模式具有空模式名称,并且被分配了默认名称Confluent 的 AvroData 类中定义的“ConnectDefault”https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java

使用生成的 avro 源和 SpecificAvroSerde 运行 Kafka Streams 应用程序时,出现错误:

Exception in thread "streams-app-6e39ebfd-db14-49bc-834f-afaf108a6d25-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=topic-name, partition=0, offset=0
  at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
  at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
  at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
  at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
  at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class io.confluent.connect.avro.ConnectDefault specified in writer's schema whilst finding reader's schema for a SpecificRecord.

我尝试发布主题中键和值模式的新版本,并使用表名作为模式名,并删除具有\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\" 属性的原始版本,但没有成功。我是否缺少一个名为 ConnectDefault 的类,或者我可以在源连接器的某处指定一个没有命名空间的架构名称?


我的 Kafka Streams 配置:

streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

我的 Kafka Connect 配置:

name=source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:
mode=incrementing
incrementing.column.name=id
query=QUERY
topic.prefix=topic-name

transforms=InsertKey, ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081

【问题讨论】:

  • 您的 Kafka Streams 配置是什么?您必须指向模式注册表以允许 Kafka Streams 从那里获取反序列化器。参照。 github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/…
  • 我通过在 avro 模式文件中设置 "namespace": "io.confluent.connect.avro", "name": "ConnectDefault" 解决了这个问题。所以我使用生成的源 ConnectDefault.java 而不是 TableName.java

标签: java apache-kafka-streams apache-kafka-connect confluent-platform confluent-schema-registry


【解决方案1】:

问题在于,在查询模式下,jdbc 源连接器的模式名称默认为 null。 https://github.com/confluentinc/kafka-connect-jdbc/issues/90

看起来可以通过在具有 SetSchemaMetadata 转换的源连接器中添加带有 SMT(单消息转换)的架构名称来解决此问题。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect

transforms=setValueSchema
transforms.setValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setValueSchema.schema.name=io.confluent.connect.avro.ConnectDefault

【讨论】:

  • 我无法使用 ConnectDefault 更改名称解决它,也找不到使用 SMT 的示例。您能否更详细地提供使用 SMT 的解决方案? :) 谢谢! (我正在使用 cassandra sink 连接器,我遇到了同样的问题)
  • @frm 我添加了示例connect-standalone transform config
【解决方案2】:

默认情况下,kafka 会在 ConnectDefault 中搜索 key schema。

只需创建名称为“ConnectDefault”的关键模式类,并将该类保存在包“io.confluent.connect.avro”中。

【讨论】:

    猜你喜欢
    • 2017-08-30
    • 2017-03-26
    • 2022-05-31
    • 2020-10-09
    • 2021-01-11
    • 2021-10-14
    • 2017-08-13
    • 1970-01-01
    • 2019-02-16
    相关资源
    最近更新 更多