【Question Title】: Kafka-python get number of partitions for topic
【Posted】: 2021-08-04 15:03:53
【Question Description】:

I'm using https://github.com/mumrah/kafka-python as the Kafka API in Python. I want to get the number of partitions for a given topic. How can I do that?

【Question Discussion】:

    Tags: python python-2.7 metadata apache-kafka kafka-consumer-api


    【Solution 1】:

    This might be a slightly simplistic solution, but:

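    # Legacy kafka-python (0.9.x) API; later releases deprecate this call
    from kafka import KafkaClient
    
    client = KafkaClient('SERVER:PORT')
    # Partition ids the client knows for the topic (topic name passed as bytes)
    topic_partition_ids = client.get_partition_ids_for_topic(b'TOPIC')
    print(len(topic_partition_ids))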
    

    Tested on Python 3.4.3 / kafka-python 0.9.3.

    【Discussion】:

    • Consider adding: a = len(topic_partition_ids); print(topic_partition_ids); print(a)
    • This is deprecated; try "partitions_for_topic" instead: kafka-python.readthedocs.io/en/master/apidoc/…
    • As @Alan suggested, try: client.cluster.partitions_for_topic(topic)
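The newer calls mentioned in these comments return a set of partition ids, or None when the client has no metadata for the topic, so counting comes down to len() plus a None check. The following is a minimal sketch, assuming an object that exposes partitions_for_topic(topic) the way kafka-python's client.cluster and KafkaConsumer do; count_partitions and FakeCluster are hypothetical names for illustration, and the fake lets the helper run without a broker.

```python
from typing import Dict, List, Optional, Protocol, Set


class HasPartitionsForTopic(Protocol):
    # Assumption: like kafka-python's client.cluster or KafkaConsumer,
    # the method returns a set of partition ids, or None if the topic is unknown.
    def partitions_for_topic(self, topic: str) -> Optional[Set[int]]: ...


def count_partitions(source: HasPartitionsForTopic, topic: str) -> int:
    """Return the number of partitions of topic, raising if there is no metadata."""
    partitions = source.partitions_for_topic(topic)
    if partitions is None:
        # The topic may not exist, or its metadata may not have been fetched yet
        raise ValueError(f"no partition metadata for topic {topic!r}")
    return len(partitions)


class FakeCluster:
    """Hypothetical in-memory stand-in for a metadata source (offline testing)."""

    def __init__(self, topics: Dict[str, List[int]]):
        self._topics = topics

    def partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
        ids = self._topics.get(topic)
        return None if ids is None else set(ids)


if __name__ == "__main__":
    cluster = FakeCluster({"orders": [0, 1, 2]})
    print(count_partitions(cluster, "orders"))  # → 3
```

With kafka-python you would pass the real object (for example a KafkaConsumer) as source instead of the fake; raising on None, rather than returning 0, keeps a missing topic from being mistaken for an empty one.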
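    【Solution 2】:

    I found this question while trying to solve exactly the same problem. I know the question is old, but here is the solution I came up with (using Kazoo to talk to ZooKeeper):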

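    from kazoo.client import KazooClient
    
    class KafkaInfo(object):
        """Reads Kafka metadata directly from ZooKeeper (ZooKeeper-based clusters only)."""
    
        def __init__(self, hosts):
            self.zk = KazooClient(hosts)
            self.zk.start()
    
        def topics(self):
            return self.zk.get_children('/brokers/topics')
    
        def partitions(self, topic):
            # Child znodes are named after the partition ids ("0", "1", ...)
            strs = self.zk.get_children('/brokers/topics/%s/partitions' % topic)
            # Build a list so len() works on Python 3, where map() is lazy
            return [int(s) for s in strs]
    
        def consumers(self):
            # Only consumers that commit offsets to ZooKeeper are listed here
            return self.zk.get_children('/consumers')
    
        def topics_for_consumer(self, consumer):
            return self.zk.get_children('/consumers/%s/offsets' % consumer)
    
        def offset(self, topic, consumer, partition):
            # get() returns (data, ZnodeStat); data holds the offset as bytes
            (n, _) = self.zk.get('/consumers/%s/offsets/%s/%d' % (consumer, topic, partition))
            return int(n)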
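In this ZooKeeper layout each partition is a child znode of /brokers/topics/<topic>/partitions named by its id, so the partition count is just the number of children. The sketch below isolates the parsing step from Kazoo so it can run without ZooKeeper; partition_ids_from_children is a hypothetical helper, and the hard-coded list only stands in for what zk.get_children(...) would return.

```python
from typing import Iterable, List


def partition_ids_from_children(children: Iterable[str]) -> List[int]:
    """Turn ZooKeeper child names such as ["2", "0", "1"] into sorted ints."""
    # A real list (not a lazy map object), so len() works on Python 3;
    # sorted because child order is not something to rely on
    return sorted(int(name) for name in children)


# Stand-in for zk.get_children('/brokers/topics/<topic>/partitions')
children = ["2", "0", "1"]
ids = partition_ids_from_children(children)
print(ids, len(ids))  # → [0, 1, 2] 3
```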
    

    【Discussion】:

      【Solution 3】:

      For those using Confluent-Python (confluent-kafka-python) or the enterprise API, this can be done as follows:

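      from typing import Dict
      
      from confluent_kafka import Producer
      from confluent_kafka.admin import ClusterMetadata, PartitionMetadata, TopicMetadata
      
      TOPIC = 'TOPIC'  # placeholder: your topic name
      producer = Producer({'bootstrap.servers': 'SERVER:PORT'})  # placeholder address
      
      def count_partitions(my_partitions) -> int:
          # partitions is a dict keyed by partition id, so its length is the count
          return len(my_partitions)
      
      # Fetch cluster metadata, restricted to the one topic we care about
      cluster_data: ClusterMetadata = producer.list_topics(topic=TOPIC, timeout=10)
      topic_data: TopicMetadata = cluster_data.topics[TOPIC]
      available_partitions: Dict[int, PartitionMetadata] = topic_data.partitions
      print(count_partitions(available_partitions))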
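In confluent-kafka-python, ClusterMetadata.topics maps topic names to TopicMetadata objects, whose partitions attribute is a dict keyed by partition id and whose error attribute is set when the broker reports a problem with that topic. A minimal sketch of a helper that checks both before counting; partition_count is a hypothetical name, and the SimpleNamespace objects only imitate the attribute shape of the real metadata classes so the example runs without a broker.

```python
from types import SimpleNamespace


def partition_count(cluster_metadata, topic: str) -> int:
    """Count partitions from a ClusterMetadata-like object (duck-typed)."""
    topic_data = cluster_metadata.topics.get(topic)
    if topic_data is None:
        raise KeyError(f"topic {topic!r} not present in metadata")
    if topic_data.error is not None:
        # For example, the broker reported the topic as unknown
        raise RuntimeError(f"metadata error for {topic!r}: {topic_data.error}")
    # partitions maps partition id -> PartitionMetadata, so len() is the count
    return len(topic_data.partitions)


# Fake metadata using the same attribute names as confluent_kafka's classes
fake = SimpleNamespace(
    topics={"orders": SimpleNamespace(error=None, partitions={0: None, 1: None})}
)
print(partition_count(fake, "orders"))  # → 2
```

Against a real cluster, the call would presumably look like partition_count(producer.list_topics(topic=TOPIC, timeout=10), TOPIC).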
      
      

      【Discussion】:

        【Solution 4】:

        Solution for Python 3.8.10 / kafka-python 2.0.2:

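        from kafka import KafkaConsumer
        
        def get_partitions_number(server, topic):
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=server
            )
            try:
                # Set of partition ids, or None if the cluster does not know the topic
                partitions = consumer.partitions_for_topic(topic)
            finally:
                consumer.close()
            return len(partitions) if partitions is not None else 0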
        

        See KafkaConsumer.partitions_for_topic in the kafka-python API documentation.

        【Discussion】:
