【问题标题】:Killing child processes created in class __init__ in Python在 Python 中杀死类 __init__ 中创建的子进程
【发布时间】:2015-01-05 21:08:03
【问题描述】:

(Python 和 OO 的新手 - 如果我在这里很愚蠢,我提前道歉)

我正在尝试定义一个 Python 3 类,以便在创建实例时还会创建两个子进程。这些子进程在后台做一些工作(发送和监听 UDP 数据包)。子进程还需要相互通信并与实例通信(根据从 UDP 接收到的内容更新实例属性等)。

我正在使用 os.fork 创建子进程,因为我不明白如何使用子进程模块将多个文件描述符发送到子进程 - 也许这是我的问题的一部分。

我遇到的问题是如何在实例被销毁时杀死子进程。我的理解是我不应该在 Python 中使用析构函数,因为应该清理东西并由 Python 自动收集垃圾。无论如何,下面的代码在退出后让子进程继续运行。

这里的正确方法是什么?

import os
from time import sleep

class A:
    def __init__(self):
        sfp, pts = os.pipe() # senderFromParent, parentToSender
        pfs, stp = os.pipe() # parentFromSender, senderToParent
        pfl, ltp = os.pipe() # parentFromListener, listenerToParent
        sfl, lts = os.pipe() # senderFromListener, listenerToSender
        pid = os.fork()
        if pid:
            # parent
            os.close(sfp)
            os.close(stp)
            os.close(lts)
            os.close(ltp)
            os.close(sfl)
            self.pts = os.fdopen(pts, 'w') # allow creator of A inst to
            self.pfs = os.fdopen(pfs, 'r') # send and receive messages
            self.pfl = os.fdopen(pfl, 'r') # to/from sender and
        else:                              # listener processes
            # sender or listener
            os.close(pts)
            os.close(pfs)
            os.close(pfl)
            pid = os.fork()
            if pid:
                # sender
                os.close(ltp)
                os.close(lts)
                sender(self, sfp, stp, sfl)
            else:
                # listener
                os.close(stp)
                os.close(sfp)
                os.close(sfl)
                listener(self, ltp, lts)

def sender(a, sfp, stp, sfl):
    sfp = os.fdopen(sfp, 'r') # receive messages from parent
    stp = os.fdopen(stp, 'w') # send messages to parent
    sfl = os.fdopen(sfl, 'r') # received messages from listener
    while True:
        # send UDP packets based on messages from parent and process
        # responses from listener (some responses passed back to parent)
        print("Sender alive")
        sleep(1)

def listener(a, ltp, lts):
    ltp = os.fdopen(ltp, 'w') # send messages to parent
    lts = os.fdopen(lts, 'w') # send messages to sender
    while True:
        # listen for and process incoming UDP packets, sending some
        # to sender and some to parent
        print("Listener alive")
        sleep(1)

a = A()

运行上述产生:

Sender alive
Listener alive
Sender alive
Listener alive
...

【问题讨论】:

    标签: python subprocess fork kill


    【解决方案1】:

    尝试重新发明轮子是没有意义的。 subprocess 可以满足您的所有需求,甚至更多,尽管multiprocessing 只是简单的过程,所以我们将使用它。

    您可以使用multiprocessing.Pipe 创建连接,并可以在一对进程之间来回发送消息。您可以使管道“双工”,因此如果您需要,两端都可以发送和接收。您可以使用multiprocessing.Manager 在进程之间创建共享Namespace(在侦听器、发送者和父级之间共享状态)。使用multiprocessing.listmultiprocessing.dictmultiprocessing.Namespace 时会出现警告。分配给它们的任何可变对象在重新分配给托管对象之前都不会看到对该对象所做的更改。

    例如。

    namespace.attr = {}
    # change below not cascaded to other processes
    namespace.attr["key"] = "value"
    # force change to other processes
    namespace.attr = namespace.attr
    

    如果您需要让多个进程写入同一个属性,那么您将需要使用同步来防止一个进程的并发修改清除另一个进程所做的更改。

    示例代码:

    from multiprocessing import Process, Pipe, Manager
    
    class Reader:
    
        def __init__(self, writer_conn, namespace):
            self.writer_conn = writer_conn
            self.namespace = namespace
    
        def read(self):
            self.namespace.msgs_recv = 0
            with self.writer_conn:
                try:
                    while True:
                        obj = self.writer_conn.recv()
                        self.namespace.msgs_recv += 1
                        print("Reader got:", repr(obj))
                except EOFError:
                    print("Reader has no more data to receive")
    
    class Writer:
    
        def __init__(self, reader_conn, namespace):
            self.reader_conn = reader_conn
            self.namespace = namespace
    
        def write(self, msgs):
            self.namespace.msgs_sent = 0
            with self.reader_conn:
                for msg in msgs:
                    self.reader_conn.send(msg)
                    self.namespace.msgs_sent += 1
    
    def create_child_processes(reader, writer, msgs):
        p_write = Process(target=Writer.write, args=(writer, msgs))
        p_write.start()
    
        # This is very important otherwise reader will hang after writer has finished.
        # The order of this statement coming after p_write.start(), but after
        # p_read.start() is also important. Look up file descriptors and how they
        # are inherited by child processes on Unix and how a any valid fd to the
        # write side of a pipe will keep all read ends open
        writer.reader_conn.close()
    
        p_read = Process(target=Reader.read, args=(reader,))
        p_read.start()
    
        return p_read, p_write
    
    def run_mp_pipe():
    
        manager = Manager()
        namespace = manager.Namespace()
        read_conn, write_conn = Pipe()
    
        reader = Reader(read_conn, namespace)
        writer = Writer(write_conn, namespace)
    
        p_read, p_write = create_child_processes(reader, writer, 
            msgs=["hello", "world", {"key", "value"}])
    
        print("starting")
    
        p_write.join()
        p_read.join()
    
        print("done")
        print(namespace)
        assert namespace.msgs_sent == namespace.msgs_recv
    
    if __name__ == "__main__":
        run_mp_pipe()
    

    输出:

    starting
    Reader got: 'hello'
    Reader got: 'world'
    Reader got: {'key', 'value'}
    Reader has no more data to receive
    done
    Namespace(msgs_recv=3, msgs_sent=3)
    

    【讨论】:

      【解决方案2】:

      实际上,您应该使用析构函数。 Python 对象有一个__del__ 方法,该方法在对象被垃圾回收之前调用。

      在你的情况下,你应该定义

      def __del__(self):
         ...
      

      在您的class A 中,向您的子进程发送适当的终止信号。当然,不要忘记将子 PID 存储在父进程中。

      【讨论】:

      • 可以肯定的是,使用multiprocessing 模块会更好。您可以使用 Queue 对象而不是管道在父级和子级之间传递数据。
      • 是的,非常好。我已经用多处理和队列重新实现了它,现在它更简单了!也感谢 del 指针。我从其他人那里得到的印象是,在 Python 中析构函数并不那么重要,但我会按照你的建议将子进程清理包含在 del 中。
      【解决方案3】:

      按照here 的建议,您可以使用multiprocessing 模块和标志daemon=True 创建子进程。

      例子:

      from multiprocessing import Process
      
      p = Process(target=f, args=('bob',))
      p.daemon = True
      p.start()
      

      【讨论】:

        猜你喜欢
        • 2013-12-18
        • 2012-09-02
        • 2010-12-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多