【问题标题】:How to use pika with FastAPI's asyncio loop?如何将 pika 与 FastAPI 的 asyncio 循环一起使用?
【发布时间】:2022-01-28 06:09:15
【问题描述】:

我正在尝试使用 FastAPI 创建一个 webhook 端点,并将到达那里的任何 json 请求正文写入 RabbitMQ。

我不知道如何连接到 RabbitMQ、创建通道并通过连接到 FastAPI asyncio 循环来保持所有通道处于活动状态。此处关于 SO 的其他问题或答案均无济于事。

我当前的“解决方案”是在 FastAPI 应用程序启动方法中启动第二个线程,并使用 queue.SimpleQueue 在线程之间进行对话。因此 FastAPI 路径方法将对象写入此 SimpleQueue,第二个线程从中读取对象并将其发布到 RabbitMQ。

这种简单方法的问题在于,由于第二个线程在读取 SimpleQueue 时被阻塞,所以 RabbitMQ 连接没有通过它发送保活,服务器将其关闭。我的代码在写入 RabbitMQ 时捕获异常并重新连接并再次尝试,但这很丑。

我不明白如何将 pika 或 aio-pika 地图中的异步示例调整到 FastAPI 应用程序。

有谁能告诉我,鉴于下面这个简单的 FastAPI 应用程序,我可以打开一个 RabbitMQ 连接,该连接将通过它发送必要的 keepalive 以使其保持打开状态,这样我就可以通过路径方法中的连接发布?

from fastapi import FastAPI, Response
from typing import Any, Dict

app = FastAPI()

@app.on_event("startup")
async def startup() -> None:
    # Connect to RabbitMQ
    # Create channel
    # Declare queue

JSONObject = Dict[str, Any]

@app.post("/webhook")
async def webhook_endpoint(msg: JSONObject):

    # Write msg to RabbitMQ channel here.

    return Response(status_code=204)

我的另一个想法是仍然使用一个线程,但让该线程在 RabbitMQ 连接上执行阻塞读取,以希望继续通过它发送 keepalive,并且它不会干扰通过同一连接从路径法。但这显然是一种黑客行为。我更愿意以“正确”的方式进行操作并使用异步代码。

编辑:似乎没有办法进行阻塞读取,因此使用 channel.basic_get() 进行无限循环并改为休眠。更hacky。

【问题讨论】:

  • 缺少必要的信息,您使用的是asyncio adapter吗?这是为了保持连接活跃。
  • 感谢 Bluenix。我想我打算使用它,但我不知道该怎么做!这些示例仅显示了对某些选择适配器的调用,我认为这意味着选择真正的适配器。我假设 FastIO 使用的是普通的 asyncio(通过 AnyIO),因为我没有要求它这样做。对于一个拥有 25 年 Java 经验的人来说,这一切都非常令人困惑。在那里,我只是假设一切都使用普通线程来处理东西,而我做同样的事情很好。直到最近,即使在那里,异步编程似乎也受到了青睐。

标签: rabbitmq python-asyncio fastapi pika


【解决方案1】:

我通过调整 pika async publisher example 有一些工作。

我更改了示例,因此它创建了一个 AsyncioConnection 而不是使用 SelectConnection,因为 FastAPI 已经启动了标准 asyncio 事件循环,我希望 pika 使用它而不是 SelectConnection 决定使用的任何东西。

这意味着示例中的重新连接逻辑无法按编码工作,因此我需要修复它,它已从下面的代码中删除。

但是,此代码确实在后台保持连接处于活动状态,并且我可以发布消息以响应对 webhook URL 的点击。将日志级别更改为 DEBUG 会显示正在发送和接收的心跳。

这不是最终代码 - 我将更改队列等,但它确实表明 AsyncioConnection 可以连接到已经运行事件循环的 FastAPI。

import pika
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.exchange_type import ExchangeType

from typing import Any, Dict

import asyncio, json, logging, os, queue, threading, time

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z')
logger = logging.getLogger(__name__)


class AsyncioRabbitMQ(object):
    EXCHANGE = 'message'
    EXCHANGE_TYPE = ExchangeType.topic
    PUBLISH_INTERVAL = 1
    QUEUE = 'text'
    ROUTING_KEY = 'example.text'

    def __init__(self, amqp_url):
        self._connection = None
        self._channel = None

        self._deliveries = []
        self._acked = 0
        self._nacked = 0
        self._message_number = 0

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        logger.info('Connecting to %s', self._url)
        return AsyncioConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, connection):
        logger.info('Connection opened')
        self._connection = connection
        logger.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, _unused_connection, err):
        logger.error('Connection open failed: %s', err)

    def on_connection_closed(self, _unused_connection, reason):
        logger.warning('Connection closed: %s', reason)
        self._channel = None

    def on_channel_open(self, channel):
        logger.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        logger.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        logger.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        logger.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(exchange=exchange_name, exchange_type=self.EXCHANGE_TYPE, callback=cb)

    def on_exchange_declareok(self, _unused_frame, userdata):
        logger.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        logger.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(queue=queue_name, callback=self.on_queue_declareok)

    def on_queue_declareok(self, _unused_frame):
        logger.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(self.QUEUE, self.EXCHANGE, routing_key=self.ROUTING_KEY, callback=self.on_bindok)

    def on_bindok(self, _unused_frame):
        logger.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        logger.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        logger.info('Received %s for delivery tag: %i', confirmation_type, method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        logger.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)

    def publish_message(self, message):
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = { "a": "b" }
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs)

        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        logger.info('Published message # %i', self._message_number)


app = FastAPI()
ep = None


@app.on_event("startup")
async def startup() -> None:
    global ep
    await asyncio.sleep(10) # Wait for MQ
    user = os.environ['RABBITMQ_DEFAULT_USER']
    passwd = os.environ['RABBITMQ_DEFAULT_PASS']
    host = os.environ['RABBITMQ_HOST']
    port = os.environ['RABBITMQ_PORT']

    ep = AsyncioRabbitMQ(f'amqp://{user}:{passwd}@{host}:{port}/%2F')
    ep.connect()


JSONObject = Dict[str, Any]


@app.post("/webhook")
async def webhook_endpoint(msg: JSONObject) -> None:
    global ep
    ep.publish_message(msg)
    return Response(status_code=204)

【讨论】:

    猜你喜欢
    • 2020-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-07-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多