【问题标题】:How to programmatically create a topic in Apache Kafka using Python如何使用 Python 以编程方式在 Apache Kafka 中创建主题
【发布时间】:2014-11-19 05:17:47
【问题描述】:

到目前为止,我还没有看到一个 python 客户端在不使用配置选项自动创建主题的情况下显式实现主题的创建。

【问题讨论】:

    标签: python apache-kafka kafka-python kafka-topic


    【解决方案1】:

    您可以使用kafka-pythonconfluent_kafka 客户端以编程方式创建主题,这是librdkafka 的轻量级包装器。


    使用kafka-python

    from kafka.admin import KafkaAdminClient, NewTopic
    
    
    admin_client = KafkaAdminClient(
        bootstrap_servers="localhost:9092", 
        client_id='test'
    )
    
    topic_list = []
    topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    

    使用confluent_kafka

    from confluent_kafka.admin import AdminClient, NewTopic
    
    
    admin_client = AdminClient({
        "bootstrap.servers": "localhost:9092"
    })
    
    topic_list = []
    topic_list.append(NewTopic("example_topic", 1, 1))
    admin_client.create_topics(topic_list)
    

    【讨论】:

    • 能否添加主题配置,例如 confluent_kafka 的 max.message.bytes=1000000 示例
    • 使用上述技术我能够创建主题,我得到响应 {'jjd_topic1': } 但是当我列出新的主题时主题不在列表中......就像它没有持久化一样。这里会发生什么?
    • 通过使用 confluent_kafka 答案,我也没有看到 kafka 中的新主题。
    【解决方案2】:

    如果你可以运行confluent_kafka(Python)v0.11.6或以上,那么下面是如何运行create kafka topicslist kafka topicsdelete kafka topics

    >>> import confluent_kafka.admin, pprint
    
    >>> conf        = {'bootstrap.servers': 'broker01:9092'}
    >>> kafka_admin = confluent_kafka.admin.AdminClient(conf)
    
    >>> new_topic   = confluent_kafka.admin.NewTopic('topic100', 1, 1)
                      # Number-of-partitions  = 1
                      # Number-of-replicas    = 1
    
    >>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
        {'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command.
    
    >>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
        {'topic100' : TopicMetadata(topic100, 1 partitions),
         'topic99'  : TopicMetadata(topic99,  1 partitions),
         'topic98'  : TopicMetadata(topic98,  1 partitions)}
    

    对于 delete kafka topics 使用相同的 kafka_admin 对象,这个:

    kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
    

    我希望这会有所帮助。 \(◠﹏◠)/

    【讨论】:

    • P.S.在可能的情况下,我更喜欢confluent_kafka 库而不是kafka-python 库,因为前者是librdkafka C/C++ library 的“瘦包装器”(引用Confluent 文献);因此表现出色。不过,公平地说,kafka-python 更符合 Python 风格,并且两个库都运行良好。
    • 当我打开一个 python shell 并逐行输入你的代码时,它可以 100% 工作 - 我创建了主题,当我列出所有主题时,它就在那里。但是当我把它放在一个 .py 文件中并运行该文件时,由于某种原因它不会创建,即使它没有给出错误。如果我复制文件中的每一行,并将其粘贴到 python shell 中,那么它会再次创建。完全相同的代码在 shell 中有效,但在文件中无效...
    • @AlfaBravo 嗯...您确定程序正在运行吗?尝试在代码周围插入print('hello') 语句进行检查。
    • 是的,我亲身体验过——程序运行,客户端返回有关创建的主题的信息 {'jjd_topic1': },但是当我运行列表,他们不在那里。就像他们从未承诺过一样。为什么会这样?
    • 超级奇怪——@AlfaBravo 我遇到了同样的问题。如果我作为 .py 文件运行,它不会创建主题,但如果我进入 python shell 并执行它,它会被创建......
    【解决方案3】:

    看起来您可以使用以下内容来确保您的主题已经存在(我假设您正在使用以下kafka python 实现):

    client = KafkaClient(...)
    producer = KafkaProducer(...)
    client.ensure_topic_exists('my_new_topic')
    producer.send_messages('my_new_topic', ...)
    

    【讨论】:

    【解决方案4】:

    好像没有kafka server api来创建topic所以只能使用topic自动创建或者命令行工具:

    bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
    

    【讨论】:

    • 没有。您根据 NYCeyes 的评论使用 AdminClient。
    【解决方案5】:

    已经太晚了。我不知道用于显式创建主题的命令,但以下创建并添加消息。

    我创建了一个 python kafka 生产者:

    prod = KafkaProducer(bootstrap_servers='localhost:9092')
    for i in xrange(1000):
        prod.send('xyz', str(i))
    

    在 Kafka 主题列表中 xyz 以前不存在。当我执行上述方法时,Python-kafka 客户端创建了它并将消息添加到它。

    【讨论】:

    • 实际上是代理创建了主题,只是因为 auto.topic.create.enable 设置为“true”。以这种方式创建的所有主题都将具有默认配置,可能适合您的用例,也可能不适合您的用例。
    【解决方案6】:

    在 Kafka 0.11 中刚刚添加了进行编程主题创建和配置所需的 AdminClient API(最初用于 Java)

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations

    预计非 Java 客户端库也会随着时间的推移添加此功能。请咨询您正在使用的 Kafka Python 客户端的作者(有几个),了解 API 中是否以及何时支持 KIP-4 管理协议

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

    【讨论】:

      【解决方案7】:
      from kafka import KafkaProducer
      
      producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
      topic = 'topic-name'
      
      producer.send(topic, final_list[0]).get(timeout=10)
      

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-10-30
      • 2019-11-24
      • 2016-07-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-23
      • 1970-01-01
      相关资源
      最近更新 更多