【问题标题】:Use python's asyncio to make an API request and process the result asynchronously使用python的asyncio发出API请求并异步处理结果
【发布时间】:2019-11-27 16:58:11
【问题描述】:

我需要对多条数据发出 API 请求,然后处理每个结果。请求是分页的,所以我目前正在做

def get_results():
    while True:
        response = api(num_results=5)
        if response is None:  # No more results
            break
        yield response

def process_data():
    for page in get_results():
        for result in page:
            do_stuff(result)

process_data()

我希望在处理当前结果时使用 asyncio 从 API 检索下一页结果,而不是等待结果,处理它们,然后再次等待。我已将代码修改为

import asyncio

async def get_results():
    while True:
        response = api(num_results=5)
        if response is None:  # No more results
            break
        yield response

async def process_data():
    async for page in get_results():
        for result in page:
            do_stuff(result)

asyncio.run(process_data())

我不确定这是否符合我的预期。这是处理 API 结果的当前页面并异步获取下一页结果的正确方法吗?

【问题讨论】:

  • 要使用 asyncio,您调用的 API 本身必须是异步的。一个好的指标是您必须等待它的结果。如果您的 async def 函数中没有一个包含等待,则暗示它们并不是真正的异步。

标签: python python-asyncio


【解决方案1】:

也许您可以使用Asyncio.Queue 将您的代码重构为生产者/消费者模式

import asyncio
import random

q = asyncio.Queue()

async def api(num_results):
    # you could use aiohttp to fetch api

    # fake content
    await asyncio.sleep(1)
    fake_response = random.random()
    if fake_response < 0.1:
        return None
    return fake_response

async def get_results(q):
    while True:
        response = await api(num_results=5)
        if response is None:
            # indicate producer done
            print('Producer Done')
            await q.put(None)
            break
        print('Producer: ', response)
        await q.put(response)

async def process_data():
    while True:
        data = await q.get()
        if not data:
            print('Consumer Done')
            break
        # process data whatever you want, but if its cpu intensive, you can call loop.run_in_executor
        # fake the process needs a little time
        await asyncio.sleep(3)
        print('Consume', data)

loop = asyncio.get_event_loop()
loop.create_task(get_results(q))
loop.run_until_complete(process_data())

回到问题

这是使处理当前页面的 API 结果和异步获取下一页结果的正确方法吗?

这不是正确的方法,因为每次您的 do_stuff(result) 完成时,get_results() 都会被迭代

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-13
    • 1970-01-01
    • 2022-01-06
    • 2012-09-20
    • 1970-01-01
    相关资源
    最近更新 更多