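【Posted】: 2020-05-07 18:25:07
【Question】:
My use case is to use the Kafka consumer API so that we can manually resume reading from a Kafka topic at the offset of the last successfully processed record, and then manually acknowledge successfully processed data back to Kafka (this is to reduce data loss). However, in my current implementation, even when I comment out 'ack.acknowledge()', the program moves on and reads from the next offset. I am new to Kafka and implemented my consumer as shown below (we are using Spring Boot).
The problem: even with ack.acknowledge() commented out, the offset still advances and the consumer reads from the next offset, which is unexpected (as far as I understand it so far).
Consumer config [note that I set ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false and call factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)]: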
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
            ConsumerConfig.GROUP_ID_CONFIG,
            "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
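Several keys in the configuration above come from StreamsConfig, which targets Kafka Streams rather than the plain consumer. As a minimal sketch, assuming placeholder values (the helper name `ConsumerProps.build` and the broker address `localhost:9092` are hypothetical and not from the original post), a consumer-only property map could be written with the literal Kafka property names:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds consumer-only settings using literal Kafka property names. */
final class ConsumerProps {

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", groupId);                   // ConsumerConfig.GROUP_ID_CONFIG
        props.put("enable.auto.commit", false);           // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        props.put("session.timeout.ms", 30000);           // ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
        // Deserializer class names as given in the post.
        props.put("key.deserializer",
            "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put("value.deserializer",
            "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092", "adapter")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In this sketch, auto.commit.interval.ms is left out because it only applies when enable.auto.commit is true, and application.id and default.timestamp.extractor are omitted because they are Kafka Streams settings that a plain consumer does not use. None of this changes the acknowledgment behavior by itself; it only narrows the configuration to what the consumer reads.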
My consumer then consumes like this [even when I comment out 'ack.acknowledge()', it still reads from the next offset the next time]:
@KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                   @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {
    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " + offset + ", and Partition: " + partition);
    System.out.println("Record for this partition: Key : " + record.key() + ", Value : " + record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(), record.value());
    if (StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
        kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    } else {
        kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
}
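The behavior described in this question is consistent with how Kafka consumers track progress: a running consumer keeps an in-memory position that advances with every fetched record, while the committed offset is only consulted when a consumer starts or when partitions are reassigned. The following is a toy model in plain Java, not the Kafka client itself; the class `ToyConsumer` and all of its methods are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of a consumer on one partition. NOT the Kafka client; all names are hypothetical. */
final class ToyConsumer {
    private final List<String> log; // records of a single partition
    private long committed;         // offset stored on the "broker" side
    private long position;          // in-memory fetch position of the running consumer

    ToyConsumer(List<String> log, long committedOffset) {
        this.log = log;
        this.committed = committedOffset;
        this.position = committedOffset; // a starting consumer resumes from the committed offset
    }

    /** Returns the next record (or null at the end of the log) and advances the position, committed or not. */
    String poll() {
        return position < log.size() ? log.get((int) position++) : null;
    }

    /** Models an acknowledgment: commits the offset of the next record to read. */
    void commit() {
        committed = position;
    }

    /** Models seeking back to the committed offset so unacknowledged records are read again. */
    void seekToCommitted() {
        position = committed;
    }

    long position() { return position; }

    long committed() { return committed; }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c");
        ToyConsumer running = new ToyConsumer(log, 0);
        running.poll(); // reads the first record, never committed
        running.poll(); // still moves on to the second record
        System.out.println("position=" + running.position() + ", committed=" + running.committed());

        // A fresh consumer starting from the stored committed offset sees the first record again.
        ToyConsumer restarted = new ToyConsumer(log, running.committed());
        System.out.println("after restart: " + restarted.poll());
    }
}
```

In this model, skipping commit() leaves the committed offset unchanged while the position still advances, so only a restart or an explicit seek delivers the unacknowledged record again. Whether this fully accounts for the behavior in the post depends on details not shown here.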
Kafka API versions in my build.gradle:
// Kafka dependencies
implementation 'org.apache.kafka:kafka-streams:2.0.1'
implementation 'org.apache.kafka:kafka-clients:2.0.1'
Any insight would be helpful.
Thanks in advance.
【Discussion】:
Tags: apache-kafka kafka-consumer-api