【发布时间】:2014-11-19 05:17:47
【问题描述】:
到目前为止,我还没有看到一个 python 客户端在不使用配置选项自动创建主题的情况下显式实现主题的创建。
【问题讨论】:
标签: python apache-kafka kafka-python kafka-topic
到目前为止,我还没有看到一个 python 客户端在不使用配置选项自动创建主题的情况下显式实现主题的创建。
【问题讨论】:
标签: python apache-kafka kafka-python kafka-topic
您可以使用kafka-python 或confluent_kafka 客户端以编程方式创建主题,这是librdkafka 的轻量级包装器。
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({
"bootstrap.servers": "localhost:9092"
})
topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)
【讨论】:
如果你可以运行confluent_kafka(Python)v0.11.6或以上,那么下面是如何运行create kafka topics、list kafka topics和delete kafka topics:
>>> import confluent_kafka.admin, pprint
>>> conf = {'bootstrap.servers': 'broker01:9092'}
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)
>>> new_topic = confluent_kafka.admin.NewTopic('topic100', 1, 1)
# Number-of-partitions = 1
# Number-of-replicas = 1
>>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
{'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command.
>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
{'topic100' : TopicMetadata(topic100, 1 partitions),
'topic99' : TopicMetadata(topic99, 1 partitions),
'topic98' : TopicMetadata(topic98, 1 partitions)}
对于 delete kafka topics 使用相同的 kafka_admin 对象,这个:
kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
我希望这会有所帮助。 \(◠﹏◠)/
【讨论】:
confluent_kafka 库而不是kafka-python 库,因为前者是librdkafka C/C++ library 的“瘦包装器”(引用Confluent 文献);因此表现出色。不过,公平地说,kafka-python 更符合 Python 风格,并且两个库都运行良好。
print('hello') 语句进行检查。
看起来您可以使用以下内容来确保您的主题已经存在(我假设您正在使用以下kafka python 实现):
client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)
【讨论】:
ensure_topic_exists 仅适用于启用自动主题创建。 github.com/mumrah/kafka-python/blob/…
好像没有kafka server api来创建topic所以只能使用topic自动创建或者命令行工具:
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
【讨论】:
已经太晚了。我不知道用于显式创建主题的命令,但以下创建并添加消息。
我创建了一个 python kafka 生产者:
prod = KafkaProducer(bootstrap_servers='localhost:9092')
for i in xrange(1000):
prod.send('xyz', str(i))
在 Kafka 主题列表中 xyz 以前不存在。当我执行上述方法时,Python-kafka 客户端创建了它并将消息添加到它。
【讨论】:
在 Kafka 0.11 中刚刚添加了进行编程主题创建和配置所需的 AdminClient API(最初用于 Java)
预计非 Java 客户端库也会随着时间的推移添加此功能。请咨询您正在使用的 Kafka Python 客户端的作者(有几个),了解 API 中是否以及何时支持 KIP-4 管理协议
【讨论】:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'topic-name'
producer.send(topic, final_list[0]).get(timeout=10)
【讨论】:
final_list?