【问题标题】:ThreadPoolExecutor KeyboardInterruptThreadPoolExecutor 键盘中断
【发布时间】:2021-04-26 03:31:59
【问题描述】:

我有以下代码,它使用 concurrent.futures.ThreadPoolExecutor 以计量方式启动另一个程序的进程(一次不超过 30 个)。如果我 ctrl-C python 进程,我还希望能够停止所有工作。这段代码有一个警告:我必须按 ctrl-C 两次。我第一次发送 SIGINT 时,什么也没有发生;第二次,我看到“向进程发送 SIGKILL”,进程死了,它起作用了。我的第一个 SIGINT 发生了什么?

execution_list = [['prog', 'arg1'], ['prog', 'arg2']] ... etc
processes = []

def launch_instance(args):
    process = subprocess.Popen(args)
    processes.append(process)
    process.wait()

try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
        results = list(executor.map(launch_instance, execution_list))
except KeyboardInterrupt:
    print('sending SIGKILL to processes')
    for p in processes:
        if p.poll() is None: #If process is still alive
            p.send_signal(signal.SIGKILL)

【问题讨论】:

    标签: python threadpoolexecutor concurrent.futures


    【解决方案1】:

    我在尝试解决类似问题时偶然发现了您的问题。不是 100% 确定它会解决您的用例(我没有使用子流程),但我认为它会。

    只要作业仍在运行,您的代码就会保留在 executor 的上下文管理器中。我有根据的猜测是,第一个 KeyboardInterrupt 将被 ThreadPoolExecutor 捕获,其默认行为是不启动任何新作业,等到当前作业完成,然后清理(并可能重新引发 KeyboardInterrupt)。但是这些进程可能运行时间很长,所以你不会注意到。然后第二个 KeyboardInterrupt 中断此错误处理。

    我如何解决我的问题(独立线程中的无限后台进程)是使用以下代码:

    from concurrent.futures import ThreadPoolExecutor
    import signal
    import threading
    from time import sleep
    
    
    def loop_worker(exiting):
        while not exiting.is_set():
            try:
                print("started work")
                sleep(10)
                print("finished work")
            except KeyboardInterrupt:
                print("caught keyboardinterrupt")  # never caught here. just for demonstration purposes
    
    
    def loop_in_worker():
        exiting = threading.Event()
        def signal_handler(signum, frame):
            print("Setting exiting event")
            exiting.set()
    
        signal.signal(signal.SIGTERM, signal_handler)
        with ThreadPoolExecutor(max_workers=1) as executor:
            executor.submit(loop_worker, exiting)
    
            try:
                while not exiting.is_set():
                    sleep(1)
                    print('waiting')
            except KeyboardInterrupt:
                print('Caught keyboardinterrupt')
                exiting.set()
        print("Main thread finished (and thus all others)")
    
    
    if __name__ == '__main__':
        loop_in_worker()
    

    它使用Event 向线程发出信号,告知它们应该停止正在执行的操作。在主循环中,有一个循环只是为了保持忙碌并检查任何异常。请注意,此循环在 ThreadPoolExecutor 的上下文中。

    作为奖励,它还通过使用相同的exiting 事件来处理 SIGTERM 信号。

    如果您在 processes.append(process)process.wait() 之间添加一个循环来检查信号,那么它也可能会解决您的用例。这取决于您想对正在运行的进程执行什么操作。

    如果您从命令行运行我的脚本并按 ctrl-C,您应该会看到如下内容:

    started work
    waiting
    waiting
    ^CCaught keyboardinterrupt
    
       # some time passes here
    
    finished work
    Main thread finished (and thus all others)
    

    我的解决方案的灵感来自this blog post

    【讨论】:

    • 我有一个用例,我在 threadpoolexecutor 中点击 API,即使按两次 Ctrl+C 也是如此。获得已经发出的 APi 请求的结果需要时间,然后它会停止。有什么方法可以立即退出而不等待已经提出的请求?另外,这是应该在上游提出的问题吗?
    • 嗨简单代码。我不认为这里描述的东西是问题,而是设计成这样。因此,在上游提出问题可能不是这里需要的。关于您的问题:我不确定,但我认为如果您发布一个单独的问题,其中包含有关您的设置和您尝试过的更多详细信息,那么有人将能够回答它。您尝试的代码越完整,更有经验的人越容易看到正在发生的事情并可以使用它来完成。最好的,弗洛里安K
    猜你喜欢
    • 1970-01-01
    • 2020-11-22
    • 2015-08-22
    • 2014-08-11
    • 1970-01-01
    • 1970-01-01
    • 2010-11-24
    • 2013-11-02
    • 2013-04-30
    相关资源
    最近更新 更多