【Posted】: 2019-05-18 19:48:20
【Question】:
I have a sample Spring Boot project in which a Kafka producer sends messages to a Kafka consumer.
This is what my Kafka producer configuration looks like:
@Configuration
public class KafkaProducerConfig {

    @Value("${my.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public ProducerFactory<String, Customer> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Customer> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Here is my service class for the Kafka producer:
@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Customer> kafkaTemplate;

    @Value("${my.kafka.topic}")
    String kafkaTopic = "my-test";

    public void send(Customer customer) {
        System.out.println("sending customer data=" + customer);
        kafkaTemplate.send(kafkaTopic, customer);
    }
}
My Kafka consumer configuration class looks like this:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${my.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${my.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, Customer> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(Customer.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Here is my Kafka consumer class:
@Service
public class KafkaConsumer {

    @KafkaListener(topics="${my.kafka.topic}")
    public void processMessage(Customer customer) {
        System.out.println("received customer details = " + customer);
    }
}
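I want to send an acknowledgement back to the producer, perhaps a message containing the customer's name, but I can't figure out how to do that. Is there a way to extract the sender's information on the consumer side so that a message can be sent back to the producer?
【Comments】:
Tags: apache-kafka kafka-consumer-api kafka-producer-api spring-kafka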
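For context, a Kafka record does not identify its sender, so the usual approach is for the producer to attach a reply topic and a correlation id to each message, and for the consumer to answer on that topic. Spring Kafka offers `ReplyingKafkaTemplate` and `@SendTo` on a `@KafkaListener` method for this kind of request/reply exchange. The sketch below only illustrates the pattern with plain Java queues standing in for topics; it does not use the Kafka or Spring APIs, and the names `my-test-replies`, `reply-topic`, and `correlation-id` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory stand-in for the reply-topic pattern; none of these types are Kafka or Spring classes.
class ReplyTopicSketch {

    // Hypothetical message: a payload plus headers, mirroring a Kafka record's value and headers.
    static final class Message {
        final String payload;
        final Map<String, String> headers;

        Message(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Each "topic" is just a named queue (assumption: a real setup would use Kafka topics).
    static final Map<String, Queue<Message>> TOPICS = new ConcurrentHashMap<>();

    static Queue<Message> topic(String name) {
        return TOPICS.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }

    // Producer side: tag the request with where replies should go and an id to match them by.
    static String sendRequest(String customerName) {
        String correlationId = UUID.randomUUID().toString();
        Map<String, String> headers = new HashMap<>();
        headers.put("reply-topic", "my-test-replies");
        headers.put("correlation-id", correlationId);
        topic("my-test").add(new Message(customerName, headers));
        return correlationId;
    }

    // Consumer side: read one request, then answer on the topic named in its headers.
    static void consumeOne() {
        Message request = topic("my-test").poll();
        if (request == null) {
            return; // nothing to process
        }
        Map<String, String> replyHeaders = new HashMap<>();
        replyHeaders.put("correlation-id", request.headers.get("correlation-id"));
        String ack = "ack: received customer " + request.payload;
        topic(request.headers.get("reply-topic")).add(new Message(ack, replyHeaders));
    }

    // Producer side again: pick up the reply and check that it belongs to our request.
    static String readReply(String correlationId) {
        Message reply = topic("my-test-replies").poll();
        if (reply == null || !correlationId.equals(reply.headers.get("correlation-id"))) {
            throw new IllegalStateException("no matching reply");
        }
        return reply.payload;
    }

    // Runs the full exchange once, single-threaded, for illustration.
    static String roundTrip(String customerName) {
        String correlationId = sendRequest(customerName);
        consumeOne();
        return readReply(correlationId);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Alice")); // prints "ack: received customer Alice"
    }
}
```

In a real Spring Kafka setup the same roles would roughly map onto the `KafkaHeaders.REPLY_TOPIC` and `KafkaHeaders.CORRELATION_ID` record headers, with the producer application running its own listener on the reply topic.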