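[Question title]: Read streaming output from async subprocess
[Posted]: 2020-01-18 13:53:24
[Question]:

I'm trying to read URLs from a program running in a subprocess and then schedule asynchronous HTTP requests, but it looks like the requests are running synchronously. Is that because the subprocess and the requests both run in the same coroutine function?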

test.py

import random
import time

URLS = ['http://example.com', 'http://example.com/sleep5s']

def main():
    for url in random.choices(URLS, weights=(1, 1), k=5):
        print(url)
        time.sleep(random.uniform(0.5, 1))


if __name__ == '__main__':
    main()

main.py

import asyncio
import sys

import httpx

from httpx import TimeoutException


async def req(url):
    async with httpx.AsyncClient() as client:
        try:
            r = await client.get(url, timeout=2)
            print(f'Response {url}: {r.status_code}')
        except TimeoutException:
            print(f'TIMEOUT - {url}')
        except Exception as exc:
            print(f'ERROR - {url}')


async def run():
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        '-u',
        'test.py',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    while True:
        line = await proc.stdout.readline()
        if not line:
            break

        url = line.decode().rstrip()
        print(f'Found URL: {url}')

        resp = await req(url)

    await proc.wait()


async def main():
    await run()


if __name__ == '__main__':
    asyncio.run(main())

Test

$ python main.py
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s

[Question discussion]:

    Tags: python subprocess python-asyncio


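    [Solution 1]:

    It looks like the requests are running synchronously. Is that because the subprocess and the requests both run in the same coroutine function?

    Your diagnosis is correct. await does what it says on the tin: the coroutine won't proceed until the awaited call has a result for you, so the loop cannot read the next line until the current request finishes. Fortunately, asyncio makes it easy to run coroutines in the background: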

        tasks = []
        while True:
            line = await proc.stdout.readline()
            if not line:
                break
    
            url = line.decode().rstrip()
            print(f'Found URL: {url}')
    
            tasks.append(asyncio.create_task(req(url)))
    
        resps = await asyncio.gather(*tasks)
        await proc.wait()
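
To try the pattern without test.py or network access, here is a minimal sketch. It makes several assumptions that are not in the original code: PRODUCER is an inline stand-in for test.py, and fake_req() is a hypothetical placeholder for req() that only sleeps instead of issuing an HTTP request.

```python
import asyncio
import sys
import time

# Stand-in for test.py (assumption): prints three fake URLs, one every 0.1 s.
PRODUCER = (
    "import time\n"
    "for i in range(3):\n"
    "    print(f'http://example.com/{i}', flush=True)\n"
    "    time.sleep(0.1)\n"
)


async def fake_req(url):
    # Hypothetical placeholder for the real HTTP call; each "request" takes 0.5 s.
    await asyncio.sleep(0.5)
    return url


async def run():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c", PRODUCER,
        stdout=asyncio.subprocess.PIPE,
    )
    tasks = []
    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        url = line.decode().rstrip()
        # Schedule the request and go straight back to reading lines.
        tasks.append(asyncio.create_task(fake_req(url)))

    # Wait for every scheduled request before leaving the coroutine.
    results = await asyncio.gather(*tasks)
    await proc.wait()
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(run()))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the simulated requests overlap, the elapsed time should be well below the 1.5 s that three sequential 0.5 s requests would need; the exact figure depends on interpreter startup time.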
    

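    Notes:

    • asyncio.create_task() ensures that the requests start being processed even while we are still reading lines.
    • asyncio.gather() ensures that all the tasks are actually awaited before the coroutine finishes. It also provides access to the responses and propagates exceptions, if any.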
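
The behaviour of asyncio.gather() described in the notes can be illustrated with a small sketch. work() is a hypothetical stand-in for req() and is not part of the original code. Note that req() in the question catches its own exceptions and returns nothing, so gathering it would simply yield a list of None values.

```python
import asyncio


async def work(n):
    # Hypothetical stand-in for req(): fails for n == 2, otherwise returns n * 10.
    await asyncio.sleep(0.01 * (3 - n))  # higher n finishes sooner
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n * 10


async def demo():
    # Results come back in the order the awaitables were passed in,
    # not in the order they completed.
    ok = await asyncio.gather(work(0), work(1))

    # By default, the first exception raised is propagated to the caller.
    try:
        await asyncio.gather(work(1), work(2))
        raised = False
    except ValueError:
        raised = True

    # With return_exceptions=True, exceptions are returned as results instead.
    mixed = await asyncio.gather(work(1), work(2), return_exceptions=True)
    return ok, raised, mixed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If one failing request should not hide the results of the others, return_exceptions=True is usually the more convenient choice; the default is better when any failure should abort the caller.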

    [Discussion]:
