【问题标题】:Not receiving messages from Kafka Topic没有收到来自 Kafka 主题的消息
【发布时间】:2021-12-10 07:14:04
【问题描述】:

在此程序中调用 poll() 时我收到 None,但是从 cmd 运行 kafka-console-consumer.bat 时收到消息,我无法弄清楚到底是什么问题。

从main.py开始执行

from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import time
import json
from kafka_message_consumer import KafkaMessageConsumer
from kafka_discovery_executor import KafkaDiscoveryExecutor


with open('kafka_properties.json') as f:
    kafka_properties = json.loads(f.read())

message_queue = Queue()
kafka_message_consumer = KafkaMessageConsumer(kafka_properties, message_queue)
kafka_discovery_executor = KafkaDiscoveryExecutor(message_queue, kafka_properties)

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(kafka_message_consumer.run())
    time.sleep(1)
    executor.submit(kafka_discovery_executor.run())
    time.sleep(1)

KafkaDiscoveryExecutor 类用于使用共享队列中的消息并处理该消息。

这是 kafka_message_consumer.py

import logging
from confluent_kafka import Consumer


class KafkaMessageConsumer:

    def __init__(self, kafka_properties, message_queue):
        self.message_queue = message_queue
        self.logger = logging.getLogger('KafkaMessageConsumer')
        self.kafka_stream_consumer = None
        self.create_consumer(kafka_properties)

    def create_consumer(self, kafka_properties):
        """
        Create an instance of Kafka Consumer with the consumer configuration properties
        and subscribes to the defined topic(s).
        """

        consumer_config = dict()

        # Consumer configuration properties.
        consumer_config['bootstrap.servers'] = kafka_properties.get('bootstrap.servers')
        consumer_config['group.id'] = kafka_properties.get('group.id')
        consumer_config['enable.auto.commit'] = True
        consumer_config['auto.offset.reset'] = 'earliest'
        
        # For SSL Security
        # consumer_config['security.protocol'] = 'SASL_SSL'
        # consumer_config['sasl.mechanisms'] = 'PLAIN'
        # consumer_config['sasl.username'] = ''
        # consumer_config['sasl.password'] = ''

        # Create the consumer using consumer_config.
        self.kafka_stream_consumer = Consumer(consumer_config)

        # Subscribe to the specified topic(s).
        self.kafka_stream_consumer.subscribe(['mytopic'])

    def run(self):
        while True:
            msg = self.kafka_stream_consumer.poll(1.0)
            if msg is None:
                # No message available within timeout.
                print("Waiting for message or event/error in poll()")
                continue
            elif msg.error():
                print("Error: {}".format(msg.error()))
            else:
            # Consume the record.
            # Push the message into message_queue
                try:
                    self.message_queue.put(msg)
                except Exception as e:
                    self.logger.critical("Error occured in kafka Consumer: {}".format(e))

指定的主题有事件,但我在这里得到 None 并且 'if msg is None:' 中的打印语句正在执行。

【问题讨论】:

  • 不看细节(我不是 python 开发人员)两个最可能的原因(或真正 AFAIK 的唯一可能原因)是(1)有另一个具有相同组名的消费者正在消费消息 - 可能是任何消费者,包括您的控制台消费者或(2)您的应用程序正在使用新的组名(或从未提交的组)并且您的配置是在不存在提交的偏移量时从最新开始,并且消息在您的第一次投票之前发布
  • 我也尝试过使用不同的组 ID,并在运行消费者之前运行生产者,但仍然没有。
  • 你怎么知道你没有消费这些消息——你将它们转发到的消息队列在哪里?在您的代码中,您(正确地)连续轮询,但这意味着只要没有新内容,您就会继续记录,并且如果您确实收到一条消息,则不会记录任何内容,而只是将其放在其他队列中 - 我是不熟悉队列库,也不确定 Kafka 发现执行器是什么,但是您确定没有任何内容进入该消息队列吗?
  • (ps 我看到你有 auto.offset.reset 最早所以在新的消费者组开始消费之前或之后发布消息应该没有问题 - 我们可以将其排除为原因。
  • 只是为了检查我创建了一个单独的消费者,看看我是否收到了消息,是的,它正在工作,但仍然不确定为什么这个程序不工作

标签: python apache-kafka python-multithreading threadpoolexecutor confluent-kafka-python


【解决方案1】:

我仍然不确定为什么上面的代码不能正常工作。

这是我为使此代码正常工作所做的更改

  1. 我使用线程模块而不是 concurrent.futures
  2. 使用的守护线程
  3. 在类 [KafkaMessageConsumer、KafkaDiscoveryExecutor] 的构造函数中调用线程。init()

这里是 main.py

from queue import Queue
import threading
import time
import json
from kafka_message_consumer import KafkaMessageConsumer
from kafka_discovery_executor import KafkaDiscoveryExecutor

def main():
    with open('kafka_properties.json') as f:
        kafka_properties = json.loads(f.read())

    message_queue = Queue()
   
    threads = [
        KafkaMessageConsumer(kafka_properties, message_queue),
        KafkaDiscoveryExecutor(message_queue, kafka_properties)
    ]

    for thread in threads:
        thread.start()
        time.sleep(1)

    for thread in threads:
        thread.join()

    time.sleep(1)

if __name__ == "__main__":
    main()

和 kafka_message_consumer.py

import logging
from confluent_kafka import Consumer
import threading


class KafkaMessageConsumer(threading.Thread):
    daemon = True

    def __init__(self, kafka_properties, message_queue):
        threading.Thread.__init__(self)
        self.message_queue = message_queue
        self.logger = logging.getLogger('KafkaMessageConsumer')
        self.kafka_stream_consumer = None
        self.create_consumer(kafka_properties)

    def create_consumer(self, kafka_properties):
        """
        Create an instance of Kafka Consumer with the consumer configuration properties
        and subscribes to the defined topic(s).
        """

        consumer_config = dict()

        # Consumer configuration properties.
        consumer_config['bootstrap.servers'] = kafka_properties.get('bootstrap.servers')
        consumer_config['group.id'] = kafka_properties.get('group.id')
        consumer_config['enable.auto.commit'] = True
        consumer_config['auto.offset.reset'] = 'earliest'
        

        # Create the consumer using consumer_config.
        self.kafka_stream_consumer = Consumer(consumer_config)

        # Subscribe to the specified topic(s).
        self.kafka_stream_consumer.subscribe(['mytopic'])

    def run(self):
        while True:
            msg = self.kafka_stream_consumer.poll(1.0)
            if msg is None:
                # No message available within timeout.
                print("Waiting for message or event/error in poll()")
                continue
            elif msg.error():
                print("Error: {}".format(msg.error()))
            else:
            # Consume the record.
            # Push the message into message_queue
                try:
                    self.message_queue.put(msg)
                except Exception as e:
                    self.logger.critical("Error occured in kafka Consumer: {}".format(e))
        self.kafka_stream_consumer.close()
 

【讨论】:

    猜你喜欢
    • 2018-09-15
    • 1970-01-01
    • 2015-05-20
    • 1970-01-01
    • 2018-10-27
    • 2020-05-30
    • 2019-11-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多