【问题标题】:Is this the intended pattern for using Python `asyncio` coroutines?这是使用 Python `asyncio` 协程的预期模式吗?
【发布时间】:2016-01-29 02:40:28
【问题描述】:

我正在尝试使用 Slack RTM API 编写一个小的并发流处理程序,我想知道这是否是最有效地使用 Python 协程。 asyncio 包有很多选项,但很难确定项目的正确方法是什么,我认为文档并不能很好地解释每个项目的优点/缺点。

我认为我这里不需要多线程的开销,我需要异步循环之间的相互通信。我应该为我的每个函数创建一个单独的BaseEventLoop 吗?

作为 Python,我认为对于这个问题 (There should be one-- and preferably only one --obvious way to do it) 有一个接近的确定性答案,但我担心添加所有这些异步杂乱可能只会使我的代码性能低于完全顺序的幼稚实现。

# Is this the best way to communicate between coroutines? 
incoming_message_q = asyncio.Queue()

async def print_event():
    logging.info("Entering log loop")

    # Should this operate within it's own BaseEventLoop? 
    while True:
        event = await incoming_message_q.get()
        logging.info(event)

async def log_queue_status():
    while True:
        logging.info(incoming_message_q.qsize())
        await asyncio.sleep(5)

async def read_rtm_connection(client, q):
    if client.rtm_connect():
        logging.info("Successful Slack RTM connection")
        while True:

            # How do I make this part non-blocking?
            events = client.rtm_read()

            for event in events:
                logging.info("Putting onto the queue", event)
                if event["type"] == "presence_change":
                    await q.put(event)
                elif event["type"] == "message":
                    await q.put(event)
                else:
                    logging.info("Not sure what to do")

            await asyncio.sleep(0.1)
    else:
        logging.info("RTM connection failed.")

loop = asyncio.get_event_loop()
loop.create_task(print_event())
loop.create_task(log_queue_status())
loop.create_task(read_rtm_connection(client, incoming_message_q))
loop.run_forever()

【问题讨论】:

  • 我不熟悉 Slack 或 rtm_read() 方法是什么,因此请记住:要使某些东西成为非阻塞的,它必须是一个可以作为 asyncio 运行的可等待协程使用 ensure_future() 的任务。如果 rtm_read() 不是协程,您可以尝试将其包装在协程中,以某种方式轮询 rtm_read() 以接收新事件并将它们传递给您的其他方法。根据您的情况,您可以将事件消息直接传递到队列,或者在每次收到事件时返回未来,并在协程中使用 for-in 循环来迭代它们。

标签: python python-asyncio


【解决方案1】:

如果您想以一种异步友好的方式与 slack 交互,您将需要使用非阻塞 API。我不确定您当前使用的是什么,但如果它不包含任何 asyncio 协程,它可能不会轻易集成到 asyncio 中,除非您通过 @ 在后台线程中运行所有阻塞调用987654324@。另一种选择是将库中的所有底层阻塞 I/O 调用实际转换为非阻塞,这通常是一大堆工作。

好消息是至少有一个图书馆已经为您完成了这项工作; slacker-asyncio,它是slacker 的一个分支。您应该能够使用它通过协程与 RTM API 进行交互。

【讨论】:

  • 感谢@dano 提供的信息。昨天我在发布此内容后查看了 slackhq/python-slackclient lib,发现它确实非常阻塞。如果我按照你所说的那样在loop.run_in_executor 中运行那部分代码,那么在协程和slackclient 线程之间进行通信的安全方式是什么?
  • @dalanmiller 如果你只是像上面的例子一样在client 对象上执行方法,你可以只做events = loop.run_in_executor(None, client.rtm_read)asyncio 将自动返回 client.rtm_read() 调用的结果。
  • 如果我有其他 run_in_executors 虽然要去,他们也可以安全地访问队列吗? AKA 与 run_in_executor 我必须担心线程安全是吗?
  • @dalanmiller 在您的示例代码中,没有从事件循环线程外部访问队列,因此您可以继续使用asyncio.Queue。如果您的代码在需要访问队列的其他线程中运行,则必须通过run_in_executor 使用threading.Queue,或janus 库的队列实现(如here 所述)。
  • @dalanmiller 另外,我之前评论中的代码应该是event = await loop.run_in_executor(None, client.rtm_read)
猜你喜欢
  • 2020-12-29
  • 1970-01-01
  • 2018-02-25
  • 2019-03-16
  • 1970-01-01
  • 2020-04-14
  • 2016-03-27
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多