【发布时间】:2022-01-09 20:40:52
【问题描述】:
我有以下代码
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
consumer.seekToEnd(emptyList())
val pollDuration = 30 // seconds
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
消费者订阅的主题不断接收记录。有时,消费者会由于处理步骤而崩溃。当消费者然后重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为seekToEnd() 方法可以确保这一点。但是,似乎该方法根本没有效果。消费者从它崩溃的偏移量开始消费。
seekToEnd()的正确使用方法是什么?
编辑:使用以下配置创建消费者
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
val props = setupConfig(valueDeserializer)
Common.setupConsumerSecurityProtocol(props)
return createConsumer(props)
}
fun setupConfig(valueDeserializer: String): Properties {
// Configuration setup
val props = Properties()
props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs
props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
return props
}
fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
val consumer = KafkaConsumer<String, T>(props)
consumer.subscribe(listOf(config.kafka.inputTopic))
return consumer
}
【问题讨论】:
-
你应该在投票前尝试 commitSync
-
另外,如果你禁用自动提交并设置
auto.offset.reset=latest,重启时它总是从主题的末尾开始 -
auto.offset.reset配置仅在消费者组没有在某处提交有效偏移量时才会启动,如下所述:stackoverflow.com/a/32392174/11724337 -
为什么您认为在 poll 之前添加 commitSync 会有所帮助?
-
如果您不自动提交(并且您的代码不会自行提交),则不会有任何存储的偏移量,因此它将始终寻求该设置。否则,如果您在轮询之前提交,那么您将保证为消费者组存储结束偏移量
标签: kotlin apache-kafka kafka-consumer-api