【Question title】: KafkaProducer does not send messages in multiprocessing.Process
【Posted】: 2022-02-11 19:46:55
【Question】:

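I have the following setup, which is a simplified example of a real application. The problem is that producer does not produce any messages when it is used inside a multiprocessing.Process process.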

import logging
import multiprocessing
import os

from kafka import KafkaProducer

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')

producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')


def test_produce():
    # producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    for i in range(10):
        print('inserting') # prints once
        producer.send(KAFKA_NAME, b'test').get()  # will not be committed to the topic


producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # will be committed to the topic
producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # will be committed to the topic
multiprocessing.Process(target=test_produce).start()

【Comments】:

    Tags: python apache-kafka kafka-python


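    【Answer 1】:

    The problem lies in cross-process object serialization: the producer created in the parent process is carried over into the child process, along with whatever errors that causes. The solution is to create a local producer instance inside the function that your multiprocessing process runs:
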
    import logging
    import multiprocessing
    import os
    
    from kafka import KafkaProducer
    
    KAFKA_HOST = os.getenv('KAFKA_HOST')
    KAFKA_PORT = os.getenv('KAFKA_PORT')
    KAFKA_NAME = os.getenv('KAFKA_NAME')
    
    producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
    
    
    def test_produce():
        producer1 = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')
        for i in range(10):
            print('inserting')  # prints 10 times
            producer1.send(KAFKA_NAME, b'test').get()  # all messages will be committed
    
    
    producer.send(KAFKA_NAME, b'test_in_non_thread1').get()  # will be committed to the topic
    producer.send(KAFKA_NAME, b'test_in_non_thread2').get()  # will be committed to the topic
    multiprocessing.Process(target=test_produce).start()
    

    This way, everything works as expected. By the way, you can use threading.Thread instead, which does not have this problem.
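
The threading remark above can be sketched roughly as follows. This is only an illustration, not code from the answer: `produce_from_thread` and `run_in_thread` are hypothetical helper names, and the 10-second timeout is an arbitrary choice. It relies on the kafka-python documentation describing `KafkaProducer` as thread safe, so one instance can be shared by threads of the same process.

```python
import threading


def produce_from_thread(producer, topic, payloads):
    # Runs in the same process as the code that created `producer`,
    # so the shared producer object is used directly.
    for payload in payloads:
        # Block until the broker acknowledges each record (timeout is an assumption).
        producer.send(topic, payload).get(timeout=10)


def run_in_thread(producer, topic, payloads):
    # Hypothetical helper: start a worker thread and wait for it to finish.
    worker = threading.Thread(
        target=produce_from_thread, args=(producer, topic, payloads)
    )
    worker.start()
    worker.join()
```

With the module-level `producer` from the question, a call might look like `run_in_thread(producer, KAFKA_NAME, [b'test'] * 10)`.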

    https://github.com/dpkp/kafka-python/issues/1416
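
For the multiprocessing fix itself, a slightly more defensive variant might look like the sketch below. It is not the answer's code: `make_producer`, `produce`, and the `producer_factory` parameter are hypothetical names, the timeout is arbitrary, and the `if __name__ == '__main__':` guard is an addition. The guard matters on platforms where multiprocessing uses the spawn start method, because the child re-imports the main module and would otherwise run the module-level code again.

```python
import multiprocessing
import os

KAFKA_HOST = os.getenv('KAFKA_HOST')
KAFKA_PORT = os.getenv('KAFKA_PORT')
KAFKA_NAME = os.getenv('KAFKA_NAME')


def make_producer():
    # Imported and constructed lazily, so the producer only ever exists
    # in the process that calls this function.
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')


def produce(topic, payloads, producer_factory=make_producer):
    # Create the producer inside the worker, never in the parent.
    producer = producer_factory()
    try:
        for payload in payloads:
            # Wait for the broker to acknowledge each record.
            producer.send(topic, payload).get(timeout=10)
        producer.flush()
    finally:
        # Release sockets and background threads owned by this process.
        producer.close()
    return len(payloads)


if __name__ == '__main__':
    if KAFKA_HOST and KAFKA_PORT and KAFKA_NAME:
        worker = multiprocessing.Process(
            target=produce, args=(KAFKA_NAME, [b'test'] * 10)
        )
        worker.start()
        worker.join()
    else:
        print('Set KAFKA_HOST, KAFKA_PORT and KAFKA_NAME to run this demo')
```

Passing the factory as a parameter is only a convenience for trying the function without a broker; the essential point, as in the answer, is that the producer is constructed inside the child process.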

    【Comments】:
