【问题标题】:ValueSerializer, SerializationException and DLT, how to make it works for serialization?ValueSerializer、SerializationException 和 DLT,如何使其适用于序列化?
【发布时间】:2021-10-04 04:29:40
【问题描述】:

我正在使用 Kafka 探索 Spring Boot,但我遇到了 DLT 及其序列化程序的问题。

在我的 DLT 中,我想要所有导致 SerializationException 的消息,以及所有导致技术或功能异常的(有效)消息。我的消息是一个 XML 消息,我可以在反序列化器和 JAXB 生成的对象中轻松反序列化。

问题在于值序列化器:

  • 如果我发送导致 SerializationException 的愚蠢消息(不是 XML,而是随机字符串...),我需要 ByteArraySerializer。 (record.value()Byte[]
  • 如果我有技术或功能异常,我需要一个 MyObjectSerializer...(record.value()MyObject

如何处理这个问题? (现在使用 spring-kafka 2.5.5,使用 kafka-client 2.5.1)

我设法使它与两者一起工作的最好的东西是对象序列化器,但我失去了 MyObject 的字符串格式...

【问题讨论】:

    标签: apache-kafka spring-kafka kafka-producer-api


    【解决方案1】:

    假设您使用的是DeadLetterPublishingRecoverer,请使用这些构造函数之一...

    /**
     * Create an instance with the provided templates and a default destination resolving
     * function that returns a TopicPartition based on the original topic (appended with
     * ".DLT") from the failed record, and the same partition as the failed record.
     * Therefore the dead-letter topic must have at least as many partitions as the
     * original topic. The templates map keys are classes and the value the corresponding
     * template to use for objects (producer record values) of that type. A
     * {@link java.util.LinkedHashMap} is recommended when there is more than one
     * template, to ensure the map is traversed in order. To send records with a null
     * value, add a template with the {@link Void} class as a key; otherwise the first
     * template from the map values iterator will be used.
     * @param templates the {@link KafkaOperations}s to use for publishing.
     */
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
    
    
    
    /**
     * Create an instance with the provided templates and destination resolving function,
     * that receives the failed consumer record and the exception and returns a
     * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
     * 0, no partition is set when publishing to the topic. The templates map keys are
     * classes and the value the corresponding template to use for objects (producer
     * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
     * there is more than one template, to ensure the map is traversed in order. To send
     * records with a null value, add a template with the {@link Void} class as a key;
     * otherwise the first template from the map values iterator will be used.
     * @param templates the {@link KafkaOperations}s to use for publishing.
     * @param destinationResolver the resolving function.
     */
    @SuppressWarnings("unchecked")
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    

    他们采用由值类型键入的KafkaOperations 中的Map

    您可以创建两个生产者工厂,或者只是使用此构造函数覆盖序列化程序...

    /**
     * Create an instance using the supplied producer factory and properties, with
     * autoFlush false. If the configOverrides is not null or empty, a new
     * {@link DefaultKafkaProducerFactory} will be created with merged producer properties
     * with the overrides being applied after the supplied factory's properties.
     * @param producerFactory the producer factory.
     * @param configOverrides producer configuration properties to override.
     * @since 2.5
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
    

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

    【讨论】:

    • 看起来不错。我检查了:) 谢谢。
    • 另一个与此主题相关的问题。是否可以将对象错误地包装在另一个对象中?就像一个错误的 MyObject 对象,我想将它包装成一个对象 ErrorObject 以在 DLQ 发送。
    • 最好提出一个新问题,以便其他人可以找到它(以及答案)。您可以继承 DLPR 并覆盖 createProducerRecord 方法。 Subclasses can override this method to customize the producer record to send to the DLQ. The default implementation simply copies the key and value from the consumer record and adds the headers.
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-02-19
    • 1970-01-01
    • 2011-01-08
    • 2016-11-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多