【问题标题】:Run code after asyncio run_until_complete() statement has finished在 asyncio run_until_complete() 语句完成后运行代码
【发布时间】:2021-07-07 23:57:49
【问题描述】:

我对@9​​87654321@ 还很陌生,我设法用它完成了一些请求。我创建了一个函数fetch_all(),它接收查询列表(URL)和先前使用asyncio 创建的循环作为参数,并调用函数fetch(),以JSON 格式获取每个查询的结果:

import aiohttp
import asyncio
import ssl
import nest_asyncio
nest_asyncio.apply()

async def fetch(session, url):
    async with session.get(url, ssl=ssl.SSLContext()) as response:
        return await response.json()

async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(queries, loop))

这可以正常工作,我将results 中的查询结果作为 JSON(字典)列表。但这是我的问题:有时,某些结果(RuntimeErroraiohttp.client_exceptions.ClientConnectorError 等)出现错误,而不是 JSON。我猜这些是一次性错误,因为如果我单独重做查询,我会得到想要的结果。因此,我想出了一个while 循环来检查哪些结果不是字典并重做它们的查询:我用查询及其索引初始化repeat_querieserror_indexresults,并应用run_until_complete()。然后我保存作为字典的每个结果并更新剩下的查询列表及其索引:

repeat_queries = queries
error_index = list(range(len(repeat_queries)))
results = error_index

while error_index:
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        repeat_results = loop.run_until_complete(fetch_all(repeat_queries, loop))
    for i, rr in zip(error_index, repeat_results):
        results[i] = rr
    error_index = [i for i in range(len(results)) if not isinstance(results[i], dict)]
    repeat_queries = [repeat_queries[i] for i in error_index]

但是,由于 asyncio 循环是异步的,error_indexrepeat_queries 更新会在 run_until_complete() 完成之前执行,并且循环会继续运行之前迭代中已经投射的查询,从而导致一个(几乎)无限的while 循环。

因此,我的问题是:
有什么办法可以强制在loop.run_until_complete() 完成后执行某些代码?
我在 stackoverflow 中看到了一些类似的问题,但我无法应用他们的任何答案。

【问题讨论】:

  • 我认为你过度复杂了一些代码 - 你可以使用 for i, rr, enumerate(repeat_results) 而不是使用 list(range(len(repeat_queries))) 。您可以在一个for-loop 中完成所有操作,而不是使用一个for-loop 创建results[i], other for-loop to create errorn_index, and third for-loop to get repeat_queries`。
  • 你应该运行if __name__ == '__main__':中的所有代码
  • 我会在fetch() 内使用while-loop 来一次重复它——而不是等待所有数据结束。最终我会使用循环 for _ in range(3): ... break 重复它 3 次 - 如果有问题 - 如果它得到正确的数据,则使用 break
  • 如果您看到类似的问题,那么您可以添加有问题的链接(不在评论中)

标签: python asynchronous python-asyncio synchronous aiohttp


【解决方案1】:

我会以不同的方式做到这一点。

我会在 fetch()try/except 内运行循环来捕获异常并重复它。

因为有些问题永远无法给出结果,所以while-loop 可能会永远运行——所以我宁愿使用for _ in range(3) 只尝试3 次。

我还会从fetch 返回url,这样可以更轻松地获取不提供结果的网址。

import aiohttp
import asyncio
import ssl

async def fetch(session, url):
    exception = None
    
    for number in range(3):  # try only 3 times
        try:
            async with session.get(url, ssl=ssl.SSLContext()) as response:
                data = await response.json()
                #print('data:', data)
                return url, data
        except Exception as ex:
            print('[ERROR] {} | {} | {}'.format(url, number+1, ex))
            exception = ex
            
    return url, exception

async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)


queries = [
    'https://httpbin.org/get',
    'https://toscrape.com',
    'https://fake.domain/'
]

if __name__ == '__main__':
    
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(queries, loop))

    #print(results)
    
    print('--- results ---')
    
    for url, result in results:
        print('url:', url)
        print('result:', result)
        print('is dict:', isinstance(result, dict))
        print('type:', type(result))
        print('---')

结果:

[ERROR] https://fake.domain/ | 1 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc2c0> [Name or service not known]
[ERROR] https://fake.domain/ | 2 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc440> [Name or service not known]
[ERROR] https://fake.domain/ | 3 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc9c0> [Name or service not known]
[ERROR] https://toscrape.com | 1 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
[ERROR] https://toscrape.com | 2 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
[ERROR] https://toscrape.com | 3 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
--- results ---
url: https://httpbin.org/get
result: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.7.4.post0', 'X-Amzn-Trace-Id': 'Root=1-60e5c00e-45aae85e78277e5122b262c9'}, 'origin': '83.11.175.159', 'url': 'https://httpbin.org/get'}
is dict: True
type: <class 'dict'>
---
url: https://toscrape.com
result: 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('https://toscrape.com')
is dict: False
type: <class 'aiohttp.client_exceptions.ContentTypeError'>
---
url: https://fake.domain/
result: Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc9c0> [Name or service not known]
is dict: False
type: <class 'aiohttp.client_exceptions.ClientConnectorError'>
---

编辑:

使用循环run_until_complete 的方法的版本,但我会在一个for-loop 中完成所有操作。

而且我会使用for _ in range(3) 重复三遍。

这可行,但以前的版本似乎要简单得多。

import aiohttp
import asyncio
import ssl

async def fetch(session, url):
    async with session.get(url, ssl=ssl.SSLContext()) as response:
        return await response.json()

async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)

queries = [
    'https://httpbin.org/get',
    'https://httpbin.org/json',
    'https://toscrape.com',
    'https://fake.domain/'
]

if __name__ == '__main__':

    # you can get it once
    loop = asyncio.get_event_loop()

    # original all queries
    all_queries = queries
    # places for all results  
    all_results = [None] * len(all_queries)
    
    # selected indexes at start
    indexes = list(range(len(all_queries)))
        
    for number in range(3):
        # selected queries
        queries = [all_queries[idx] for idx in indexes]
        
        # selected results
        results = loop.run_until_complete(fetch_all(queries, loop))
        
        print('\n--- try:', number+1, '--- results:', len(results), '---\n')
        
        new_indexes = []
        
        for idx, url, result in zip(indexes, queries, results):
            all_results[idx] = result
            if not isinstance(result, dict):
                new_indexes.append(idx)

            print('url:', url)
            print('result:', result)    
            print('is dict:', isinstance(result, dict))
            print('type:', type(result))
            print('---')
                
        # selected indexes after fitering correct results
        indexes = new_indexes             

【讨论】:

  • 非常感谢您的提示和回答!我尝试了您的解决方案,在 for 循环中尝试了 3 次似乎足够了,我得到了正确的所有结果
  • 您可以使其更通用并将其添加为参数 - fetch(session, url, tries=3):for number in range(tries) - 这样您就可以使用不同的值运行它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-12-18
  • 1970-01-01
  • 1970-01-01
  • 2014-01-22
  • 2018-02-20
  • 2012-06-13
  • 1970-01-01
相关资源
最近更新 更多