【问题标题】:Convert a message consumed by a KafkaListener in a json将 KafkaListener 使用的消息转换为 json
【发布时间】:2021-02-18 19:57:53
【问题描述】:

在我的 spirng boot 项目中,我有一个模型类(FunctionModel):

public class FunctionModel {

    protected Integer id;
    protected String name;

    @ApiModelProperty(value = "Identifier of function")
    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    @ApiModelProperty(value = "Name of function")
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }

}

我用 Kafka 发送一条消息,这条消息被 KafkaListener 以这种方式消费:

@KafkaListener(topics = ConstantTopics.TOPIC_FUNCTION, containerFactory = "kafkaListenerContainerFactoryString")
public void consumerJsonFunction(Object o) throws Exception {
    logger.info("Consumed message: " + o);
    System.out.println("Consume message:" + o.toString());
}

所以,在控制台中我看到了这个日志:

[2m2021-02-02 17:13:21.950[0;39m [32m INFO[0;39m [35m18680[0;39m [2m---[0;39m [2m[ntainer#5-0-C-1][0;39m [36mc.n.g.s.service.SupervisorService       [0;39m [2m:[0;39m Consumed message: ConsumerRecord(topic = function, partition = 0, offset = 21, CreateTime = 1612282401918, serialized key size = -1, serialized value size = 175, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 110, 116, 116, 100, 97, 116, 97, 46, 105, 111, 116, 46, 101, 108, 100, 46, 97, 112, 105, 46, 109, 111, 100, 101, 108, 115, 46, 70, 117, 110, 99, 116, 105, 111, 110, 77, 111, 100, 101, 108])], isReadOnly = false), key = null, value = {"name":"esponenziale","language":"java","code":"codec","registryImageUrl":"abc123","registryUser":null,"registryToken":null,"description":null,"id":3,"requirements":"def456"})
Consume message:ConsumerRecord(topic = function, partition = 0, offset = 21, CreateTime = 1612282401918, serialized key size = -1, serialized value size = 175, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 110, 116, 116, 100, 97, 116, 97, 46, 105, 111, 116, 46, 101, 108, 100, 46, 97, 112, 105, 46, 109, 111, 100, 101, 108, 115, 46, 70, 117, 110, 99, 116, 105, 111, 110, 77, 111, 100, 101, 108])], isReadOnly = false), key = null, value = {"name":"esponenziale","language":"java","code":"codec","registryImageUrl":"abc123","registryUser":null,"registryToken":null,"description":null,"id":3,"requirements":"def456"})

如何将消费的消息转换为包含原始类模型信息的json?

【问题讨论】:

  • 1) 你使用什么值反序列化器? 2) 当你使用consumerJsonFunction(FunctionModel o) 时会发生什么? 3) 你是否阅读过关于 JSON 序列化部分的 spring-kafka 文档?

标签: java json apache-kafka spring-kafka


【解决方案1】:

您可以直接在 FunctionModel 中接受有效负载,而不是将其作为对象接受。你可以使用这样的东西:

@KafkaListener(topics = ConstantTopics.TOPIC_FUNCTION, containerFactory = "kafkaListenerContainerFactoryString")
public void consumerJsonFunction(@Payload FunctionModel o) throws Exception {
    //Do something
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-05-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-15
    • 1970-01-01
    • 1970-01-01
    • 2020-12-15
    • 2020-10-22
    相关资源
    最近更新 更多