【问题标题】:Proper way of cancelling accept and closing a Python processing/multiprocessing Listener connection取消接受和关闭 Python 处理/多处理侦听器连接的正确方法
【发布时间】:2010-09-26 08:07:33
【问题描述】:

(我在此示例中使用pyprocessing 模块,但如果您运行python 2.6 或使用multiprocessing backport,则用多处理替换处理可能会起作用)

我目前有一个程序可以侦听 unix 套接字(使用 processing.connection.Listener),接受连接并生成处理请求的线程。在某个时刻,我想优雅地退出该过程,但由于 accept() 调用被阻塞,我看不出有什么办法可以很好地取消它。我至少有一种方法可以在这里(OS X)工作,设置一个信号处理程序并从另一个线程发出进程信号,如下所示:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

我想到的唯一另一种方法是深入连接(listener._listener._socket)并设置非阻塞选项......但这可能有一些副作用,而且通常真的很可怕。

有没有人有更优雅(甚至可能是正确的!)的方式来完成这个?它需要可移植到 OS X、Linux 和 BSD,但 Windows 可移植性等不是必需的。

说明: 谢谢大家!像往常一样,我原来的问题中的歧义被揭露了:)

  • 我需要在取消侦听后执行清理,而且我并不总是想真正退出该进程。
  • 我需要能够从不是从同一个父进程派生的其他进程访问此进程,这使得队列难以处理
  • 线程的原因是:
    • 他们访问共享状态。实际上或多或少是一个常见的内存数据库,所以我想它可以做得不同。
    • 我必须能够同时接受多个连接,但实际线程大部分时间都处于阻塞状态。每个接受的连接都会产生一个新线程;这是为了不阻止 I/O 操作上的所有客户端。

关于线程与进程,我使用线程来使我的阻塞操作非阻塞,并使用进程来启用多处理。

【问题讨论】:

    标签: python sockets multiprocessing


    【解决方案1】:

    这不就是 select 的用途吗?

    只有在选择表明它不会阻塞时才在套接字上调用接受...

    选择有超时,所以可以偶尔跳出来检查一下 如果是时候关闭....

    【讨论】:

    • 这本身并不是一个坏主意,但是 Listener 对象并没有暴露底层的套接字,我宁愿不要以如此大的方式违反 demeters 定律。然而事实证明,这正是我必须做的:)
    【解决方案2】:

    我以为我可以避免它,但似乎我必须这样做:

    from processing import connection
    connection.Listener.fileno = lambda self: self._listener._socket.fileno()
    
    import select
    
    l = connection.Listener('/tmp/x', 'AF_UNIX')
    r, w, e = select.select((l, ), (), ())
    if l in r:
      print "Accepting..."
      c = l.accept()
      # ...
    

    我知道这违反了 demeter 法则并引入了一些邪恶的猴子补丁,但似乎这将是实现这一点的最容易移植的方式。如果有人有更优雅的解决方案,我会很高兴听到它:)

    【讨论】:

      【解决方案3】:

      我是多处理模块的新手,但在我看来,混合处理模块和线程模块是违反直觉的,它们不是针对解决同一个问题吗?

      不管怎样,将你的监听函数封装到一个进程中怎么样?我不清楚这会如何影响您的其余代码,但这可能是一个更干净的选择。

      from multiprocessing import Process
      from multiprocessing.connection import Listener
      
      
      class ListenForConn(Process):
      
          def run(self):
              listener = Listener('/tmp/asdf', 'AF_UNIX')
              listener.accept()
      
              # do your other handling here
      
      
      listen_process = ListenForConn()
      listen_process.start()
      
      print listen_process.is_alive()
      
      listen_process.terminate()
      listen_process.join()
      
      print listen_process.is_alive()
      print 'No more listen process.'
      

      【讨论】:

      • 这或多或少是我所做的,但是您发送的是 SIGTERM 而不是 SIGPIPE,并且您不能在 run() 的上下文中进行清理,而是在信号处理程序中进行。跨度>
      • 感谢您的澄清,暂时没有想到,如果我想到什么我会更新我的答案。 ;)
      • @HenrikGustafsson 如果您将需要清理的所有内容保存到 ivars 中,您不能在 __del__ 中进行清理吗?
      【解决方案4】:

      可能不太理想,但您可以通过从信号处理程序或终止进程的线程向套接字发送一些数据来释放块。

      编辑:实现此功能的另一种方法可能是使用 Connection Queues,因为它们似乎支持超时(抱歉,我在第一次阅读时误读了您的代码)。

      【讨论】:

      • 因为它不是接收操作,我试图取消我不能真的只是将数据发送到连接(没有)并且那里还有一个竞争条件。其次,如果我真的在轮询数据,那会起作用,我不是;我正在等待新的连接。还是我误会了你?
      • 既然监听器正在接受指定套接字上的连接,不应该从另一个线程释放accept()连接到它(使用multiprocessing.connection.Client)?对于我的回复第二部分的含糊之处深表歉意,我会更正它。
      • 队列无法从外部工具等中发现。连接到套接字将释放 accepy(),但我认为会产生竞争。
      • @HenrikGustafsson 这真的会导致竞争状况吗?如果是这样,如果多个进程同时连接到一个侦听器,那将是同样危险的,我认为这是一个常见的用例,在文档中看不到任何反对的东西。 ______________________________________________________________________________ 否则,此解决方案是最可移植/最高级的(可在another answer 中找到实现)。
      【解决方案5】:

      我遇到了同样的问题。我通过向侦听器发送“停止”命令来解决它。在侦听器的主线程(处理传入消息的线程)中,每次收到新消息时,我只是检查它是否是“停止”命令并退出主线程。

      这是我正在使用的代码:

      def start(self):
          """
          Start listening
          """
          # set the command being executed
          self.command = self.COMMAND_RUN
      
          # startup the 'listener_main' method as a daemon thread
          self.listener = Listener(address=self.address, authkey=self.authkey)
          self._thread = threading.Thread(target=self.listener_main, daemon=True)
          self._thread.start()
      
      def listener_main(self):
          """
          The main application loop
          """
      
          while self.command == self.COMMAND_RUN:
              # block until a client connection is recieved
              with self.listener.accept() as conn:
      
                  # receive the subscription request from the client
                  message = conn.recv()
      
                  # if it's a shut down command, return to stop this thread
                  if isinstance(message, str) and message == self.COMMAND_STOP:
                      return
      
                  # process the message
      
      def stop(self):
          """
          Stops the listening thread
          """
          self.command = self.COMMAND_STOP
          client = Client(self.address, authkey=self.authkey)
          client.send(self.COMMAND_STOP)
          client.close()
      
          self._thread.join()
      

      我正在使用身份验证密钥来防止黑客通过从任意客户端发送停止命令来关闭我的服务。

      我的不是一个完美的解决方案。似乎更好的解决方案可能是修改multiprocessing.connection.Listener 中的代码,并添加stop() 方法。但是,这需要通过流程发送它以供 Python 团队批准。

      【讨论】:

        猜你喜欢
        • 2021-09-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-05
        • 1970-01-01
        • 2019-08-16
        • 2014-12-10
        • 2014-07-04
        相关资源
        最近更新 更多