【发布时间】:2021-02-18 19:57:53
【问题描述】:
在我的 spirng boot 项目中,我有一个模型类(FunctionModel):
public class FunctionModel {
protected Integer id;
protected String name;
@ApiModelProperty(value = "Identifier of function")
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
@ApiModelProperty(value = "Name of function")
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
我用 Kafka 发送一条消息,这条消息被 KafkaListener 以这种方式消费:
@KafkaListener(topics = ConstantTopics.TOPIC_FUNCTION, containerFactory = "kafkaListenerContainerFactoryString")
public void consumerJsonFunction(Object o) throws Exception {
logger.info("Consumed message: " + o);
System.out.println("Consume message:" + o.toString());
}
所以,在控制台中我看到了这个日志:
[2m2021-02-02 17:13:21.950[0;39m [32m INFO[0;39m [35m18680[0;39m [2m---[0;39m [2m[ntainer#5-0-C-1][0;39m [36mc.n.g.s.service.SupervisorService [0;39m [2m:[0;39m Consumed message: ConsumerRecord(topic = function, partition = 0, offset = 21, CreateTime = 1612282401918, serialized key size = -1, serialized value size = 175, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 110, 116, 116, 100, 97, 116, 97, 46, 105, 111, 116, 46, 101, 108, 100, 46, 97, 112, 105, 46, 109, 111, 100, 101, 108, 115, 46, 70, 117, 110, 99, 116, 105, 111, 110, 77, 111, 100, 101, 108])], isReadOnly = false), key = null, value = {"name":"esponenziale","language":"java","code":"codec","registryImageUrl":"abc123","registryUser":null,"registryToken":null,"description":null,"id":3,"requirements":"def456"})
Consume message:ConsumerRecord(topic = function, partition = 0, offset = 21, CreateTime = 1612282401918, serialized key size = -1, serialized value size = 175, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 110, 116, 116, 100, 97, 116, 97, 46, 105, 111, 116, 46, 101, 108, 100, 46, 97, 112, 105, 46, 109, 111, 100, 101, 108, 115, 46, 70, 117, 110, 99, 116, 105, 111, 110, 77, 111, 100, 101, 108])], isReadOnly = false), key = null, value = {"name":"esponenziale","language":"java","code":"codec","registryImageUrl":"abc123","registryUser":null,"registryToken":null,"description":null,"id":3,"requirements":"def456"})
如何将消费的消息转换为包含原始类模型信息的json?
【问题讨论】:
-
1) 你使用什么值反序列化器? 2) 当你使用
consumerJsonFunction(FunctionModel o)时会发生什么? 3) 你是否阅读过关于 JSON 序列化部分的 spring-kafka 文档?
标签: java json apache-kafka spring-kafka