【问题标题】:Python asyncio gather returned value from 'call_soon_threadsafe'Python asyncio 从“call_soon_threadsafe”收集返回值
【发布时间】:2019-12-10 08:21:59
【问题描述】:

我试图理解python asyncio的call_soon_threadsafe API,但是失败了,下面的示例代码,如果我的simple协程想要返回一些东西,我应该如何从调用方获取返回值?

import time
import asyncio as aio
import uvloop

from threading import Thread

aio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def simple(a, fut:aio.Future):
  await aio.sleep(a)
  return fut.set_result(a)

def delegator(loop):
  aio.set_event_loop(loop)
  loop.run_forever()

loop_exec = aio.new_event_loop()

t = Thread(target=delegator, args=(loop_exec,))
t.start()


if __name__ == '__main__':
  start_time = time.time()

  fut = loop_exec.create_future() # tried to get back returned value by future
  handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
  res = aio.wait_for(fut, 10)

  print('Time consumed: {}s'.format(time.time() - start_time))
  print('>>>>>>>>>>', res)

# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>

如您所见,我试图通过将未来传递给在不同线程中运行的协程来取回返回值,但仍然不知道如何正确获取它。

基本上两个问题:

  1. 使用上面的示例代码如何从调用方取回返回值?
  2. 这个call_soon_threadsafe的实际用例是什么,只是觉得run_coroutine_threadsafe使用起来更方便,几乎可以涵盖这种不同线程协程交互中我能想到的所有情况。

【问题讨论】:

  • 要回答问题 1,您必须使用 concurrent.futures.Future 而不是 loop_exec.create_future 并将 aio.wait_for(fut, 10) 替换为 fut.result()。这基本上就是run_couroutine_threadsafe 所做的。

标签: python python-3.x python-asyncio coroutine


【解决方案1】:

使用上面的示例代码如何从调用方取回返回值?

由于事件循环在主线程之外运行,因此需要使用线程感知同步设备。例如:

async def simple(a, event):
    await asyncio.sleep(a)
    event.simple_result = a
    event.set()

done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
done.wait(10)
res = done.simple_result

或者,您可以使用concurrent.futures.Future 进行同步,这就像带有对象负载的一次性事件。 (请注意,您不能使用 asyncio 未来,因为它是 not thread-safe。)

async def simple(a, fut):
    await asyncio.sleep(a)
    fut.set_result(a)

done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
res = done.result(10)

正如文森特在 cmets 中指出的,这就是 run_coroutine_threadsafe 将为您做的事情:

async def simple(a):
    await asyncio.sleep(a)
    return a

fut = asyncio.run_coroutine_threadsafe(simple(3))
res = fut.result(10)

这个call_soon_threadsafe的实际用例是什么

最简单的答案是call_soon_threadsafe 是一个较低级别的 API,当您只想告诉事件循环执行或开始执行某事时使用它。 call_soon_threadsafe 是用于实现 run_coroutine_threadsafe 等功能的构建块,还有许多其他功能。至于你为什么要自己使用那个管道功能.​​.....

有时你想执行一个普通的函数,而不是一个协程。有时你的函数是一劳永逸的,你并不关心它的返回值。 (或者该函数最终可能会通过某个侧通道通知您其完成。)在这些情况下,call_soon_threadsafe 是该工作的正确工具,因为它更轻量级,因为它不会尝试创建额外的 concurrent.futures.Future并将其附加到执行的代码中。例子:

  • loop.call_soon_threadsafe(loop.stop) 告诉事件循环停止运行
  • loop.call_soon_threadsafe(queue.put_nowait, some_item) 向无限异步队列添加内容
  • loop.call_soon_threadsafe(asyncio.create_task, coroutinefn()) 提交协程到事件循环而不等待它完成
  • loop.call_soon_threadsafe(some_future.set_result, value) 设置来自不同线程的异步未来的结果
  • this answer 中的低级代码

【讨论】:

  • 非常感谢您的详细解释。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-09-18
  • 2014-05-25
  • 1970-01-01
  • 2018-10-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多