【Question title】: How to make 100 thousand HTTP calls while maintaining the max connection
【Posted】: 2021-03-23 07:40:32
【Question description】:

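Here I am trying to call an API after reading a CSV file. While doing so, I want to limit the number of connections. However, once I added a BoundedSemaphore, we started getting an AttributeError: __aexit__ exception.

Here is the code: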

import asyncio
import csv
import threading
import time

import aiohttp

file_name = "xgb_tmp"

RAW_FILE = f'/input/{file_name}.csv'
RESPONSE_FILE = f'/output/{file_name}.csv'
LOGFILE_PATH = f'/logs/{file_name}.log'
MAX_CONNECTIONS = 1000
POOL_SEMAPHORE = threading.BoundedSemaphore(value=MAX_CONNECTIONS)


BASE_URL = "http://localhost:8999" # for local


async def fetch(session, url, data, headers):
    try:
        async with session.post(url, json=data, headers=headers, timeout=10) as response:
            return (response.status, await response.json())
    except Exception as e:
        return ("", {"error": str(e), "data": data})


async def write_result(data, writer):
    async with asyncio.Lock():       
        writer.writerow({"data": data})

async def inititate_api_call(session, data, writer):
    async with POOL_SEMAPHORE:
        url = BASE_URL + "/v3/xxx"
        data = {
            "a": data["a"],
        }
        headers = {
            "Content-Type": "application/json",
        }

        status, response_data = await fetch(session, url, data, headers)
        await write_result(data, writer)


async def main():
    async with aiohttp.ClientSession() as session:
        with open(RAW_FILE, 'r') as csv_in, open(RESPONSE_FILE, 'a') as csv_out:
            fieldnames = ['a', 'b', 'c']
            writer = csv.DictWriter(csv_out, fieldnames=fieldnames)
            writer.writeheader()

            reader = csv.DictReader(csv_in, quoting=csv.QUOTE_ALL, skipinitialspace=True)
            aws = [asyncio.ensure_future(inititate_api_call(session, data, writer)) for data in reader]

            await asyncio.gather(*aws)
            print("Completed!!!!!")


# read csv and initiate migration
if __name__ ==  '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    print("total time taken: ", time.time() - start_time)

【Question discussion】:

    Tags: python python-asyncio python-multithreading semaphore


    【Solution 1】:

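    The threading module is not async-aware, so threading.BoundedSemaphore cannot be used as an asynchronous context manager. In general, you cannot mix asyncio and threading classes. You want asyncio.BoundedSemaphore instead.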
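    To make the difference concrete, here is a minimal sketch (not part of the original answer) that checks which class implements the async context manager protocol and what happens when each is used with `async with`. The helper names `supports_async_with`, `try_async_with`, and `demo` are made up for illustration. The exact exception type depends on the Python version: older interpreters raise AttributeError, as in the question, while newer ones raise TypeError, so the sketch catches both.

```python
import asyncio
import threading


def supports_async_with(cls) -> bool:
    """Return True if the class implements __aenter__ and __aexit__."""
    return hasattr(cls, "__aenter__") and hasattr(cls, "__aexit__")


async def try_async_with(obj) -> str:
    """Try `async with obj` and report the outcome as a string."""
    try:
        async with obj:
            return "ok"
    except (AttributeError, TypeError) as exc:
        # AttributeError on older Pythons, TypeError on newer ones.
        return type(exc).__name__


async def demo():
    # Both semaphores are created while the event loop is running.
    return (
        await try_async_with(threading.BoundedSemaphore(value=1)),
        await try_async_with(asyncio.BoundedSemaphore(value=1)),
    )


if __name__ == "__main__":
    print(supports_async_with(threading.BoundedSemaphore))
    print(supports_async_with(asyncio.BoundedSemaphore))
    print(asyncio.run(demo()))
```

    Only the asyncio class passes the protocol check, which is why swapping the import is enough to get past the error in the question.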

    One catch, though: a BoundedSemaphore is bound to an event loop. On Python versions up to 3.9, you can pass the loop in at instantiation.

    sem = asyncio.BoundedSemaphore(value=1, loop=asyncio.get_event_loop())
    

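    From Python 3.10 on, however, the loop parameter no longer exists, so the semaphore should be created while the loop is running. With your code, you can do that by creating it in main and passing it to inititate_api_call.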

    async def inititate_api_call(sem: asyncio.Semaphore, session, data, writer):
        async with sem:
            url = BASE_URL + "/v3/xxx"
            data = {
                "a": data["a"],
            }
            headers = {
                "Content-Type": "application/json",
            }
    
            status, response_data = await fetch(session, url, data, headers)
            await write_result(data, writer)
    
    
    async def main():
        sem = asyncio.BoundedSemaphore(value=MAX_CONNECTIONS)
        async with aiohttp.ClientSession() as session:
            with open(RAW_FILE, 'r') as csv_in, open(RESPONSE_FILE, 'a') as csv_out:
                fieldnames = ['a', 'b', 'c']
                writer = csv.DictWriter(csv_out, fieldnames=fieldnames)
                writer.writeheader()
    
                reader = csv.DictReader(csv_in, quoting=csv.QUOTE_ALL, skipinitialspace=True)
                aws = [inititate_api_call(sem, session, data, writer) for data in reader]
    
                await asyncio.gather(*aws)
                print("Completed!!!!!")
    
    
    # read csv and initiate migration
    if __name__ ==  '__main__':
        start_time = time.time()
        asyncio.run(main())
        print("total time taken: ", time.time() - start_time)
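
    If you want to convince yourself that the semaphore really caps concurrency, the following sketch may help. It is an illustration under assumptions, not the code above: `fake_call` is a hypothetical stand-in that sleeps instead of making an HTTP request, `run_all` and the `stats` dictionary are invented for the demo, and MAX_CONNECTIONS is lowered to 5 so the effect is easy to see.

```python
import asyncio

MAX_CONNECTIONS = 5  # small value for the demo; the question uses 1000


async def fake_call(sem, stats, i):
    """Hypothetical stand-in for the HTTP request; sleeps instead of posting."""
    async with sem:
        # Track how many coroutines hold the semaphore at the same time.
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)
        stats["active"] -= 1
        return i


async def run_all(total):
    # Create the semaphore inside the running loop, as in main() above.
    sem = asyncio.BoundedSemaphore(value=MAX_CONNECTIONS)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fake_call(sem, stats, i) for i in range(total))
    )
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_all(50))
    print(len(results), peak)
```

    All 50 coroutines are scheduled at once, but the recorded peak never exceeds MAX_CONNECTIONS, and gather still returns the results in submission order.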
    
