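【Question title】: Error while consuming messages via Kafka Consumer
【Posted】: 2021-05-11 02:08:31
【Question description】:

I am new to Apache Kafka. I have tried to find a solution to this problem, without success. The producer code works fine and the data is stored in the Kafka topic. I use JsonSerializer as the value serializer in my Kafka producer configuration.

NormalizedEvent is a simple POJO used by both the producer and the consumer.

My producer code: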

    public void saveMessage(final IMMessage message) {

        for (NormalizedEvent event :
                message.getNormalizedEvents()) {

            event.setServiceId(message.getServiceId());

            ProducerRecord<String, NormalizedEvent> producerRecord = buildProducerRecord(null, event, TOPIC);
            ListenableFuture<SendResult<String, NormalizedEvent>> listenableFuture = kafkaTemplate.send(producerRecord);
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, NormalizedEvent>>() {

                @Override
                public void onFailure(Throwable ex) {
                    handleFailure(null, event, ex);
                }

                @Override
                public void onSuccess(SendResult<String, NormalizedEvent> result) {
                    properties.put("partitionId", result.getRecordMetadata().partition());
                    properties.put("offsetId", result.getRecordMetadata().offset());
                    handleSuccess(null, event, result);
                    imMessageDBService.setDBProperties(event, properties);
                }
            });

        }
    }

    private ProducerRecord<String, NormalizedEvent> buildProducerRecord(String key, NormalizedEvent value, String topic) {
        return new ProducerRecord<>(topic, key, value);
    }

My consumer code:

    @Bean
    public ConsumerFactory<String, NormalizedEvent> userConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(NormalizedEvent.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }

Error message:

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) [spring-kafka-2.7.0.jar:2.7.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition SAP-S_4-HANA-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.0.jar:2.7.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]

【Question discussion】:

    Tags: apache-kafka spring-kafka


    【Solution 1】:

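    Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

    It looks like the event class is in a different package on the producer side than on the consumer side.

    By default, the deserializer uses the type information in the record headers and falls back to the generic parameter (the configured target type) only when there is no type header.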
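To make the precedence described above concrete, here is a simplified, library-free model of the decision. It is only a sketch and not spring-kafka's actual code: the class `TypeResolutionSketch` and the method `resolveTargetType` are hypothetical, the header name `__TypeId__` is assumed to be the default type header, and the trusted-package check is reduced to an exact match plus the `*` wildcard.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TypeResolutionSketch {

    /**
     * Hypothetical helper: returns the class name a JSON payload would be mapped to.
     * The type header wins when it is present and type headers are enabled;
     * otherwise the configured target type is used as the fallback.
     */
    static String resolveTargetType(Map<String, String> headers,
                                    String configuredType,
                                    Set<String> trustedPackages,
                                    boolean useTypeHeaders) {
        String headerType = headers.get("__TypeId__"); // assumed default header name
        if (!useTypeHeaders || headerType == null) {
            return configuredType;
        }
        int lastDot = headerType.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : headerType.substring(0, lastDot);
        // Simplified trust check: exact package match or trust-all.
        if (!trustedPackages.contains("*") && !trustedPackages.contains(pkg)) {
            throw new IllegalArgumentException(
                    "The class '" + headerType + "' is not in the trusted packages: " + trustedPackages);
        }
        return headerType;
    }

    public static void main(String[] args) {
        Set<String> trusted = new HashSet<>(
                Arrays.asList("java.util", "java.lang", "com.innovision.consumer.model"));
        Map<String, String> headers = new HashMap<>();
        // The producer stamped its own class name into the header.
        headers.put("__TypeId__", "com.sap.innovision.springkafkaproducer.model.NormalizedEvent");
        String consumerType = "com.innovision.consumer.model.NormalizedEvent";

        // Ignoring the header falls back to the consumer's own class.
        System.out.println(resolveTargetType(headers, consumerType, trusted, false));

        // Honouring the header reproduces the failure mode from the question.
        try {
            resolveTargetType(headers, consumerType, trusted, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the consumer never looks at its own target type as long as the producer's header is honoured, which is why the producer's package name shows up in the consumer's error.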

    There are two solutions. Either on the deserializer:

    /**
     * Set to false to ignore type information in headers and use the configured
     * target type instead.
     * Only applies if the preconfigured type mapper is used.
     * Default true.
     * @param useTypeHeaders false to ignore type headers.
     * @since 2.2.8
     */
    public void setUseTypeHeaders(boolean useTypeHeaders) {
        if (!this.typeMapperExplicitlySet) {
            this.useTypeHeaders = useTypeHeaders;
            setUpTypePrecedence(Collections.emptyMap());
        }
    }
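Applied to the consumer factory from the question, the deserializer-side option might look like the sketch below. The class name `ConsumerConfigSketch` is hypothetical, and the import of `NormalizedEvent` from `com.innovision.consumer.model` is an assumption inferred from the trusted-packages list in the error message.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Assumed location of the consumer-side POJO (inferred from the error message).
import com.innovision.consumer.model.NormalizedEvent;

@Configuration
public class ConsumerConfigSketch {

    @Bean
    public ConsumerFactory<String, NormalizedEvent> userConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Configure the deserializer instance directly rather than through the map.
        JsonDeserializer<NormalizedEvent> valueDeserializer =
                new JsonDeserializer<>(NormalizedEvent.class);
        // Ignore the producer's type header and always map to the consumer's class.
        valueDeserializer.setUseTypeHeaders(false);

        return new DefaultKafkaConsumerFactory<>(
                config, new StringDeserializer(), valueDeserializer);
    }
}
```

The sketch sets the option on the instance itself because the trusted-packages list in the error message does not contain `*`, which suggests that the `JsonDeserializer.TRUSTED_PACKAGES` entry placed in the config map in the question never reached the deserializer that was passed to the factory as an object.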
    

    or on the serializer:

    /**
     * Set to false to disable adding type info headers.
     * @param addTypeInfo true to add headers.
     * @since 2.1
     */
    public void setAddTypeInfo(boolean addTypeInfo) {
        this.addTypeInfo = addTypeInfo;
    }
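On the producer side, the same idea could be sketched as follows. The question does not show the producer configuration, so the class name `ProducerConfigSketch`, the bean layout, and the broker address are assumptions; the producer-side package of `NormalizedEvent` is taken from the error message.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Producer-side POJO, package as reported in the error message.
import com.sap.innovision.springkafkaproducer.model.NormalizedEvent;

@Configuration
public class ProducerConfigSketch {

    @Bean
    public ProducerFactory<String, NormalizedEvent> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Assumed broker address, matching the consumer configuration in the question.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        JsonSerializer<NormalizedEvent> valueSerializer = new JsonSerializer<>();
        // Do not write type headers, so consumers fall back to their own target type.
        valueSerializer.setAddTypeInfo(false);

        return new DefaultKafkaProducerFactory<>(
                config, new StringSerializer(), valueSerializer);
    }

    @Bean
    public KafkaTemplate<String, NormalizedEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```

Note that this only affects newly produced records. Records already in the topic keep the type headers they were written with, so a consumer reading from the earliest offset would still need the deserializer-side setting to get past them.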
    

    See the documentation and Javadocs.

    【Discussion】:
