【问题标题】:(Python) How can I apply asyncio in while loop with accumulator?(Python) 如何在带有累加器的 while 循环中应用 asyncio?
【发布时间】:2021-12-07 15:11:39
【问题描述】:

我有一段代码可以很好地从 API 请求获取数据到特定站点。问题是该站点每次调用只给我 50 个对象的限制,而且我必须进行多次调用。结果,我花了很长时间才完成抓取工作(有时我不得不等待近 20 分钟)。这是我的代码:

import concurrent.futures
import requests

supply = 3000
offset = 0

token_ids = []
while offset < supply:
   url = "url_1" + str(offset)
   response = requests.request("GET", url)
   a = response.json()
   assets = a["assets"]

   def get_token_ids(an):
       if str(an['sell_orders']) == 'None' and str(an['last_sale']) == 'None' and str(an['num_sales']) == '0':
       token_ids.append(str(an['token_id']))


    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = [executor.submit(get_token_ids, asset) for asset in assets]

    offset += 50

print(token_ids)

问题在于代码运行并等待所有操作完成,然后再发出另一个请求。我正在考虑一个改进,当请求发送时,偏移值被添加,循环处理另一个请求,因此我不必等待。我不知道该怎么做,我学习了'asyncio',但这对我来说仍然是一个挑战。谁能帮我解决这个问题?

【问题讨论】:

  • 您测量过请求所需的时间吗?这是一个网络事务,在您的代码中,您只执行其中的 60 个。 20 分钟对于 60 个 URL 获取来说是很长的时间。如果您只是进行 60 次获取并跳过处理代码,那需要多长时间?您测量过执行get_token_ids 所需的时间吗?它看起来有点像字符串操作,所以我希望它会非常快。我怀疑使用 ThreadPoolExecutor 根本没有帮助你。除非我知道程序的时间花在哪里,否则我不会尝试优化程序的性能。

标签: python multithreading loops python-asyncio


【解决方案1】:

问题在于 Requests 不是异步代码,所以它的每个网络调用都会阻塞循环直到它完成。

https://docs.python-requests.org/en/latest/user/advanced/#blocking-or-non-blocking

因此,最好尝试异步库,例如aiohttp:

https://github.com/aio-libs/aiohttp

示例

为所有连接创建会话:

async with aiohttp.ClientSession() as session:

并运行所有需要的请求:

        results = await asyncio.gather(
            *[get_data(session, offset) for offset in range(0, supply, step)]
        )

这里,请求是异步执行的,session.get(url)只获取响应头,内容获取await response.json()

    async with session.get(url) as response:
        a = await response.json()

并且在主块中主循环开始:

    loop = asyncio.get_event_loop()
    token_ids = loop.run_until_complete(main())
    loop.close()

完整代码

import aiohttp
import asyncio


async def get_data(session, offset):

    token_ids = []
    url = "url_1" + str(offset)

    async with session.get(url) as response:
        # For tests:
        # print("Status:", response.status)
        # print("Content-type:", response.headers['content-type'])
        a = await response.json()

    assets = a["assets"]

    for asset in assets:
        if str(asset['sell_orders']) == 'None' and str(asset['last_sale']) == 'None' and str(asset['num_sales']) == '0':
            token_ids.append(str(asset['token_id']))

    return token_ids


async def main():
    supply = 3000
    step = 50
    token_ids = []
    # Create session for all connections and pass it to "get" function
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[get_data(session, offset) for offset in range(0, supply, step)]
        )

    for ids in results:
        token_ids.extend(ids)

    return token_ids


if __name__ == "__main__":
    # asynchronous code start here
    loop = asyncio.get_event_loop()
    token_ids = loop.run_until_complete(main())
    loop.close()
    # asynchronous code end here

    print(token_ids)

【讨论】:

  • 您好,非常感谢您的解释,但是如何将 aiohttp 与累加器 while 循环一起应用?你能给我一些建议吗?
  • 我已经补充了答案
  • 非常感谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-10
  • 2021-08-21
  • 1970-01-01
相关资源
最近更新 更多