【问题标题】:Python subprocess: callback when cmd exitsPython子进程:cmd退出时的回调
【发布时间】:2011-02-04 15:01:15
【问题描述】:

我目前正在使用subprocess.Popen(cmd, shell=TRUE)启动一个程序

我对 Python 还很陌生,但“感觉”应该有一些 api 可以让我做类似的事情:

subprocess.Popen(cmd, shell=TRUE,  postexec_fn=function_to_call_on_exit)

我这样做是为了让function_to_call_on_exit 可以在知道 cmd 已退出的基础上做一些事情(例如,记录当前正在运行的外部进程的数量)

我假设我可以相当简单地将子进程包装在一个将线程与Popen.wait() 方法相结合的类中,但由于我还没有在 Python 中完成线程,看起来这对于 API 的存在来说可能已经足够普遍了,我想我会先尝试找到一个。

提前致谢:)

【问题讨论】:

标签: python callback subprocess exit


【解决方案1】:

AFAIK 没有这样的 API,至少在 subprocess 模块中没有。您需要自己滚动一些东西,可能使用线程。

【讨论】:

    【解决方案2】:

    你是对的 - 没有很好的 API。您的第二点也是正确的 - 设计一个使用线程为您执行此操作的函数非常容易。

    import threading
    import subprocess
    
    def popen_and_call(on_exit, popen_args):
        """
        Runs the given args in a subprocess.Popen, and then calls the function
        on_exit when the subprocess completes.
        on_exit is a callable object, and popen_args is a list/tuple of args that 
        would give to subprocess.Popen.
        """
        def run_in_thread(on_exit, popen_args):
            proc = subprocess.Popen(*popen_args)
            proc.wait()
            on_exit()
            return
        thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args))
        thread.start()
        # returns immediately after the thread starts
        return thread
    

    即使在 Python 中线程也很容易,但请注意,如果 on_exit() 计算量很大,您需要将其放在单独的进程中,而不是使用多处理(这样 GIL 不会减慢您的程序速度)。这实际上非常简单 - 您基本上可以将所有对 threading.Thread 的调用替换为 multiprocessing.Process,因为它们遵循(几乎)相同的 API。

    【讨论】:

    • 谢谢。这就是我要做的。不幸的是,有一个问题我无法在简单的场景中复制,但可以在我的实际程序中复制:(如果我使用线程,而不是多处理,proc.wait() 不会返回,直到我对子进程执行其他操作。如果我使用多处理它完美地工作。但是,使用多处理我不得不对共享内存大惊小怪。我现在已经做到了,但我不确定我是否对开销感到满意。任何想法为什么子进程在线程中的行为可能与过程(改变我使用哪一个而没有其他原因/解决问题)?
    • @谁对不起-我不知道为什么线程不起作用,或者为什么在这种情况下它的工作方式与多线程不同。这似乎很奇怪。共享内存的开销是性能瓶颈,还是丑陋?
    • @DanielG 您可以考虑采用 Phil 回答的更改,以便维护 Popen 界面。
    【解决方案3】:

    我遇到了同样的问题,并使用multiprocessing.Pool 解决了它。涉及两个 hacky 技巧:

    1. 设置池 1 的大小
    2. 在长度为 1 的可迭代对象内传递可迭代参数

    结果是一个在完成时执行回调的函数

    def sub(arg):
        print arg             #prints [1,2,3,4,5]
        return "hello"
    
    def cb(arg):
        print arg             # prints "hello"
    
    pool = multiprocessing.Pool(1)
    rval = pool.map_async(sub,([[1,2,3,4,5]]),callback =cb)
    (do stuff) 
    pool.close()
    

    就我而言,我也希望调用是非阻塞的。效果很好

    【讨论】:

    【解决方案4】:

    我受到 Daniel G. answer 的启发并实现了一个非常简单的用例 - 在我的工作中,我经常需要使用不同的参数重复调用相同的(外部)进程。我已经破解了一种方法来确定每个特定调用何时完成,但现在我有一种更简洁的方法来发出回调。

    我喜欢这个实现,因为它非常简单,但它允许我向多个处理器发出异步调用(注意我使用multiprocessing 而不是threading)并在完成时收到通知。

    我测试了示例程序,效果很好。请随意编辑并提供反馈。

    import multiprocessing
    import subprocess
    
    class Process(object):
        """This class spawns a subprocess asynchronously and calls a
        `callback` upon completion; it is not meant to be instantiated
        directly (derived classes are called instead)"""
        def __call__(self, *args):
        # store the arguments for later retrieval
        self.args = args
        # define the target function to be called by
        # `multiprocessing.Process`
        def target():
            cmd = [self.command] + [str(arg) for arg in self.args]
            process = subprocess.Popen(cmd)
            # the `multiprocessing.Process` process will wait until
            # the call to the `subprocess.Popen` object is completed
            process.wait()
            # upon completion, call `callback`
            return self.callback()
        mp_process = multiprocessing.Process(target=target)
        # this call issues the call to `target`, but returns immediately
        mp_process.start()
        return mp_process
    
    if __name__ == "__main__":
    
        def squeal(who):
        """this serves as the callback function; its argument is the
        instance of a subclass of Process making the call"""
        print "finished %s calling %s with arguments %s" % (
            who.__class__.__name__, who.command, who.args)
    
        class Sleeper(Process):
        """Sample implementation of an asynchronous process - define
        the command name (available in the system path) and a callback
        function (previously defined)"""
        command = "./sleeper"
        callback = squeal
    
        # create an instance to Sleeper - this is the Process object that
        # can be called repeatedly in an asynchronous manner
        sleeper_run = Sleeper()
    
        # spawn three sleeper runs with different arguments
        sleeper_run(5)
        sleeper_run(2)
        sleeper_run(1)
    
        # the user should see the following message immediately (even
        # though the Sleeper calls are not done yet)
        print "program continued"
    

    样本输出:

    program continued
    finished Sleeper calling ./sleeper with arguments (1,)
    finished Sleeper calling ./sleeper with arguments (2,)
    finished Sleeper calling ./sleeper with arguments (5,)
    

    下面是sleeper.c的源代码——我的示例“耗时”外部进程

    #include<stdlib.h>
    #include<unistd.h>
    
    int main(int argc, char *argv[]){
      unsigned int t = atoi(argv[1]);
      sleep(t);
      return EXIT_SUCCESS;
    }
    

    编译为:

    gcc -o sleeper sleeper.c
    

    【讨论】:

    • 谢谢!缩进有一些问题,但在那之后,当客户端发出“运行”命令时,这正是我需要在 XMLRPC 服务器中分离异步进程的内容。
    【解决方案5】:

    Python 3.2 中有 concurrent.futures 模块(可通过 pip install futures 获得旧版 Python

    pool = Pool(max_workers=1)
    f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
    f.add_done_callback(callback)
    

    回调将在调用f.add_done_callback()的同一进程中调用。

    完整程序

    import logging
    import subprocess
    # to install run `pip install futures` on Python <3.2
    from concurrent.futures import ThreadPoolExecutor as Pool
    
    info = logging.getLogger(__name__).info
    
    def callback(future):
        if future.exception() is not None:
            info("got exception: %s" % future.exception())
        else:
            info("process returned %d" % future.result())
    
    def main():
        logging.basicConfig(
            level=logging.INFO,
            format=("%(relativeCreated)04d %(process)05d %(threadName)-10s "
                    "%(levelname)-5s %(msg)s"))
    
        # wait for the process completion asynchronously
        info("begin waiting")
        pool = Pool(max_workers=1)
        f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
        f.add_done_callback(callback)
        pool.shutdown(wait=False) # no .submit() calls after that point
        info("continue waiting asynchronously")
    
    if __name__=="__main__":
        main()
    

    输出

    $ python . && python3 .
    0013 05382 MainThread INFO  begin waiting
    0021 05382 MainThread INFO  continue waiting asynchronously
    done
    2025 05382 Thread-1   INFO  process returned 0
    0007 05402 MainThread INFO  begin waiting
    0014 05402 MainThread INFO  continue waiting asynchronously
    done
    2018 05402 Thread-1   INFO  process returned 0
    

    【讨论】:

    【解决方案6】:

    我修改了 Daniel G 的答案,将 subprocess.Popen argskwargs 作为它们本身而不是作为单独的元组/列表传递,因为我想将关键字参数与 subprocess.Popen 一起使用。

    就我而言,我有一个方法 postExec(),我想在 subprocess.Popen('exe', cwd=WORKING_DIR) 之后运行它

    使用下面的代码,它就变成了popenAndCall(postExec, 'exe', cwd=WORKING_DIR)

    import threading
    import subprocess
    
    def popenAndCall(onExit, *popenArgs, **popenKWArgs):
        """
        Runs a subprocess.Popen, and then calls the function onExit when the
        subprocess completes.
    
        Use it exactly the way you'd normally use subprocess.Popen, except include a
        callable to execute as the first argument. onExit is a callable object, and
        *popenArgs and **popenKWArgs are simply passed up to subprocess.Popen.
        """
        def runInThread(onExit, popenArgs, popenKWArgs):
            proc = subprocess.Popen(*popenArgs, **popenKWArgs)
            proc.wait()
            onExit()
            return
    
        thread = threading.Thread(target=runInThread,
                                  args=(onExit, popenArgs, popenKWArgs))
        thread.start()
    
        return thread # returns immediately after the thread starts
    

    【讨论】:

      【解决方案7】:

      concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html) 中还有自 3.2 以来的 ProcesPoolExecutor。用法与上面提到的 ThreadPoolExecutor 一样。通过 executor.add_done_callback() 附加退出回调。

      【讨论】:

        【解决方案8】:

        在 POSIX 系统上,当子进程退出时,父进程会收到一个 SIGCHLD 信号。要在子进程命令退出时运行回调,请处理父进程中的 SIGCHLD 信号。像这样的:

        import signal
        import subprocess
        
        process = subprocess.Popen('mycmd', shell=TRUE)
        
        def sigchld_handler(signum, frame):
            # This is run when the child exits.
            # Do something here ...
            pass
        
        signal.signal(signal.SIGCHLD, sigchld_handler)
        

        请注意,这不适用于 Windows。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-02-15
          • 1970-01-01
          • 2017-12-12
          • 2018-09-11
          • 2021-05-01
          相关资源
          最近更新 更多