【问题标题】:Data doesn't get pushed to Kafka Queue using python multiprocessing数据不会使用 python 多处理推送到 Kafka 队列
【发布时间】:2017-11-08 09:34:00
【问题描述】:

我正在使用 Python (2.7) 多处理通过 kafka-python (1.3.5) KafkaProducer 将数据推送到 Kafka 队列。

from kafka import KafkaProducer
import multiprocessing
# other imports


class TestClass(object):
    def __init__(self, producer):
        self.kafka_producer = producer

    def main(self, conf, nthreads):
        try:
            for i in range(nthreads):
                logger.info("Starting process number = %d " % (i + 1))
                p = Process(target=self.do_some_task, args=(conf, 2))
                p.start()
                processes.append(p)
            for p in processes:
                logger.info("Joining process")
            p.join()
        except Exception, ex:
            logger.error("Exception occurred : %s" % str(ex))

    def do_some_task(conf, retry):
        # some task happening
        self.record(arg1, arg2)

    # pushing to kafka
    def record(self, arg1, arg2)
        message = json.dumps({"a": "arg1", "b": "arg2"})
        self.kafka_producer.send(KAFKA_TOPIC, message)


if __name__ == '__main__':
    kafka_producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, 
        request_timeout_ms=60000, 
        retries=2)
    obj = TestClass(kafka_producer)

    try:
        parser = argparse.ArgumentParser(description='Description')
        parser.add_argument('-threads', type=int, default=1) # 20 threads
        parser.add_argument('-debug', type=int, default=0)
        args = parser.parse_args()
        me = SingleInstance(args.src)
        TestClass.main(CONF[args.src], args.threads)

20 个线程在其中生成写入 kafka。我查看了日志,发现该进程等待消息被写入 kafka,但最终它继续前进而没有写入 Kafka。没有引发异常。

我尝试在没有线程的情况下从 python 命令行运行相同的代码,一切都按预期工作。可能是什么问题。

【问题讨论】:

    标签: python multithreading apache-kafka multiprocessing kafka-python


    【解决方案1】:

    请在分叉进程后生成与 kafka 的连接。并请关闭连接,并在遇到连接相关错误时重新连接。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-19
      • 1970-01-01
      • 2013-10-02
      • 1970-01-01
      • 2019-08-18
      • 2020-04-25
      • 2018-06-30
      • 2012-07-11
      相关资源
      最近更新 更多