【问题标题】:Running Python async server loop and Future concurrently同时运行 Python 异步服务器循环和 Future
【发布时间】:2022-01-11 23:35:36
【问题描述】:

我正在做一个项目,我有一个简单的套接字服务器来寻找连接,以便用消息队列中的消息回复客户端。同时,我有一个使用 PyWin32 的监控系统,它正在寻找目录中文件的更改。当 PyWin32 检测到文件更改时,它应该将信息附加到消息队列中。当服务器检测到连接时,它应该从消息队列中获取相关信息。服务器将不断寻找连接,而 PyWin32 仅在检测到更改时运行。

我可以分别成功运行它们,但不能一起运行。似乎我的代码挂在了一个函数上,而另一个停止了处理。我已阅读文档并仔细阅读了 SO 帖子,但似乎没有任何问题可以解决我的问题。

这是一个 Socket 服务器的示例:

class TestSocketServer:
    def __init__(self,
                 # response_handler: Callable
                 ):
        self._alive: bool = False

        # Configure client-server connection
        self._host: str = 'localhost'
        self._port: int = 50000

        self._mq = {}

    def _respond(self, reader):
        request = self._mq[reader].get()

        return f"Got request {request}"

    async def _client_connected(self, reader, writer):
        data = await reader.read(1024)

        # print(reader, type(reader))
        message = data.decode()
        print(message)

        self._mq[reader]: queue.Queue = queue.Queue()
        self._mq[reader].put_nowait(message)

        response = self._respond(reader)

        writer.write(response.encode())
        await writer.drain()

        writer.close()

    async def handle_traffic(self):
        server = await asyncio.start_server(
            self._client_connected,
            self._host,
            self._port)

        await server.serve_forever()

    def start(self):
        asyncio.run(self.handle_traffic())

PyWin32 处理程序如下所示:

    async def _monitor(self):
        """
        Look for changes in directory and return them
        :return: FILE_NOTIFY_INFORMATION
        """
        ReadDirectoryChangesW(self._handle,  # Python version of Windows Handle
                          self._buffer,
                          TRUE,  # Monitor directory tree
                          # Attributes to monitor
                          (FILE_NOTIFY_CHANGE_SIZE |
                           FILE_NOTIFY_CHANGE_ATTRIBUTES |
                           FILE_NOTIFY_CHANGE_DIR_NAME |
                           FILE_NOTIFY_CHANGE_FILE_NAME |
                           FILE_NOTIFY_CHANGE_LAST_WRITE)
                          )

        change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)

        action, item = change[0]
        act_str = EventWatcher._ACTIONS.get(action)

        if change:
            chg_msg = {str(time.time()): [str(act_str), item]}
            # TODO try to push changes through socket before persisting
            await self._write_changes(action=action, change=chg_msg)
    async.run(self._monitor)

我是如何尝试实现它们的(不包括 async.run() 调用):

loop = asyncio.new_event_loop()
loop.create_task(self.handle_traffic())
loop.create_task(self.monitor())
loop.run_forever()

我还尝试在我的循环中为监控部分创建一个 Future:

loop = asyncio.new_event_loop()
loop.create_task(self.handle_traffic())
fut = loop.create_future()
loop.create_task(self.monitor(fut))
loop.run_forever()

【问题讨论】:

  • 看看你们是如何一起实现它们会很有帮助。从您所展示的内容来看,我猜您错误地使用了“asyncio.run()”。这应该是整个异步程序的入口点,所以不要在方法中使用它。
  • 明白了。我已经厌倦了一些没有运气的实现,但让我包括我尝试过的。
  • @thisisalsomypassword 添加。在使用事件循环时,我遇到了同样的问题,一个协程似乎阻塞了另一个。
  • 可以理解,但没有线程是不可能的。如果您绝对不想这样做,您也可以制作一种微服务并通过套接字传递 fs 事件。
  • 那肯定行得通,尽管这基本上是我已经在做的事情,如果可能的话,我想避免添加另一层。对于这个用例,线程应该可以正常工作。

标签: python-3.x asynchronous python-asyncio pywin32 python-sockets


【解决方案1】:

感谢 @thisisalsomypassword 指出我的 Win32 API 调用阻塞了我的脚本的其余部分,我需要使用线程。执行以下操作解决了我的问题:

class TestSocketServer:
    def __init__(self,
                 # response_handler: Callable
                 ):
        self._alive: bool = False

        # Configure client-server connection
        self._host: str = 'localhost'
        self._port: int = 50000

        self._mq = {}

    def _respond(self, reader):
        request = self._mq[reader].get()

        return f"Got request {request}"

    async def _client_connected(self, reader, writer):
        data = await reader.read(1024)

        # print(reader, type(reader))
       message = data.decode()
        print(message)

        self._mq[reader]: queue.Queue = queue.Queue()
        self._mq[reader].put_nowait(message)

        response = self._respond(reader)

        writer.write(response.encode())
        await writer.drain()

        writer.close()

    async def start_serving(self):
        await asyncio.start_server(
            self._client_connected,
            self._host,
            self._port)

对于观察者:

    def monitor(self):
        """
        Look for changes in directory and return them
        :return: FILE_NOTIFY_INFORMATION
        """
        while True:
            ReadDirectoryChangesW(self._handle,  # Python version of Windows Handle
                                  self._buffer,
                                  TRUE,  # Monitor directory tree
                                  # Attributes to monitor
                                  (FILE_NOTIFY_CHANGE_SIZE |
                                   FILE_NOTIFY_CHANGE_ATTRIBUTES |
                                FILE_NOTIFY_CHANGE_DIR_NAME |
                                FILE_NOTIFY_CHANGE_FILE_NAME |
                                FILE_NOTIFY_CHANGE_LAST_WRITE)
                                )

            change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)

            action, item = change[0]
            print(change)

然后以这种方式实现这两种方法:

self._server = TestSocketServer()
self._watcher = EventWatcher()

def main(self):
    loop = asyncio.get_event_loop()
    loop.create_task(asyncio.to_thread(self._watcher.monitor))
    loop.create_task(self._server.start_serving())
    loop.run_forever()

main()

现在,每当有套接字请求或文件系统发生更改时,我都会看到两者都出现,并且程序继续按预期处理未来的输入。

【讨论】:

  • asyncio.to_thread() 对我来说是新的。所以谢谢你!
  • 我的荣幸!我主要作为数据科学家工作,所以所有这些都在我的驾驶室之外 XD 再次感谢您的帮助!
猜你喜欢
  • 2018-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-01-11
  • 2021-12-02
  • 1970-01-01
  • 2021-04-30
  • 1970-01-01
相关资源
最近更新 更多