【问题标题】:How to describe a topic using kafka client in Python如何在 Python 中使用 kafka 客户端描述一个主题
【发布时间】:2019-05-22 12:52:22
【问题描述】:

我是 python 中 kafka 客户端的初学者,我需要一些帮助来描述使用客户端的主题。

我能够使用以下代码列出我所有的 kafka 主题:-

consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()

【问题讨论】:

    标签: python kafka-python


    【解决方案1】:

    在参考了多篇文章和代码示例后,我能够通过使用 confluent_kafka 的 describe_configs 做到这一点。

    链接 1 [Confluent-kafka-python] 链接2Git Sample

    下面是我的示例代码!!

    from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource
    import confluent_kafka
    import concurrent.futures
    
    #Creation of config
    conf = {'bootstrap.servers': 'kafka1','session.timeout.ms': 6000}
    adminClient = AdminClient(conf)
    topic_configResource = adminClient.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "myTopic")])
        for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
            config_response = j.result(timeout=1)
    

    【讨论】:

      【解决方案2】:

      我已经找到了如何用 kafka-python 做到这一点:

      from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
      KAFKA_URL = "localhost:9092" # kafka broker
      KAFKA_TOPIC = "test" # topic name
      
      admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_URL])
      configs = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, KAFKA_TOPIC)])
      config_list = configs.resources[0][4]
      

      config_list(元组列表)中,您拥有该主题的所有配置。

      【讨论】:

      • 我无法使用 ConfigResourceType,知道吗?
      • 在 Kafka-Python 2.0 中,configs 是一个 DescribeConfigsResponse_v2 的列表,所以你需要:configs[0].resources[0][4]
      【解决方案3】:

      参考:https://docs.confluent.io/current/clients/confluent-kafka-python/

      1. list_topics 提供 confluent_kafka.admin.TopicMetadata (topic, 分区)
      2. kafka.admin.TopicMetadata.partitions 提供:confluent_kafka.admin.PartitionMetadata (Partition id, leader, replicas, isrs)

        from confluent_kafka.admin import AdminClient
        kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})    
        for topic in topics:    
            x = kafka_admin.list_topics(topic=topic)    
            print x.topics, '\n'    
            for key, value in x.topics.items():    
                for keyy, valuey in value.partitions.items():    
                    print keyy, ' Partition id : ', valuey, 'leader : ', valuey.leader,' replica: ', valuey.replicas
        

      【讨论】:

        【解决方案4】:

        有趣的是,对于 Java,此功能 (describeTopics()) 位于 KafkaAdminCLient.java 中。

        所以,我试图寻找相同的 python 等效项,我发现了code repository of kafka-python

        在 kafka-python 包中相当于 admin-client 的文档(内嵌 cmets)说明如下:

        describe topics functionality is in ClusterMetadata
        Note: if implemented here, send the request to the controller
        

        然后我切换到同一存储库中的cluster.py file。这包含您用来检索主题列表的topics() 函数和以下两个可以帮助您实现describe 功能的函数:

        1. partitions_for_topic() - 返回主题的所有分区集(无论是否可用)
        2. available_partitions_for_topic() - 返回具有已知领导者的分区集

        注意:我自己没有尝试过,所以我不确定行为是否与您在kafka-topics --describe ... 命令的结果中看到的相同,但值得一试。

        我希望这会有所帮助!

        【讨论】:

        • 感谢拉利特的解释。我查看了可用的方法没有运气。实际上,我正在考虑通过 kafka-topics.sh --describe 获得每个主题的保留期。
        • 好的。很公平。我会尝试找到一些东西,但似乎 python 库略有不同,并不完全相似。
        猜你喜欢
        • 2018-10-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-08-28
        • 1970-01-01
        • 1970-01-01
        • 2019-10-01
        • 2018-02-18
        相关资源
        最近更新 更多