【问题标题】:No data from the Kafka Consumer Python - Consumer keeps listening but nothing comes outKafka 消费者 Python 没有数据 - 消费者一直在听,但没有任何结果
【发布时间】:2021-06-29 15:28:15
【问题描述】:

我正在寻找一种方法来使用 kafka 向我的 docker 显示我的 API (localhost)。

我的制片人(下)工作得像个魅力。我知道,因为当我打印 res.text 时,我有一个输出。

import json
import requests
from kafka import KafkaProducer
import time

# get data
res = requests.get('http://127.0.0.1:5000/twitter')
#print(res.text)

# use kafka

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])#, api_version='2.0.0')
producer.send('test', json.dumps(res.text).encode('utf-8'))
time.sleep(1)
#producer.flush()

但是,我的消费者不起作用。这是我到目前为止所尝试的。

目前在 for 循环处停止。
import kafka
import json
import requests
from kafka import KafkaConsumer

# utiliser kafka
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'], api_version='2.0.0', group_id="test_id", value_deserializer = json.loads)
print('before for ')
consumer.subscribe('test')
for msg in consumer:
    print('IN for')
    #print(type(consumer))
    print(json.loads(msg.value.decode()))
#print(consumer)

我在某处遗漏了一些东西,但我不知道是什么。

当我手动停止时,我从 docker 收到以下错误:

<class 'kafka.consumer.group.KafkaConsumer'>
^CTraceback (most recent call last):
  File "consumer.py", line 11, in <module>
    for m in consumer:
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 680, in _poll_once
    self._update_fetch_positions(self._subscription.missing_fetch_positions())
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1112, in _update_fetch_positions
    self._fetcher.update_fetch_positions(partitions)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/fetcher.py", line 186, in update_fetch_positions
    self._reset_offset(tp)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/fetcher.py", line 237, in _reset_offset
    offsets = self._retrieve_offsets({partition: timestamp})
  File "/usr/lib/python3.7/site-packages/kafka/consumer/fetcher.py", line 302, in _retrieve_offsets
    time.sleep(self.config['retry_backoff_ms'] / 1000.0)
KeyboardInterrupt
version: "3.7"
services:

  spark-master:
    image: bde2020/spark-master:3.0.1-hadoop3.2
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
       - ./work:/home/jovyan/work
    environment:
       - "SPARK_LOCAL_IP=spark-master"

  spark-worker:
    image: bde2020/spark-worker:3.0.1-hadoop3.2

    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=3G
      - SPARK_DRIVER_MEMORY=2G
      - SPARK_EXECUTOR_MEMORY=2G
    volumes:
       - ./work:/home/jovyan/work

  pyspark-notebook:
    image: jupyter/pyspark-notebook
    container_name: pyspark_notebook
    ports:
      - "8888:8888"
    volumes:
      - ./work:/home/jovyan/work
      - ./work/model:/tmp/model_prediction
    environment:
      - PYSPARK_PYTHON=/usr/bin/python3
      - PYSPARK_DRIVER_PYTHON=ipython3

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

  mongo:
    image: mongo
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example


你能帮帮我吗?

【问题讨论】:

  • 你为什么评论生产者flush?。仅仅因为你可以打印 http 响应并不意味着生产者成功了
  • 您好,感谢您的回答和帮助。如何检查生产者是否成功。对我来说,印刷品是它起作用的原因。我评论了冲洗,因为我没有看到它的用途。
  • 我已取消注释 flush 方法(在生产者中)并取消注释 for 循环(在我的消费者中):for msg in consumer print(json.load(msg.value.decode())).... 没有任何改变!
  • 其他工具可以消费吗?例如,内置 Kafka 控制台消费者?卡夫卡猫?还有一个 offsetshell 工具,您可以使用它来验证主题中是否存在偏移差异......换句话说,我仍然不相信您的制作人工作
  • 我的程序应该像下面这样工作:生产者必须链接到 API (127.0.0.1:5000/twitter) 才能获取数据。它从容器的外部工作。然后,生产者将数据传递给消费者。消费者位于容器内部。

标签: python docker apache-kafka kafka-consumer-api producer-consumer


【解决方案1】:

相同的 docker compose

来自主机

创建主题

$ docker-compose up -d
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --topic test --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1      
Created topic "test".
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181                            
test

验证 API 是否正在运行

$ curl -H 'Content-Type: application/json' localhost:5000/twitter
{"tweet":"foobar"}

安装kafka-python 并运行生产者(使用未注释的刷新)

$ pip install requests kafka-python
$ python producer.py

验证进入主题的数据

$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
"{\"tweet\":\"foobar\"}\n"

从容器内部

使用pyspark notebook@http://localhost:8888

打开终端标签

$ conda install kafka-python
(base) jovyan@3eaf696e1135:~$ python work/consumer.py
before for
IN for
{"tweet":"foobar"}

新的消费者代码

import kafka
import json
import requests
from kafka import KafkaConsumer

# utiliser kafka
consumer = KafkaConsumer('test',
    bootstrap_servers=['kafka:9093'],  # needs to be the kafka INSIDE:// listener address
    api_version='2.0.0',
    group_id="test_id",
    auto_offset_reset='earliest',  # you're missing this
    value_deserializer=json.loads)
print('before for ')
for msg in consumer:
    print('IN for')
    #print(type(consumer))
    print(msg.value)
#print(consumer)

【讨论】:

  • tl;dr - 默认情况下,消费者从主题末尾读取,并且您显然没有在 消费者循环等待数据之后运行生产者
  • 您好,感谢您的回答。没有任何数据是主题。创建了一个主题,并且 API 正在运行。 127.0.0.1 - - [11/Apr/2021 17:56:25] "←[37mGET /twitter HTTP/1.1←[0m" 200 -
  • 我对制作人的唯一更改是取消注释producer.flush(),所以我不确定问题可能出在哪里
  • 好的,谢谢,我会打电话给我的老师,如果我找到了告诉你。非常感谢您给我的时间!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-09-12
  • 1970-01-01
  • 1970-01-01
  • 2020-09-20
  • 1970-01-01
  • 2020-11-15
  • 1970-01-01
相关资源
最近更新 更多