【问题标题】:Python - Each WebSocket connection as a separate processPython - 每个 WebSocket 连接作为一个单独的进程
【发布时间】:2021-11-25 03:03:58
【问题描述】:

假设如果系统有 4 个 CPU,我需要在它上面运行 8 个 worker。在启动服务器时,我想初始化八个工作人员。我想将数据转发给新 WebSocket 连接上的可用工作人员,并将该工作人员的状态更改为已占用。如果所有工作人员都被占用,则服务器不应接受任何新连接。

我不想在进程之间交换数据。但是当在 WebSocket 中接收到新数据时,我需要将数据发送到正确的进程。每当 WebSocket 连接断开时,我需要将工作人员的状态更改为可用。

# Something like this, 
# FYI : The following is not a python code, just a representation of the requirement


process_manager = ProcessManager()

WebSocket 

    onNewConnection:
        available_process = process_manager.get_available_process()
        if available_process:
            available_process.set_state(state.Occupied)
        else:
            ws.write("Server is on full load")

        

    onNewMessage(_data):
        available_process.sendData(_data)

    OnSocketClose:
        available_process.set_state(state.Available)
    



__main__
    process_pool = ProcessPool (8)
    for loop 0 to 7 as i
        process_manager.put(i,Process)
    #-- start all process
    #-- join all process

【问题讨论】:

    标签: python websocket multiprocessing python-multiprocessing


    【解决方案1】:

    如果您需要多处理来运行并发请求而不是多线程,并且您所在的平台支持 OS fork 调用,那么您可能需要研究与 @ 结合使用的高级 socketserver.TCPServer 类987654323@上课。虽然,这将为每个新的传入请求分叉一个新进程。在任何平台上,如果线程足够,您可以使用socketserver.ThreadingMixIn 类。对于后者,保持启动的活动请求的计数应该是一件相当简单的事情,当您的请求处理程序被调用时增加并在完成时减少(在threading.Lock 的控制下)。对于前一种情况(多处理),此计数器必须是共享内存值,例如使用 multiprocessing.Value('h', lock=multiprocessing.Lock()) 创建的。

    但是,如果您想使用多处理池,那么以下内容可能适合您的要求。我创建了一个特殊的多处理池类,它基本上只支持apply_async 方法,它允许您提交新的非阻塞任务,但会跟踪有多少任务正在运行+等待运行(在您的情况下将有0 个任务等待运行)。因此,您只需在将请求提交到池之前检查当前任务计数:

    import multiprocessing
    import threading
    
    class MyPool:
        """
        Process pool class that only supports the following methods:
        apply_async
        close
        join
        terminate
        task_count
        """
    
        def __init__(self, *args, **kwargs):
            self._task_count = 0
            self._lock = threading.Lock()
            self._pool = multiprocessing.Pool(*args, **kwargs)
    
        def __enter__(self):
            self._pool.__enter__()
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            return self._pool.__exit__(exc_type, exc_val, exc_tb)
    
        def close(self):
            return self._pool.close()
    
        def join(self):
            return self._pool.join()
    
        def terminate(self):
            return self._pool.terminate()
    
        def task_count(self):
            """
            return number of tasks running or queued up to run
            """
            return self._task_count
    
        def _decrement_task_count(self, result, callback=None):
            with self._lock:
                self._task_count -= 1
            if callback:
                callback(result)
    
        def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
            with self._lock:
                self._task_count += 1
            callback_fn = self._decrement_task_count if callback is None else lambda result: self._decrement_task_count(result, callback=callback)
            error_callback_fn = self._decrement_task_count if error_callback is None else lambda result: self._decrement_task_count(result, callback=callback)
            return self._pool.apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)
    
    def handle_request(client_socket):
        ...
    
    if __name__ == '__main__':
        import socket
    
        SERVER_HOST = 'localhost'
        SERVER_PORT = 4000
    
        s = socket.socket()
        s.bind((SERVER_HOST, SERVER_PORT))
        s.listen(5)
        with MyPool(8) as pool:
            while True:
                client_socket, address = s.accept()
                if pool.task_count() < 8:
                    # We can run this
                    pool.apply_async(handle_request, args=(client_socket,))
                else:
                    client_socket.send(b'Server is on full load')
            """
            # We will never arrive here as the code currently stands:
            pool.close()
            pool.join()
            """
    

    【讨论】:

      猜你喜欢
      • 2014-08-23
      • 1970-01-01
      • 1970-01-01
      • 2019-01-21
      • 1970-01-01
      • 1970-01-01
      • 2017-08-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多