【问题标题】:Kafka Streams: serialize back to avroKafka Streams:序列化回 avro
【发布时间】:2020-04-04 20:31:02
【问题描述】:

我正在尝试构建一个获取 Avro 主题的流,进行简单的转换,然后以 Avro 格式再次将其发送回另一个主题,但我有点卡在最后的序列化部分。

我创建了一个 AVRO 架构,我正在导入它并使用它来创建特定的 Avro Serde。但我不知道如何使用这个 serde 将电影对象序列化回 AVRO。

这是流类:

class StreamsProcessor(val brokers: String, val schemaRegistryUrl: String) {

    private val logger = LogManager.getLogger(javaClass)

    fun process() {
        val streamsBuilder = StreamsBuilder()

        val avroSerde = GenericAvroSerde().apply {
            configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
        }

        val movieAvro = SpecificAvroSerde<Movie>().apply{
            configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
        }

        val movieAvroStream: KStream<String, GenericRecord> = streamsBuilder
                .stream(movieAvroTopic, Consumed.with(Serdes.String(), avroSerde))

        val movieStream: KStream<String, StreamMovie> = movieAvroStream.map {_, movieAvro ->
            val movie = StreamMovie(
                    movieId = movieAvro["name"].toString() + movieAvro["year"].toString(),
                    director = movieAvro["director"].toString(),
            )
             KeyValue("${movie.movieId}", movie)
        }

        // This where I'm stuck, the call is wrong because movieStream is not a <String, movieAvro> object 
        movieStream.to(movieTopic, Produced.with(Serdes.String(), movieAvro))

        val topology = streamsBuilder.build()

        val props = Properties()
        props["bootstrap.servers"] = brokers
        props["application.id"] = "movies-stream"
        val streams = KafkaStreams(topology, props)
        streams.start()
    }
}

谢谢

【问题讨论】:

    标签: avro apache-kafka-streams


    【解决方案1】:

    结果流的类型是 KStream&lt;String, StreamMovie&gt;,因此使用的值 Serde 应该是 SpecificAvroSerde&lt;StreamMovie&gt; 类型。

    你为什么尝试使用SpecificAvroSerde&lt;Movie&gt;?如果Movie 是所需的输出类型,则应在map 步骤中创建Movie 对象而不是StreamMovie 对象,并相应地更改结果KStream 的值类型。

    比较https://github.com/confluentinc/kafka-streams-examples/blob/5.4.1-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-11-18
      • 2019-01-15
      • 2018-08-11
      • 1970-01-01
      • 2019-07-30
      • 2015-08-01
      • 2021-08-08
      • 2021-03-11
      相关资源
      最近更新 更多