【问题标题】:Listen to ZeroMQ in aiohttp application processaiohttp应用进程中监听ZeroMQ
【发布时间】:2016-08-12 21:12:26
【问题描述】:

我运行aiohttp 应用程序,Gunicornnginx 后面。 在我的应用程序的初始化模块中,我不使用web.run_app(app) 运行应用程序,而只是创建一个将由Gunicorn 导入的实例,以便在Gunicorn 创建的每个worker 中运行它。 所以Gunicorn 创建了一些工作进程,在其中创建了事件循环,然后runs 在这些循环中创建了应用程序的请求处理程序。

我的aiohttp 应用程序有一组连接的WebSockets(移动应用程序客户端),我想在Gunicorn 启动的任何应用程序进程中发生事件时通知它们。 我想通知连接到所有应用程序进程所有 WebSockets。 因此,我使用ZeroMQ 创建了某种上游代理,并且我想在每个应用程序进程中使用zmq.SUB 套接字订阅它。

...所以基本上我想在每个应用程序工作人员中实现这样的目标:

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')

while True:
    event = socket.recv()
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?

如何监听aiohttp 应用程序中的ZeroMQ 代理将消息转发到WebSockets

我可以在哪里放置这段代码在事件循环中的后台运行,以及如何在aiohttp 应用程序的生命周期内正确运行和关闭它?


更新

我已经在 aiohttp 的 GitHub 存储库中创建了一个 issue 来描述问题并提出可能的解决方案。我非常感谢您在这里或那里就所描述的问题提供意见。

【问题讨论】:

  • 我正在尝试做同样的事情(结合 ZMQ 和 aiohttp),但您的答案与问题有何关系?这与 zmq 无关,是吗?
  • @Alex 没错。问题是如何从同一个 aiohttp 进程在后台收听 ZeroMQ(或任何其他队列)。那时 aiohttp 没有 on_startup 信号处理程序。但现在有了,所以在后台监听任意数量的队列都没有问题。

标签: python zeromq gunicorn python-asyncio aiohttp


【解决方案1】:

好的,关于 issue 的问题和讨论导致了我为 aiohttp 贡献的新功能,即在版本 1.0 中,我们将能够注册 @ 987654325@ 应用信号使用Application.on_startup() 方法。

Documentation.
Working example on the master branch.

#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio

import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app

async def websocket_handler(request):
    ws = WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'].append(ws)
    try:
        async for msg in ws:
            print(msg)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close(code=999, message='Server shutdown')


async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
            print("message in {}: {}".format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        await sub.unsubscribe(ch.name)
        await sub.quit()
        print('Redis connection closed.')


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    print('cleanup background tasks...')
    app['redis_listener'].cancel()
    await app['redis_listener']


async def init(loop):
    app = Application(loop=loop)
    app['websockets'] = []
    app.router.add_get('/news', websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-09
    • 1970-01-01
    • 2021-03-21
    • 1970-01-01
    • 2011-11-27
    • 2018-07-17
    • 2016-01-28
    相关资源
    最近更新 更多