【发布时间】:2016-01-29 02:40:28
【问题描述】:
我正在尝试使用 Slack RTM API 编写一个小的并发流处理程序,我想知道这是否是最有效地使用 Python 协程。 asyncio 包有很多选项,但很难确定项目的正确方法是什么,我认为文档并不能很好地解释每个项目的优点/缺点。
我认为我这里不需要多线程的开销,我需要异步循环之间的相互通信。我应该为我的每个函数创建一个单独的BaseEventLoop 吗?
作为 Python,我认为对于这个问题 (There should be one-- and preferably only one --obvious way to do it) 有一个接近的确定性答案,但我担心添加所有这些异步杂乱可能只会使我的代码性能低于完全顺序的幼稚实现。
# Is this the best way to communicate between coroutines?
incoming_message_q = asyncio.Queue()
async def print_event():
logging.info("Entering log loop")
# Should this operate within it's own BaseEventLoop?
while True:
event = await incoming_message_q.get()
logging.info(event)
async def log_queue_status():
while True:
logging.info(incoming_message_q.qsize())
await asyncio.sleep(5)
async def read_rtm_connection(client, q):
if client.rtm_connect():
logging.info("Successful Slack RTM connection")
while True:
# How do I make this part non-blocking?
events = client.rtm_read()
for event in events:
logging.info("Putting onto the queue", event)
if event["type"] == "presence_change":
await q.put(event)
elif event["type"] == "message":
await q.put(event)
else:
logging.info("Not sure what to do")
await asyncio.sleep(0.1)
else:
logging.info("RTM connection failed.")
loop = asyncio.get_event_loop()
loop.create_task(print_event())
loop.create_task(log_queue_status())
loop.create_task(read_rtm_connection(client, incoming_message_q))
loop.run_forever()
【问题讨论】:
-
我不熟悉 Slack 或 rtm_read() 方法是什么,因此请记住:要使某些东西成为非阻塞的,它必须是一个可以作为 asyncio 运行的可等待协程使用 ensure_future() 的任务。如果 rtm_read() 不是协程,您可以尝试将其包装在协程中,以某种方式轮询 rtm_read() 以接收新事件并将它们传递给您的其他方法。根据您的情况,您可以将事件消息直接传递到队列,或者在每次收到事件时返回未来,并在协程中使用 for-in 循环来迭代它们。
标签: python python-asyncio