【问题标题】:Are there any better ways to run uvicorn in thread?有没有更好的方法在线程中运行 uvicorn?
【发布时间】:2019-09-19 11:35:41
【问题描述】:

Uvicorn 不会在线程中运行,因为信号在线程中不起作用。 只需删除信号处理即可阻止服务器关闭(需要强制关闭)

我的解决方案是干扰 __new__ 函数以获取服务器对象并创建关闭函数,然后将其绑定到线程外部的信号。

但是,这是一个非常丑陋的解决方案。有更好的方法吗?

def run():
    '''
    Start uvicorn server
    returns exit function
    '''
    server = None

    old_new = uvicorn.Server.__new__

    def spoof_server(self, *_, **__):
        '''Interfeer with __new__ to set server'''
        nonlocal server
        server = old_new(self)
        return server

    uvicorn.Server.__new__ = spoof_server
    uvicorn.Server.install_signal_handlers = lambda *_, **__: None

    Thread(target=uvicorn.run, args=[make_app()]).start()

    def exit_server():
        print('exiting...')
        server.handle_exit(None, None)

    return exit_server

【问题讨论】:

  • 我不知道 Uvicorn 的答案,但是 Hypercorn(另一个 ASGI 服务器)可以做到这一点 - 请参阅这些 docs。 (认为​​这总比没有好)。
  • 感谢 pgjones 的建议!我最终做的是转移到 aiohttp 这也是 cad 做的。

标签: python python-multithreading uvicorn


【解决方案1】:

我也在寻找这样的东西。我发现这个答案对我有帮助。 https://stackoverflow.com/a/64521239/13029591

我会在这里发布sn-p:

import contextlib
import time
import threading
import uvicorn

class Server(uvicorn.Server):
    def install_signal_handlers(self):
        pass

    @contextlib.contextmanager
    def run_in_thread(self):
        thread = threading.Thread(target=self.run)
        thread.start()
        try:
            while not self.started:
                time.sleep(1e-3)
            yield
        finally:
            self.should_exit = True
            thread.join()

config = Config("example:app", host="127.0.0.1", port=5000, log_level="info")
server = Server(config=config)

with server.run_in_thread():
    # Server is started.
    ...
    # Server will be stopped once code put here is completed
    ...

# Server stopped.

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-04-13
    • 2010-11-01
    • 2010-11-05
    • 1970-01-01
    • 2023-04-05
    • 1970-01-01
    • 2011-07-11
    • 1970-01-01
    相关资源
    最近更新 更多