[Posted]: 2020-11-08 10:40:53
[Question]:
I am producing Kafka records using an Avro schema registered in Schema Registry.
The subject is registered, because when I request http://localhost:8081/subjects/player-value/versions/2
I get:
{
subject: "player-value",
version: 2,
id: 21,
schema: "{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}"
}
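For reference, the URL above follows the registry's /subjects/{subject}/versions/{version} pattern. Below is a minimal sketch of building such a lookup with the JDK 11+ HTTP client. The class and method names are hypothetical, and the request is only constructed, not sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds (but does not send) a Schema Registry lookup request.
final class RegistryRequestSketch {

    // Builds <baseUrl>/subjects/<subject>/versions/<version>.
    static URI versionUri(String baseUrl, String subject, int version) {
        // URLEncoder targets form encoding, so map "+" back to "%20" for use in a path.
        String encoded = URLEncoder.encode(subject, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/subjects/" + encoded + "/versions/" + version);
    }

    // Wraps the URI in a GET request that asks for the registry's JSON media type.
    static HttpRequest versionRequest(String baseUrl, String subject, int version) {
        return HttpRequest.newBuilder(versionUri(baseUrl, subject, version))
                .header("Accept", "application/vnd.schemaregistry.v1+json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = versionRequest("http://localhost:8081", "player-value", 2);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Passing a different subject string to versionUri is a quick way to check which subjects actually exist in the registry.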
I download the schema and then use a GenericRecord to produce to the topic with that schema.
I have set the value subject name strategy to RecordNameStrategy.
I create the GenericRecord like this:
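// Parse the schema string downloaded from Schema Registry.
Schema schema = new Schema.Parser().parse(subject.schema);
System.out.println(subject.schema);
// Build a record bound to that schema and fill in its fields.
GenericRecord record = new GenericData.Record(schema);
record.put("id", 1);
record.put("first_name", "foobar");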
where subject.schema is:
{"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}
However, when I produce the record, I get this error:
SerializationException: Error retrieving Avro schema: {"type":"record","name":"player_value","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"id","type":"int","doc":"The int type is a 32-bit signed integer."},{"name":"first_name","type":"string","doc":"The string is a unicode character sequence."}]}
Here is my full code (you don't need to read all of it):
public static void main(String[] args) throws Exception {
    schemaRegistryUtil.downloadSchema("player-value", 2)
        .thenApply(subject -> {
            Schema schema = new Schema.Parser().parse(subject.schema);
            System.out.println(subject.schema);
            return new GenericData.Record(schema);
        })
        .thenApply(record -> {
            record.put("id", 1);
            record.put("first_name", "Totti");
            return record;
        })
        .thenApply(record -> producer.produce("some-key", record, TOPIC))
        .whenCompleteAsync((metadata, throwable) -> {
            if (throwable != null) {
                System.out.println(String.format("Error happened %s", throwable.getMessage()));
            } else {
                System.out.println("all good man");
            }
        });
}
Update
Interestingly, if I remove
properties.setProperty(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
it works fine!
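One possible reading of the update, offered as a sketch rather than a confirmed diagnosis: the schema is registered under the subject player-value, which is the name the default TopicNameStrategy derives for the values of a topic called player. RecordNameStrategy derives the subject from the fully qualified record name instead, so the serializer would look under com.mycorp.mynamespace.player_value, a subject that may not exist in the registry. The plain-Java illustration below mirrors the documented naming rules without calling any Confluent class. The class name, the method names, and the topic name "player" are assumptions, since the question does not show the value of TOPIC.

```java
// Plain-Java illustration of how the three Confluent subject name strategies
// derive a subject name. It does not call any Confluent class.
final class SubjectNameSketch {

    // TopicNameStrategy (the default): "<topic>-key" or "<topic>-value".
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified record name; the topic is ignored.
    static String recordName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    // TopicRecordNameStrategy: "<topic>-<fully qualified record name>".
    static String topicRecordName(String topic, String namespace, String name) {
        return topic + "-" + recordName(namespace, name);
    }

    public static void main(String[] args) {
        String topic = "player";                      // assumption: TOPIC is not shown in the question
        String namespace = "com.mycorp.mynamespace";  // from the registered schema
        String name = "player_value";                 // from the registered schema

        // prints "player-value"
        System.out.println(topicName(topic, false));
        // prints "com.mycorp.mynamespace.player_value"
        System.out.println(recordName(namespace, name));
        // prints "player-com.mycorp.mynamespace.player_value"
        System.out.println(topicRecordName(topic, namespace, name));
    }
}
```

If this reading is right, either keeping the default strategy or registering the schema under the subject that RecordNameStrategy derives should make the lookup succeed.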
[Discussion]:
Tags: java apache-kafka avro confluent-schema-registry