【发布时间】:2021-01-07 07:36:02
【问题描述】:
我正在使用 Kstream 来使用 kafka 消息并将其保存到我的数据库中。这些消息属于不同的 pojo,目前我正在使用对象映射器来创建对象,然后将它们保存在数据库中。我读过可以使用 Jsondeserialzerserde,但我不确定如何使用它映射到不同的 pojo。为每个 pojo 定制一个 serde 是没有意义的。请帮忙 。提前致谢。
这是我的代码:
public Consumer<KStream<String, String>> process() {
return input ->
inpu.foreach((key, value) -> {
ObjectMapper mapper = new ObjectMapper();
try {
if(value.contains("Teacher"))
{
Teacher teacher= mapper.readValue(value,Teacher.class);
teacherRepository.save(teacher);
}
else if(value.contains("Student"))
{
Student student= mapper.readValue(value,Student.class);
studentRepository.save(student)
}
else if(value.contains("Principal"))
{
Principal principal= mapper.readValue(value,Principal.class);
principalRepository.save(Principal);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}); }
}
【问题讨论】:
标签: java spring apache-kafka apache-kafka-streams spring-cloud-stream