【Question title】: java.util.LinkedHashMap cannot be cast to class GenericEvent - objectmapper typeReference
【Posted】: 2021-06-13 15:08:44
【Question】:

Messages are consumed from a Kafka topic using Spring's JSON deserializer. The generic message structure is shown below. Generic event:

{
  "id": "10000",
  "payload": {
    "id": 100,
    "attribute1": "hi",
    "attribute2": "hello"
  },
  "type": {
    "id": 1,
    "name": "A"
  }
}

Each type carries a different payload, and the structure of the payload varies accordingly, so I want to process the payload based on the type.

My POJOs are shown below. I created 3 different payloads, each with its own payload POJO.

public class GenericEvent<T> {

    private int id;
    private T payload;
    private Type type;

}

I am currently using the code below for the conversion:

// messageFromKafka: the raw JSON string read from the topic
JsonNode jsonNode = objectMapper.readTree(messageFromKafka);
GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});

But the code throws java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class GenericEvent.
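
For context, Jackson binds a JSON object to a `java.util.LinkedHashMap` whenever the target type is just `Object`, and a raw `GenericEvent` in the `TypeReference` erases `T` to `Object`. The JDK-only sketch below imitates that situation without Jackson to show where such a `ClassCastException` can surface. The class `ErasureDemo` and its helper methods are hypothetical, the nested POJOs are simplified stand-ins, and the question does not confirm that this is the exact cause of the exception quoted above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical JDK-only demo; no Jackson or Kafka classes are involved.
class ErasureDemo {

    // Simplified stand-ins for the POJOs in the question.
    static class Payload {
        int id;
    }

    static class GenericEvent<T> {
        private T payload;

        T getPayload() { return payload; }

        void setPayload(T payload) { this.payload = payload; }
    }

    // Imitates binding to a raw GenericEvent: the payload is stored as a map.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static GenericEvent<Payload> bindWithoutTypeInfo() {
        GenericEvent raw = new GenericEvent();
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("id", 100);
        raw.setPayload(map);
        return raw; // unchecked conversion: compiles, but the payload is still a map
    }

    // Returns true if reading the payload as Payload fails at runtime.
    static boolean payloadAccessFails() {
        GenericEvent<Payload> event = bindWithoutTypeInfo();
        try {
            Payload payload = event.getPayload(); // compiler-inserted cast happens here
            System.out.println("payload id: " + payload.id);
            return false;
        } catch (ClassCastException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException on access: " + payloadAccessFails());
    }
}
```

With Jackson itself, the usual remedy is to keep the type argument in the reference, for example `new TypeReference<GenericEvent<Payload>>() {}`, so that the payload is bound to `Payload` rather than to a map.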

Can someone help me solve this?

Edit: // the generic object I have already provided

     // Payload object, shared by the different types A, B, C, D
     public class Payload {

           private int id;
           private String name;
           private String address;
           private String typeAAttribute1;  // applies to type A
           private String typeAAttribute2;  // applies to type A
           private String typeBAtribute1;   // applies to type B
           private String typeABAtribute2;  // applies to types A and B
           private String typeCtribute1;    // applies to type C
           private String typeABCAtribute1; // applies to types A, B and C

     }
    

    Kafka consumer config: 
    ---------------------
   

     import org.springframework.kafka.support.serializer.JsonDeserializer;
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<Object, Object> reprocessListenerContainerFactory() {
    
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupid");
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    
            factory.setRecordFilterStrategy(
                    (consumerRecord) -> {
                        try {
                            JsonNode jsonNode = objectMapper.readTree(consumerRecord.value().toString());
                            GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
                            log.info(
                                    "Retrieved the record {} from the partition {} with offset {}",
                                    consumerRecord.value(),
                                    consumerRecord.partition(),
                                    consumerRecord.offset());
                            //Process type A and B events
                            // assumes Type exposes getName() for the "name" field of the JSON
                            String typeName = genericEvent.getType().getName();
                            if (typeName.equalsIgnoreCase("A") || typeName.equalsIgnoreCase("B")) {
                                return false;
                            }
                            return true;
                        } catch (Exception ex) {
                            log.error("Error occurred while filtering the record", ex);
                            return true;
                        }
                    });
            return factory;
        }
    //Listener
    @KafkaListener(id = "MYREPROCESS-ID", topics = "reprocess-test",
            containerFactory = "reprocessListenerContainerFactory",
            autoStartup = "true")
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws IOException {
        JsonNode jsonNode = objectMapper.readTree(consumerRecord.value());
        GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
        // I should identify the respective payload at runtime
        Payload payload = genericEvent.getPayload();
        // assumes Type exposes getName() for the "name" field of the JSON
        if (genericEvent.getType().getName().equalsIgnoreCase("A")) {
            processPayload(payload);
        } else {
            processPayload(payload);
        }
    }

【Comments】:

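  • You need to show your listener code and spring-kafka configuration.
  • Hi Gary! Can you help?
  • We don't get notified when you edit the question; you have to comment that you have done so (as you just did).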

Tags: spring-kafka objectmapper jackson-databind


【Answer 1】:

Something is strange. Since you are using the Spring JsonDeserializer, you have to tell it what to convert to; the properties are documented here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config

In that case, you will get a ConsumerRecord<?, GenericEvent>.

If you want to receive a ConsumerRecord<String, String> and do the conversion yourself, you should use StringDeserializers instead.
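
The two options above can be sketched as plain consumer property maps. This is a minimal sketch, not the asker's or the answerer's actual configuration: the class `ConsumerPropsSketch`, its method names, and the package `com.example` are hypothetical, and String keys are an assumption. The property keys are written as string literals so that the example compiles with the JDK alone; `spring.json.value.default.type` is the key behind the `JsonDeserializer.VALUE_DEFAULT_TYPE` constant.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; it only builds property maps and needs no Kafka or Spring classes to compile.
class ConsumerPropsSketch {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";
    static final String JSON_DESERIALIZER =
            "org.springframework.kafka.support.serializer.JsonDeserializer";

    // Option 1: let JsonDeserializer build the value, and tell it the target type.
    // targetType is a fully qualified class name (com.example.GenericEvent is a placeholder).
    static Map<String, Object> jsonValueProps(String targetType) {
        Map<String, Object> props = new HashMap<>();
        props.put("key.deserializer", STRING_DESERIALIZER); // assumption: String keys
        props.put("value.deserializer", JSON_DESERIALIZER);
        props.put("spring.json.value.default.type", targetType);
        return props;
    }

    // Option 2: receive the raw JSON text and convert it yourself with ObjectMapper.
    static Map<String, Object> stringValueProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(jsonValueProps("com.example.GenericEvent"));
        System.out.println(stringValueProps());
    }
}
```

Either map would be passed to `new DefaultKafkaConsumerFactory<>(props)` together with the remaining settings from the question (bootstrap servers, group id, and so on). With the first option the listener receives a `ConsumerRecord<String, GenericEvent>`; with the second it receives a `ConsumerRecord<String, String>` and the `ObjectMapper` conversion stays in the listener.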
