【问题标题】:What is a minimal example of using Kafka with Python?将 Kafka 与 Python 结合使用的最小示例是什么?
【发布时间】:2019-02-10 17:46:10
【问题描述】:

我尝试了什么

  1. 我克隆了https://github.com/wurstmeister/kafka-docker并执行了sudo docker-compuse up
  2. 我启动了下面列出的producer.py
  3. 我启动了下面列出的consumer.py

这不起作用。我将docker-compose.ymlports 更改为

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

启动后,producer.py 执行完毕,显示 docker-compose 终端

zookeeper_1  | 2018-09-05 14:21:44,001 [myid:] - INFO  [SessionTracker:ZooKeeperServer@358] - Expiring session 0x165aa1acb900000, timeout of 6000ms exceeded
zookeeper_1  | 2018-09-05 14:21:44,002 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x165aa1acb900000
kafka_1      | [2018-09-05 14:21:44,028] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
kafka_1      | [2018-09-05 14:21:44,033] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
zookeeper_1  | 2018-09-05 14:21:44,141 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:delete cxid:0x32 zxid:0x6f txntype:-1 reqpath:n/a Error Path:/admin/reassign_partitions Error:KeeperErrorCode = NoNode for /admin/reassign_partitions
zookeeper_1  | 2018-09-05 14:21:44,152 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:delete cxid:0x34 zxid:0x70 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
zookeeper_1  | 2018-09-05 14:21:47,621 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:setData cxid:0x3c zxid:0x71 txntype:-1 reqpath:n/a Error Path:/config/topics/mytopic Error:KeeperErrorCode = NoNode for /config/topics/mytopic
kafka_1      | [2018-09-05 14:21:47,628] INFO Topic creation Map(mytopic-0 -> ArrayBuffer(1003)) (kafka.zk.AdminZkClient)
kafka_1      | [2018-09-05 14:21:47,639] INFO [KafkaApi-1003] Auto creation of topic mytopic with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)

主题已创建,很好。但是,当我执行消费者时,他们什么都不做。但是 docker-compose 显示

kafka_1      | [2018-09-05 14:24:52,566] ERROR [KafkaApi-1003] Number of alive brokers '0' does not meet the required replication factor '1' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

我怎样才能有一个最小的 Kafka 安装/设置来查看 Kafka 与 Python 一起工作?

生产者.py

​​>
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('mytopic', key='hello', value='world')
print("produce done")
p.flush(10)

consumer.py

​​>
from confluent_kafka import Consumer, KafkaError


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

【问题讨论】:

    标签: python docker apache-kafka


    【解决方案1】:

    试试这个环境部分

      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    

    如果您没有在 Python docker 容器中运行代码,请将 9094:9094 添加到端口并将 Python 指向 localhost:9094

    The Confluent images have a similar setup,但改为端口 29092

    【讨论】:

    猜你喜欢
    • 2010-10-29
    • 2010-09-07
    • 2010-09-06
    • 2013-12-20
    • 2010-09-28
    • 2014-02-21
    • 1970-01-01
    • 1970-01-01
    • 2019-05-26
    相关资源
    最近更新 更多