【发布时间】:2020-04-04 20:31:02
【问题描述】:
我正在尝试构建一个获取 Avro 主题的流,进行简单的转换,然后以 Avro 格式再次将其发送回另一个主题,但我有点卡在最后的序列化部分。
我创建了一个 AVRO 架构,我正在导入它并使用它来创建特定的 Avro Serde。但我不知道如何使用这个 serde 将电影对象序列化回 AVRO。
这是流类:
class StreamsProcessor(val brokers: String, val schemaRegistryUrl: String) {
private val logger = LogManager.getLogger(javaClass)
fun process() {
val streamsBuilder = StreamsBuilder()
val avroSerde = GenericAvroSerde().apply {
configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
}
val movieAvro = SpecificAvroSerde<Movie>().apply{
configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
}
val movieAvroStream: KStream<String, GenericRecord> = streamsBuilder
.stream(movieAvroTopic, Consumed.with(Serdes.String(), avroSerde))
val movieStream: KStream<String, StreamMovie> = movieAvroStream.map {_, movieAvro ->
val movie = StreamMovie(
movieId = movieAvro["name"].toString() + movieAvro["year"].toString(),
director = movieAvro["director"].toString(),
)
KeyValue("${movie.movieId}", movie)
}
// This where I'm stuck, the call is wrong because movieStream is not a <String, movieAvro> object
movieStream.to(movieTopic, Produced.with(Serdes.String(), movieAvro))
val topology = streamsBuilder.build()
val props = Properties()
props["bootstrap.servers"] = brokers
props["application.id"] = "movies-stream"
val streams = KafkaStreams(topology, props)
streams.start()
}
}
谢谢
【问题讨论】: