【问题标题】:HTTP server kick-off background python script without blockingHTTP服务器启动后台python脚本无阻塞
【发布时间】:2019-09-05 18:09:02
【问题描述】:

我希望能够以简单的方式通过 Web 请求触发长时间运行的 Python 脚本。另外,我希望能够在初始副本仍在运行时触发具有不同参数的脚本的其他副本。

我研究了烧瓶、aiohttp 和排队的可能性。 Flask 和 aiohttp 的设置似乎开销最小。我计划通过 subprocess.run 执行现有的 python 脚本(但是,我确实考虑将脚本重构为可用于 Web 响应函数的库)。

使用 aiohttp,我正在尝试类似: 摄取服务.py:

from aiohttp import web
from pprint import pprint
routes = web.RouteTableDef()

@routes.get("/ingest_pipeline")
async def test_ingest_pipeline(request):
    '''
    Get the job_conf specified from the request and activate the script
    '''
    #subprocess.run the command with lookup of job conf file
    response =  web.Response(text=f"Received data ingestion request")
    await response.prepare(request)
    await response.write_eof()

    #eventually this would be subprocess.run call
    time.sleep(80)

    return response

def init_func(argv):
    app = web.Application()
    app.add_routes(routes)
    return app

虽然初始请求会立即返回,但后续请求会阻塞,直到初始请求完成。我正在通过以下方式运行服务器:

python -m aiohttp.web -H localhost -P 8080 ingestion_service:init_func

我知道多线程和并发可能提供比 asyncio 更好的解决方案。在这种情况下,我不是在寻找一个健壮的解决方案,只是让我能够通过 http 请求一次运行多个脚本,理想情况下内存成本最低。

【问题讨论】:

    标签: python multithreading http subprocess aiohttp


    【解决方案1】:

    好的,我正在做的事情有几个问题。也就是说,time.sleep() 是阻塞的,所以应该使用 asyncio.sleep()。但是,由于我对生成子进程感兴趣,我可以使用 asyncio.subprocess 以非阻塞方式执行此操作。 注意: asyncio: run one function threaded with multiple requests from websocket clients https://docs.python.org/3/library/asyncio-subprocess.html.

    使用这些帮助,但 webhandler 终止子进程仍然存在问题。幸运的是,这里有一个解决方案: https://docs.aiohttp.org/en/stable/web_advanced.html

    aiojobs 有一个“原子”装饰器,它将保护进程直到它完成。因此,这些行的代码将起作用:

    from aiojobs.aiohttp import setup, atomic
    import asyncio
    import os
    
    from aiohttp import web
    
    @atomic
    async def ingest_pipeline(request):
        #be careful what you pass through to shell, lest you
        #give away the keys to the kingdom
        shell_command = "[your command here]"
        response_text = f"running {shell_command}"
        response_code = 200
        response =  web.Response(text=response_text, status=response_code)
        await response.prepare(request)
        await response.write_eof()
    
        ingestion_process = await asyncio.create_subprocess_shell(shell_command,
                                                                  stdout=asyncio.subprocess.PIPE,
                                                                  stderr=asyncio.subprocess.PIPE)
    
        stdout, stderr = await ingestion_process.communicate()
        return response
    
    def init_func(argv):
        app = web.Application()
        setup(app)
        app.router.add_get('/ingest_pipeline', ingest_pipeline)
        return app
    

    这是一个非常简单的框架,但可能会帮助其他人寻找一个快速框架作为临时内部解决方案。

    【讨论】:

      猜你喜欢
      • 2014-12-24
      • 2016-04-11
      • 1970-01-01
      • 1970-01-01
      • 2015-11-25
      • 2014-03-18
      • 2019-03-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多