【问题标题】:Serde class for custom JSON (Kotlin & Kafka)自定义 JSON 的 Serde 类(Kotlin 和 Kafka)
【发布时间】:2020-04-03 23:44:07
【问题描述】:

我正在用 Kotlin 编写一个 kafka 流应用程序,它使用 JSON 消息(没有 AVRO 或 Schema Registry)。

在 MyMessage.kt 中,我已将 MyMessage 类声明为 @Serializable

MyMessage.kt

import kotlinx.serialization.Serializable

@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)

Streaming.kt

val s: KString<String, MyMessage> = streamsBuilder()
    .stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)

运行此程序时,我在上面的行中收到以下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID

我错过了什么?

【问题讨论】:

    标签: kotlin serialization apache-kafka-streams


    【解决方案1】:

    Serdes.serdesFrom() 的参数需要SerializerDeserializer 对象(这两个接口都是来自包org.apache.kafka.common.serialization 的Kafka 接口,与@Serializable 注释无关。

    您需要创建类MyMessageSerializer extends SerializerMyMessageDeserialzer extends Deserializer 并将这些对象传递给方法。 要使用这两个类实现实际的序列化/反序列化,您可以根据需要依赖默认的序列化/反序列化。

    【讨论】:

      猜你喜欢
      • 2020-12-22
      • 1970-01-01
      • 1970-01-01
      • 2021-05-11
      • 1970-01-01
      • 1970-01-01
      • 2017-03-09
      • 2020-08-28
      • 2017-12-10
      相关资源
      最近更新 更多