【问题标题】:Spring Cloud stream deserialize different pojosSpring Cloud 流反序列化不同的 pojo
【发布时间】:2021-01-07 07:36:02
【问题描述】:

我正在使用 Kstream 来使用 kafka 消息并将其保存到我的数据库中。这些消息属于不同的 pojo,目前我正在使用对象映射器来创建对象,然后将它们保存在数据库中。我读过可以使用 Jsondeserialzerserde,但我不确定如何使用它映射到不同的 pojo。为每个 pojo 定制一个 serde 是没有意义的。请帮忙 。提前致谢。

这是我的代码:

    public Consumer<KStream<String, String>> process() {

        return input ->
                inpu.foreach((key, value) -> {
                    ObjectMapper mapper = new ObjectMapper();
                    try {

                        if(value.contains("Teacher"))
                        {
                            Teacher teacher= mapper.readValue(value,Teacher.class);
                            teacherRepository.save(teacher);
                        }
                        else if(value.contains("Student"))
                        {
                            Student student= mapper.readValue(value,Student.class);
                            studentRepository.save(student)
                        }
                        else  if(value.contains("Principal"))
                        {
                            Principal principal= mapper.readValue(value,Principal.class);
                            principalRepository.save(Principal);
                        }
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }

                });    }

}

【问题讨论】:

    标签: java spring apache-kafka apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    Spring Cloud Stream 中的 Kafka Streams binder 不直接提供任何机制来执行您的要求。您的消费者的类型签名表明您将其作为String 使用,因此绑定器可以在您不明确提供任何Serde 的情况下推断该信息。但是,如果您想使用 jackson 进一步将 String 转换为其他类型,您需要在业务逻辑中自行完成,因为您已经拥有它。如果您只有几个有限类型,我认为这样做没有问题。

    【讨论】:

    • 明白了。谢谢你的解释。
    猜你喜欢
    • 1970-01-01
    • 2023-01-28
    • 2023-03-23
    • 2018-12-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多