【问题标题】:How to gather task results in Trio?如何在 Trio 中收集任务结果?
【发布时间】:2018-10-05 18:25:18
【问题描述】:

我编写了一个脚本,它使用了一个nursery 和asks 模块来循环并根据循环变量调用一个API。我收到了回复,但不知道如何像使用 asyncio 一样返回数据。

我还有一个关于将 API 限制为每秒 5 个的问题。

from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)



async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)




if __name__ == "__main__":
    trio.run(main)

当我运行nursery.start_soon(fetch...) 时,我在fetch 中打印数据,但是如何返回数据?我没有看到任何类似于 asyncio.gather(*tasks) 函数的东西。

另外,我可以将会话数限制为 1-4,这有助于降低每秒 5 个 API 的限制,但我想知道是否有内置方法来确保调用的 API 不超过 5 个任何给定的秒数?

【问题讨论】:

标签: python-3.x python-trio


【解决方案1】:

返回数据:将 networkID 和一个字典传递给fetch 任务:

async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here

async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response

或者,创建一个trio.Queue,您可以在其中put 结果;然后您的主要任务可以从队列中读取结果。

API 限制:创建一个 trio.Queue(10) 并按照以下方式启动任务:

async def limiter(queue):
    while True:
        await trio.sleep(0.2)
        await queue.put(None)

将该队列作为另一个参数传递给 fetch,并在每次 API 调用之前调用 await limit_queue.get()

【讨论】:

  • 我正在尝试解开您的答案,但不太了解如何集成到我的代码中。我将看一下文档中的 .Event / .queue / 和 .sleep 函数。谢谢!
  • 好的,我用这个方法得到了结果,我可以用它。仍在努力将队列集成到我的代码中。我用谷歌搜索并没有看到一个使用 trio.queue 的例子。如何通过限制器(队列)来获取?到目前为止谢谢!
  • 抱歉,措辞不好。您不会将limiter(queue) 传递给fetch - 您只需将用于限制fetch 的队列作为另一个参数传递。
  • 爱你的限制器队列!这是我以前从未见过的模型。
【解决方案2】:

从技术上讲,trio.Queue 在 trio 0.9 中已被弃用。它已被trio.open_memory_channel 取代。

简短示例:

sender, receiver = trio.open_memory_channel(len(networkIds)
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, sender, url.format(i), headers)

async for value in receiver:
    # Do your job here
    pass

并且在您的 fetch 函数中,您应该在某处调用 async sender.send(value)

【讨论】:

    【解决方案3】:

    当我运行nursery.start_soon(fetch...) 时,我在fetch 中打印数据,但是如何返回数据?我没有看到任何类似于 asyncio.gather(*tasks) 函数的东西。

    你问了两个不同的问题,所以我只回答这个。 Matthias 已经回答了您的其他问题。

    当您调用start_soon() 时,您是在要求 Trio 在后台运行任务,然后继续运行。这就是 Trio 能够同时运行多次fetch() 的原因。但是因为 Trio 一直在运行,所以无法像 Python 函数通常那样“返回”结果。它甚至会回到哪里?

    您可以使用队列让fetch() 任务将结果发送到另一个任务以进行额外处理。

    创建队列:

    response_queue = trio.Queue()
    

    当您开始获取任务时,将队列作为参数传递,并在完成后向队列发送哨兵:

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)
    await response_queue.put(None)
    

    下载 URL 后,将响应放入队列:

    async def fetch(url, headers, response_queue):
        print("Start: ", url)
        response = await s.get(url, headers=headers)
        # Add responses to queue
        await response_queue.put(response)
        print("Finished: ", url, len(response.content), response.status_code)
    

    通过上述更改,您的 fetch 任务会将响应放入队列中。现在您需要从队列中读取响应,以便处理它们。您可以添加一个新函数来执行此操作:

    async def process(response_queue):
        async for response in response_queue:
            if response is None:
                break
            # Do whatever processing you want here.
    

    您应该在启动任何获取任务之前将此处理函数作为后台任务启动,以便它在收到响应后立即处理。

    在 Trio 文档的 Synchronizing and Communicating Between Tasks 部分了解更多信息。

    【讨论】:

    • 这是一个更简洁的解释。除了trio.Queue 总是需要一个容量。因此,您可以使用受任务数量限制的容量(此处为len(networkIds)),而不是标记。
    【解决方案4】:

    基于this answers,可以定义如下函数:

    async def gather(*tasks):
    
        async def collect(index, task, results):
            task_func, *task_args = task
            results[index] = await task_func(*task_args)
    
        results = {}
        async with trio.open_nursery() as nursery:
            for index, task in enumerate(tasks):
                nursery.start_soon(collect, index, task, results)
        return [results[i] for i in range(len(tasks))]
    

    然后您可以通过简单地修补 trio(添加收集功能)以与 asyncio 完全相同的方式使用 trio:

    import trio
    trio.gather = gather
    

    这是一个实际的例子:

    async def child(x):
        print(f"Child sleeping {x}")
        await trio.sleep(x)
        return 2*x
    
    async def parent():
        tasks = [(child, t) for t in range(3)]
        return await trio.gather(*tasks)
    
    print("results:", trio.run(parent))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-03-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-09-02
      相关资源
      最近更新 更多