假设您使用的是DeadLetterPublishingRecoverer,请使用这些构造函数之一...
/**
* Create an instance with the provided templates and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with
* ".DLT") from the failed record, and the same partition as the failed record.
* Therefore the dead-letter topic must have at least as many partitions as the
* original topic. The templates map keys are classes and the value the corresponding
* template to use for objects (producer record values) of that type. A
* {@link java.util.LinkedHashMap} is recommended when there is more than one
* template, to ensure the map is traversed in order. To send records with a null
* value, add a template with the {@link Void} class as a key; otherwise the first
* template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
*/
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
/**
* Create an instance with the provided templates and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic. The templates map keys are
* classes and the value the corresponding template to use for objects (producer
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
* there is more than one template, to ensure the map is traversed in order. To send
* records with a null value, add a template with the {@link Void} class as a key;
* otherwise the first template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
*/
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
他们采用由值类型键入的KafkaOperations 中的Map。
您可以创建两个生产者工厂,或者只是使用此构造函数覆盖序列化程序...
/**
* Create an instance using the supplied producer factory and properties, with
* autoFlush false. If the configOverrides is not null or empty, a new
* {@link DefaultKafkaProducerFactory} will be created with merged producer properties
* with the overrides being applied after the supplied factory's properties.
* @param producerFactory the producer factory.
* @param configOverrides producer configuration properties to override.
* @since 2.5
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters