【问题标题】:How to initialize RabbitMQ consumer using pika and connexion如何使用 pika 和 connexion 初始化 RabbitMQ 消费者
【发布时间】:2021-06-16 11:23:17
【问题描述】:

我正在尝试设置一个 Python 微服务,它通过 RabbitMQ 接收消息,同时为 Kubernetes 健康检查提供一个 /health REST 端点。我将 pika 用于 RabbitMQ 消费者,connexion 用于 REST 端点。

但是,当我在 main() 中启动 RabbitMQ 消费者时,连接应用程序不会启动。

python-app.py

#!/usr/bin/env python
import pika, sys, os, connexion
from flask import Flask, request, jsonify

app = connexion.FlaskApp(__name__, specification_dir='./')

def main():
    # Connection
    ...
    # Exchange and queues
    ...

    def callback(ch, method, properties, body):
        ...

    channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages.')
    channel.start_consuming()
    app.run(port=8080, use_reloader=False)

@app.route('/api/v1/health', methods=['GET'])
def return_health():
    message = {'status':'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

输出:

[*] Waiting for messages.

如何正确初始化这两个组件?我需要使用线程吗?

【问题讨论】:

  • 问题是 pika 被阻塞了,所以你永远无法访问 app.run。我创建了一个示例,说明这如何与烧瓶一起使用,它也应该适用于连接。 github.com/eandersson/python-rabbitmq-examples/blob/main/…
  • 出于好奇,为什么需要进行 http 健康检查?带有 pika 的 python 命令行进程不会完成这项工作吗?无论如何,健康检查并没有做太多事情(例如,如果您的消费者线程被阻塞且无响应,它仍然会返回健康状态吗?)

标签: python pika connexion


【解决方案1】:

我已经通过在一个单独的线程中初始化 RabbitMQ 消费者来解决这个问题:

#!/usr/bin/env python
import pika, sys, os, threading
from flask import Flask, request, jsonify

app = Flask(__name__)

def start_rmq_connection():
    # Connection
    ...
    # Exchange and queues
    ...

    def callback(ch, method, properties, body):
        ...

    channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages.')
    channel.start_consuming()

@app.route('/api/v1/health', methods=['GET'])
def return_health():
    message = {'status':'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        thread_1 = threading.Thread(target=start_rmq_connection)
        thread_1.start()
        thread_1.join(0)
        app.run()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-16
    • 2012-10-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多