【发布时间】:2021-03-23 07:40:32
【问题描述】:
在这里,我在阅读 csv 后尝试调用 api。在此过程中,我想限制连接数。但是,当我添加BoundedSemaphore 时,我们开始收到AttributeError: __aexit__ exception
分享代码
file_name = "xgb_tmp"
RAW_FILE = f'/input/{file_name}.csv'
RESPONSE_FILE = f'/output/{file_name}.csv'
LOGFILE_PATH = f'/logs/{file_name}.log'
MAX_CONNECTIONS = 1000
POOL_SEMAPHORE = threading.BoundedSemaphore(value=MAX_CONNECTIONS)
BASE_URL = "http://localhost:8999" # for local
async def fetch(session, url, data, headers):
try:
async with session.post(url, json=data, headers=headers, timeout=10) as response:
return (response.status, await response.json())
except Exception as e:
return ("", {"error": str(e), "data": data})
async def write_result(data, writer):
async with asyncio.Lock():
writer.writerow({"data": data})
async def inititate_api_call(session, data, writer):
async with POOL_SEMAPHORE:
url = BASE_URL + "/v3/xxx"
data = {
"a": data["a"],
}
headers = {
"Content-Type": "application/json",
}
status, response_data = await fetch(session, url, data, headers)
await write_result(data, writer)
async def main():
async with aiohttp.ClientSession() as session:
with open(RAW_FILE, 'r') as csv_in, open(RESPONSE_FILE, 'a') as csv_out:
fieldnames = ['a', 'b', 'c']
writer = csv.DictWriter(csv_out, fieldnames=fieldnames)
writer.writeheader()
reader = csv.DictReader(csv_in, quoting=csv.QUOTE_ALL, skipinitialspace=True)
aws = [asyncio.ensure_future(inititate_api_call(session, data, writer)) for data in reader]
await asyncio.gather(*aws)
print("Completed!!!!!")
# read csv and initiate migration
if __name__ == '__main__':
start_time = time.time()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
print("total time taken: ", time.time() - start_time)
【问题讨论】:
标签: python python-asyncio python-multithreading semaphore