from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = KafkaConsumer(config here...)
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
    tp: OffsetAndMetadata(offset, None)
})

meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset, meta)
consumer.commit(options)


# a better way, remove assign partition manually, and extract partition info from kafka message
topic_partition = TopicPartition(command_params["topic"], message.partition) 
consumer.seek(topic_partition, offset_value) 
consumer.commit()

from: http://stackoverflow.com/questions/36579815/kafka-python-how-do-i-commit-a-partition

如果consumer.commit()不可以,可以使用seek(),使用seek()时,如果有多个partition,需
要为每个partition都手动进行consumer assign:

 

topic_partition = TopicPartition("TOPIC_TEST", 1)
# 格式为topic, partition, 1表示partition 1.
consumer.assign([topic_partition])

consumer.seek(topic_partition, 1660000)

 

使用最下面的方法,不再需要手动指定partition,直接从message获取partition,更加灵活。 

 

相关文章:

  • 2021-11-13
  • 2021-11-13
  • 2021-09-01
  • 2021-11-18
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2021-11-18
  • 2022-12-23
  • 2021-12-19
  • 2021-09-11
  • 2022-12-23
相关资源
相似解决方案