【Question title】: org.apache.kafka.common.KafkaException: class SaleRequestFactory is not an instance of org.apache.kafka.common.serialization.Serializer
【Posted】: 2021-02-20 22:56:31
【Question】:

I want to implement a Kafka producer and consumer that send and receive Java serialized objects. I tried this:

Producer:

@Configuration
public class KafkaProducerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new DefaultKafkaProducerFactory<>(configProps);
}


@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
    return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

}

Sending the object:

@Autowired
private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;

private static String topic = "tp-sale";

private void perform(){

    SaleRequestFactory obj = new SaleRequestFactory();
    obj.setId(100);

    ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);
}

Consumer:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    private String groupId = "test";

    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

// Receiving the object

    @KafkaListener(topics = "tp-sale")
public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

    System.out.println(tf.getId());

    SaleResponseFactory resObj = new SaleResponseFactory();
    resObj.setUnique_id("123123");

    return resObj;
}

Custom objects:

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable{

    private static final long serialVersionUID = 1744050117179344127L;
    
    private int id;
}

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable{

    private static final long serialVersionUID = 1744050117179344127L;

    private String unique_id;
}

When I try to send a message, I get this error:

org.apache.kafka.common.KafkaException: class SaleRequestFactory is not an instance of org.apache.kafka.common.serialization.Serializer

Do you know how I can fix this?

【Comments】:

    Tags: java spring spring-boot apache-kafka


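    【Solution 1】:

    Option 1

    Make SaleRequestFactory implement Serializer and SaleResponseFactory implement Deserializer, so that the classes already named in the configuration are valid (de)serializers.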

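    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import org.apache.kafka.common.errors.SerializationException;

    public class SaleRequestFactory implements Serializable, org.apache.kafka.common.serialization.Serializer<SaleRequestFactory> {
    
      // ...
    
      @Override
      public byte[] serialize(String topic, SaleRequestFactory data) {
        // convert data to byte[] using Java serialization
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             ObjectOutputStream outputStream = new ObjectOutputStream(out)) {
          outputStream.writeObject(data);
          outputStream.flush(); // push buffered bytes into 'out' before reading it
          return out.toByteArray();
        } catch (IOException e) {
          throw new SerializationException("Failed to serialize SaleRequestFactory", e);
        }
      }
    
    }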
    
    // ...
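    // needs java.io.ByteArrayInputStream, java.io.IOException, java.io.ObjectInputStream
    // and org.apache.kafka.common.errors.SerializationException
    public class SaleResponseFactory implements Serializable, org.apache.kafka.common.serialization.Deserializer<SaleResponseFactory> {
    
      // ...
      @Override
      public SaleResponseFactory deserialize(String topic, byte[] data) {
        // convert data to SaleResponseFactory using Java serialization
        try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
             ObjectInputStream in = new ObjectInputStream(bis)) {
          return (SaleResponseFactory) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
          throw new SerializationException("Failed to deserialize SaleResponseFactory", e);
        }
      }
    }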
    

    Option 2

    Put the serializer and the deserializer in two separate classes.

    public class SaleRequestFactorySerializer implements org.apache.kafka.common.serialization.Serializer<SaleRequestFactory> {
    
      // ...
    
      @Override
      public byte[] serialize(String topic, SaleRequestFactory data) {
        // convert data to byte[]
      }
    
    }
    
    // ...
    public class SaleResponseFactoryDeserializer implements org.apache.kafka.common.serialization.Deserializer<SaleResponseFactory> {
    
      // ...
      @Override
      public SaleResponseFactory deserialize(String topic, byte[] data) {
        // convert data to SaleResponseFactory
      }
    }
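
The `// convert data to byte[]` and `// convert data to SaleResponseFactory` placeholders in Option 2 could be filled in with plain Java serialization, since both custom classes already implement `Serializable`. The following is only a minimal sketch of that idea: `JavaSerializationSketch`, `toBytes` and `fromBytes` are hypothetical names introduced here for illustration, not part of Kafka or Spring, and the sketch deliberately uses only `java.io` so it can be tried without a broker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper (an assumption for illustration, not a Kafka or Spring class).
final class JavaSerializationSketch {

    private JavaSerializationSketch() {}

    // Converts a Serializable object to a byte[]; a null value stays null.
    static byte[] toBytes(Serializable value) {
        if (value == null) {
            return null;
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
             ObjectOutputStream objectOut = new ObjectOutputStream(out)) {
            objectOut.writeObject(value);
            objectOut.flush(); // make sure buffered bytes reach 'out' before it is read
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Converts a byte[] back to an object of the expected type; null data stays null.
    static <T> T fromBytes(byte[] data, Class<T> type) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // type.cast throws ClassCastException if the payload is a different class
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the serialized object was not found", e);
        }
    }
}
```

With such a helper, `serialize(String topic, SaleRequestFactory data)` could simply `return JavaSerializationSketch.toBytes(data);`, and `deserialize(String topic, byte[] data)` could `return JavaSerializationSketch.fromBytes(data, SaleResponseFactory.class);`. Java's native serialization should only be used to read data from trusted producers; a JSON-based format is a common alternative when that cannot be guaranteed.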
    

    Then, in the consumer configuration, change

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class);

    to

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);

    and, in the producer configuration, change

    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);

    to

    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
    

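    【Comments】:

    • Can you tell me what I need to put here: `// convert data to byte[]` and here: `// convert data to SaleRequestFactory`, please?
    • You can refer to this answer: stackoverflow.com/a/61605459/9277126
    • In your case, you can cast the object returned from deserialize to SaleRequestFactory.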