【问题标题】:kafka-python - How do I commit a partition?kafka-python - 如何提交分区?
【发布时间】:2016-08-03 10:51:04
【问题描述】:

使用 kafka-python-1.0.2。

如果我有一个包含 10 个分区的主题,我该如何提交一个特定的分区,同时循环遍历各个分区和消息。我似乎无法在任何地方找到这样的例子,在文档或其他地方

从文档中,我想使用:

consumer.commit(offset=offsets)

具体来说,如何创建偏移量所需的分区和 OffsetAndMetadata 字典(dict,可选) - {TopicPartition: OffsetAndMetadata}。

我希望函数调用是这样的:

consumer.commit(partition, offset)

但似乎并非如此。

提前致谢。

【问题讨论】:

    标签: python kafka-consumer-api kafka-python


    【解决方案1】:

    所以看起来我可能已经想通了,有趣的是当你写下你的问题时会发生这种情况。这似乎有效:

    meta = consumer.partitions_for_topic(topic)
    options = {}
    options[partition] = OffsetAndMetadata(message.offset + 1, meta)
    consumer.commit(options)
    

    需要更多测试,但如果有任何变化会更新。

    【讨论】:

    • 这就是这样做的方法,我联系了 GitLab 上的 kafka 团队。响应:“元数据实际上只是一个不透明的字符串。您也可以传递 None。内部没有使用元数据,如果需要,它可以作为存储应用程序特定数据的一种方式。但是很少有人真正使用该功能,所以如果你走那条路要小心。
    • 这是该主题的链接:github.com/dpkp/kafka-python/issues/645
    • 如果可行,那么您应该接受您的回答
    【解决方案2】:

    不需要使用元数据。 看这个例子:

    from kafka import TopicPartition
    from kafka.structs import OffsetAndMetadata
    ...
    topic = 'your_topic'
    partition = 0
    tp = TopicPartition(topic,partition)
    kafkaConsumer = createKafkaConsumer()
    kafkaConsumer.assign([tp])
    offset = 15394125
    kafkaConsumer.commit({
        tp: OffsetAndMetadata(offset, None)
    })
    

    希望这会有所帮助。

    【讨论】:

      【解决方案3】:
      from kafka import KafkaConsumer
      from kafka import TopicPartition
      
      TOPIC = "test_topic"
      PARTITION = 0
      
      consumer = KafkaConsumer(
          group_id=TOPIC,
          auto_offset_reset="earliest",
          bootstrap_servers="localhost:9092",
          request_timeout_ms=100000,
          session_timeout_ms=99000,
          max_poll_records=100,
      )
      topic_partition = TopicPartition(TOPIC, PARTITION)
      # format: topic, partition
      consumer.assign([topic_partition])
      consumer.seek(topic_partition, 1660000)
      # format: TopicPartition, offset. 1660000 is the offset been set.
      for message in consumer:
          # do something
      
      1. 这只会分配一个分区并为该分区设置偏移量,如果有多个分区,则需要为每个分区分配一个然后设置偏移量。
      2. aalmeida88 的回答有时对我有用,但在某些情况下,它确实有效,而且 aalmeida88 给了我寻找的想法,它似乎也是一种有用的方法。
      3. 另外你可能需要注意的是,当你自己分配分区时,kafka manager 似乎无法获取消费者信息,这可能是因为你在分配分区时是在kafka而不是zookeeper中设置的,所以卡夫卡经理可能无法获得该信息。 希望对您有所帮助!

      ---编辑-----

      找到更好的方法。

      topic_partition = TopicPartition(TOPIC,
                                       message.partition)
      consumer.seek(topic_partition, offset_value)
      consumer.commit()
      

      这将从kafka获取的message中提取分区信息,并保存子句手动分配分区,从而在程序中需要设置多个分区的偏移量(并不罕见)时带来方便。

      ps:为了保证一个分区只设置一次,需要根据你的应用设置一个flag。

      【讨论】:

        【解决方案4】:

        只需拨打consumer.commit()

        from kafka import KafkaConsumer
        
        KAFKA_TOPIC_NAME='KAFKA_TOPIC_NAME'
        KAFKA_CONSUMER_GROUP='KAFKA_CONSUMER_GROUP'
        consumer = KafkaConsumer(
            KAFKA_TOPIC_NAME,
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=KAFKA_CONSUMER_GROUP
        )
        for message in consumer:
            print(message.value)
            consumer.commit()    # <--- This is what we need
            # Optionally, To check if everything went good
            from kafka import TopicPartition
            print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
        

        【讨论】:

          【解决方案5】:
          from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
                  
          consumer = KafkaConsumer("topic_name", enable_auto_commit= False, bootstrap_servers=["128.0.0.1:9092"],group_id= "group_name")
          msg = next(consumer)
          consumer.commit({TopicPartition("topic_name", msg.partition): OffsetAndMetadata(msg.offset+1, '')})
          

          【讨论】:

            猜你喜欢
            • 2018-01-24
            • 2019-08-27
            • 2020-08-24
            • 2019-01-01
            • 2023-04-02
            • 2018-10-08
            • 2010-09-30
            • 2020-10-11
            相关资源
            最近更新 更多