【问题标题】:How to get an acknowledgement or a call back from Kafka consumer to the producer如何从 Kafka 消费者获得确认或回电给生产者
【发布时间】:2019-05-18 19:48:20
【问题描述】:

我有一个 Kafka 生产者向 Kafka 消费者发送消息的 Springboot 项目示例。

这就是我的 Kafka Producer 配置的样子:

@Configuration
public class KafkaProducerConfig {

@Value("${my.kafka.bootstrap-servers}")
private String bootstrapServer;

@Bean
public ProducerFactory<String, Customer> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Customer> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
}

以下是我为 Kafka 生产者提供的服务类:

@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Customer> kafkaTemplate;

@Value("${my.kafka.topic}")
String kafkaTopic = "my-test";

public void send(Customer customer) {
    System.out.println("sending customer data=" + customer);
    kafkaTemplate.send(kafkaTopic, customer);
}
}

我的 KafkaConsumer 配置类如下所示:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Value("${my.kafka.bootstrap-servers}")
private String bootstrapServer;

@Value("${my.kafka.consumer.group-id}")
private String groupId;

@Bean
public ConsumerFactory<String, Customer> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props,
                                      new StringDeserializer(), 
                                      new JsonDeserializer<>(Customer.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}

以下是我的 Kafka Consumer 类:

@Service
public class KafkaConsumer {

@KafkaListener(topics="${my.kafka.topic}")
public void processMessage(Customer customer) {
    System.out.println("received customer details = " + customer);
}
}

我想向生产者发送回确认,可能包含客户姓名的消息。但我无法弄清楚我怎样才能做到这一点。有没有一种方法可以在消费者端提取发件人数据,以便将消息返回给生产者?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-producer-api spring-kafka


    【解决方案1】:

    实现这一点的一种方法是在另一个主题上发布一条确认消息,生产者应收听该消息。该消息可以包含您想要的数据以及关联数据

    【讨论】:

    • 我很喜欢这个主意。如果一个侦听器有多个生产者发送消息,有没有办法可以通过 id 或其他东西来识别该生产者?
    • 为每个发布者(或消息)生成一个唯一的关联 ID,并复制到响应消息正文或密钥中。
    猜你喜欢
    • 1970-01-01
    • 2018-12-18
    • 1970-01-01
    • 1970-01-01
    • 2017-09-07
    • 1970-01-01
    • 1970-01-01
    • 2017-11-03
    • 2015-03-25
    相关资源
    最近更新 更多