【问题标题】:Kafka avrò can't locate subjectKafka avrò 找不到主题
【发布时间】:2020-11-08 10:40:53
【问题描述】:

我正在使用在 Schema Registry 中注册的 Avro 模式生成 Kafka 记录。

主题已注册,因为当我点击 http://localhost:8081/subjects/player-value/versions/2

我明白了:

{
subject: "player-value",
version: 2,
id: 21,
schema: "{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}"
}

我正在下载架构,然后使用 GenericRecord 生成具有该架构的主题。

我已将主题价值策略设置为 RecordNamingStragegy。

我像这样创建一个 GenericRecord:

Schema schema = new Schema.Parser().parse(subject.schema);
                    System.out.println(subject.schema);
                    return new GenericData.Record(schema);
record.put("id", 1);
                    record.put("first_name", "foobar");

subject.schema 在哪里:

{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}

但是,当我制作时,我得到了这个错误:

SerializationException: Error retrieving Avro schema: {"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}

这是我的完整代码(您不需要阅读全部):

public static void main(String[] args) throws Exception {
        schemaRegistryUtil.downloadSchema("player-value", 2)
                .thenApply(subject -> {
                    Schema schema = new Schema.Parser().parse(subject.schema);
                    System.out.println(subject.schema);
                    return new GenericData.Record(schema);
                })
                .thenApply(record -> {
                    record.put("id", 1);
                    record.put("first_name", "Totti");
                    return record;
                })
                .thenApply(record -> producer.produce("some-key", record, TOPIC))
                .whenCompleteAsync((metadata, throwable) -> {
                    if (throwable != null) {
                        System.out.println(String.format("Error happened %s", throwable.getMessage()));
                    } else {
                        System.out.println("all good man");
                    }
                });


    }

更新

很有趣,如果我删除了

properties.setProperty(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());

效果很好!

【问题讨论】:

    标签: java apache-kafka avro confluent-schema-registry


    【解决方案1】:

    架构注册表中的架构主题有不同的命名策略。 This link 提供了策略的描述,以下是该页面的摘录..

    任何实现 io.confluent.kafka.serializers.subject.SubjectNameStrategy可以 指定的。默认情况下,<topic>-value 用作主题。

    默认为 TopicNameStrategy,它由 topic_name-keytopic-name-value 组成,具体取决于提供给序列化程序的配置 isKey。这允许主题只有一个主题。

    另一种命名策略是RecordNameStrategy,它根据 avro 记录命名。来自文档..

    对于发布到 Kafka 主题 <topicName> 的任何 Avro 记录类型, 在主题名称下的注册表中注册模式 <topicName>-<recordName>,其中<recordName> 是完全合格的 Avro 记录名称。

    此策略允许一个主题包含不同记录的混合 类型,因为没有执行主题内兼容性检查

    此外,不同的主题可能包含互不兼容的版本 具有相同的记录名称,因为兼容性检查的范围是 特定主题中的特定记录名称。

    有时,即使架构注册表不可访问,我们也会遇到异常,虽然您似乎并非如此。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-11-15
      • 1970-01-01
      • 1970-01-01
      • 2019-04-23
      • 1970-01-01
      • 1970-01-01
      • 2015-09-03
      相关资源
      最近更新 更多