【问题标题】:Spring Kafka Custom DeserializerSpring Kafka 自定义反序列化器
【发布时间】:2018-02-19 01:24:24
【问题描述】:

我正在按照link 中列出的步骤创建客户反序列化程序。我从 Kafka 收到的消息在 json 字符串之前有纯文本“log message -”。我希望反序列化器忽略这个字符串并解析 json 数据。有办法吗?

应用程序

@SpringBootApplication
public class TransactionauditServiceApplication {

    public static void main(String[] args) throws InterruptedException {
        new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false).run(args);
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }

    public static class MessageListener {

        @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload ConciseMessage message, 
                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println("Received Messasge in group foo: " + message.getStringValue("traceId") + " partion " + partition);
        }
    }
}

ConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${groupId:audit}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, ConciseMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ConciseMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

【问题讨论】:

  • 为什么你的反序列化器不能简单地省略这个文本位并在它之后解析 JSON?
  • 我已经粘贴了代码。它正在隐含地完成。我想截取并省略文本位。
  • new JsonDeserializer&lt;&gt;(ConciseMessage.class) - 这不是自定义的吧?
  • 这是自定义消息。 json 数据的类型为 ConciseMessage
  • 不,我的意思是,反序列化器类不是你想出来的,是吗?

标签: java spring spring-boot apache-kafka json-deserialization


【解决方案1】:

通过编写new JsonDeserializer&lt;&gt;(ConciseMessage.class) 行,您只是告诉Kafka 您要将消息转换为ConciseMessage 类型。所以,这并不能使它成为一个自定义的反序列化器。要解决您的问题,您很可能需要自己实现一个反序列化器,该反序列化器具有去除文本“log message -”的逻辑。

【讨论】:

    猜你喜欢
    • 2019-05-07
    • 1970-01-01
    • 1970-01-01
    • 2022-07-21
    • 1970-01-01
    • 2015-02-17
    • 2019-11-18
    • 1970-01-01
    • 2018-01-07
    相关资源
    最近更新 更多