【问题标题】:How to select starting offset in Pykafka simpleconsumer?如何在 Pykafka simpleconsumer 中选择起始偏移量?
【发布时间】:2018-05-19 10:33:27
【问题描述】:

在我的 kafka 集群单分区主题中,我有一个简单的消费者处理所有传入的消息,如果处理的数据出现错误,我想以相同的顺序重新处理来自某个偏移量(不是开头)的所有消息,以修复不一致并保持来自kafka的原始有序消息序列。

有没有办法用 Pykafka 做到这一点?我没搞清楚

【问题讨论】:

    标签: python apache-kafka consumer pykafka


    【解决方案1】:

    您需要致电reset_offsets()。例如:

    consumer = topic.get_simple_consumer(consumer_group="example")
    partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.itervalues()]
    # because we passed in a consumer_group the new offsets will be saved in Kafka
    consumer.reset_offsets(partition_offsets=partition_offset_pairs)
    

    (其中get_offset_for_partition() 是您定义的函数)。或者对于单分区主题:

    # read from offset 123456
    consumer = topic.get_simple_consumer()
    partition = topic.partitions[0]
    consumer.reset_offsets([(partition, 123456)])
    

    同样的reset_offsets() 方法也适用于BalancedConsumerManagedBalanceConsumer 类。

    请注意,作为 Kafka 设计的一部分,仅保证每个主题分区的消息按顺序独立。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-06-29
      • 2012-12-13
      • 2014-11-27
      • 2012-07-24
      • 1970-01-01
      • 2015-01-21
      • 2017-03-07
      相关资源
      最近更新 更多