【问题标题】:Python multiprocessing : Killing a process gracefullyPython多处理:优雅地杀死一个进程
【发布时间】:2014-10-29 10:02:18
【问题描述】:

import multiprocessing
import schedule


def worker():
     #do some stuff


def sched(argv):
    schedule.every(0.01).minutes.do(worker)          
    while True:
        schedule.run_pending()


processs = []
..
..
p = multiprocessing.Process(target=sched,args)
..
..
processs.append(p)

for p in processs:
    p.terminate()

优雅地杀死进程列表?

如果不是,最简单的方法是什么?

目标是将配置文件重新加载到内存中,所以我想杀死所有子进程并创建其他子进程,后者将读取新的配置文件。

编辑:添加了更多代码来解释我正在运行while True 循环

编辑:这是@dano suggestion之后的新代码

def get_config(self):
        from ConfigParser import SafeConfigParser
..
        return argv

def sched(self, args, event):
#schedule instruction:
        schedule.every(0.01).minutes.do(self.worker,args)
        while not  event.is_set():
                schedule.run_pending()                                                                    

def dispatch_processs(self, conf):
        processs = []
        event = multiprocessing.Event()

        for conf in self.get_config():
                process = multiprocessing.Process(target=self.sched,args=( i for i in conf), kwargs={'event' : event})
                processs.append((process, event)
return processs

def start_process(self, process):
        process.start()

def gracefull_process(self, process):
        process.join()

def main(self):
        while True:
                processs = self.dispatch_processs(self.get_config())
                print ("%s processes running " % len(processs) )

                for process, event in processs:                                                               

                        self.start_process(process)
                        time.sleep(1)
                        event.set()
                        self.gracefull_process(process)

代码的好处是我可以编辑配置文件,并且进程也会重新加载它的配置。

问题是只有第一个进程运行,其他进程被忽略。

编辑:这救了我的命,在 schedule() 中使用 while True 不是一个好主意,所以我改为设置 refresh_time

def sched(self, args, event):

    schedule.every(0.01).minutes.do(self.worker,args)
    for i in range(refresh_time):
            schedule.run_pending() 
            time.sleep(1)

def start_processs(self, processs):
        for p,event in processs:
                if not p.is_alive():
                        p.start()
                time.sleep(1)
                event.set()

        self.gracefull_processs(processs)

def gracefull_processs(self, processs):
        for p,event in processs:
                p.join()
        processs = self.dispatch_processs(self.get_config())
        self.start_processs(processs)

def main(self):

        while True:
                processs = self.dispatch_processs(self.get_config())

                self.start_processs(processs)
                break
        print ("Reloading function main")
        self.main()

【问题讨论】:

  • gracefully 是什么意思?
  • 当工作人员在远程服务器 (scp) 上复制文件时,除非完成,否则终止它不应终止复制操作。
  • 这太难了。我认为您可以保留一个列表来记录每个子进程的状态,如果它正在执行您不想中断的操作,则状态为忙。
  • 您能告诉我们您的工作逻辑实际上是什么样的吗?使用multiprocessing.Event 通知工作人员关闭可能是最简单的。
  • context : 执行一些 python/bash 脚本(进程之间没有通信),每个子进程都应该处理一个工作..

标签: python linux multiprocessing python-multiprocessing


【解决方案1】:

如果您不介意只在worker 完成所有工作后中止,添加multiprocessing.Event 以优雅地处理退出非常简单:

import multiprocessing
import schedule


def worker():
     #do some stuff

def sched(argv, event=None):
    schedule.every(0.01).minutes.do(worker)          
    while not event.is_set():  # Run until we're told to shut down.
        schedule.run_pending()

processes = []
..
..
event = multiprocessing.Event()
p = multiprocessing.Process(target=sched,args, kwargs={'event' : event})
..
..
processes.append((p, event))

# Tell all processes to shut down
for _, event in processes:
    event.set()

# Now actually wait for them to shut down
for p, _ in processes:
    p.join()

【讨论】:

  • 感谢它有效,但我添加了for p, _ in processs: p.start() 并将event 添加到sched() 的参数列表中。该程序启动并在此之后退出。实际上,事件如何帮助我杀死并重新加载所有进程?
  • @4m1nh4j1 好吧,您可能希望将启动进程的代码移动到函数中。然后,当您有一些配置文件更改要推送时,您为所有进程调用event.set(),等待它们退出,然后重新运行进程启动脚本。实际上,根据读取配置文件的方式,您也许可以使用event 简单地告诉每个工作人员重新读取配置文件,而不必完全杀死他们。
  • 谢谢,我制作并编辑了我的代码,因为我部分解决了问题,但我仍然有一些问题。
  • @4m1nh4j1 您需要为您开始的每个Process 使用不同的event。在for conf in self.get_config(): 循环内移动event = multiprocessing.Event() 行。
  • 我找到了解决方案(cf update),我注意到我使用了您的最后一条评论,但我从一开始就使用您的回答来理解和解决问题。非常感谢您的宝贵时间。
【解决方案2】:

答:不,.terminate() 和 SIG_* 方法都相当残酷

如您的帖子中所述,需要安排任何流程的优雅结束,而是应该有一些“软信号”层,允许在两端发送/接收智能信号而不依赖关于 O/S 解释(O/S 对当前正在处理的各个工作单元的应用程序级上下文和状态一无所知)。

您可能想在 >>> https://stackoverflow.com/a/25373416/3666197

引用的链接中了解这种软信令方法

【讨论】:

    【解决方案3】:

    不,根据您自己对优雅的定义,它不会终止进程 - 除非您采取一些额外的步骤。

    假设您使用的是 unix 系统(因为您提到了 scp),terminate 会向子进程发送 SIGTERM 信号。您可以在子进程中捕获此信号,并采取相应措施(等待scp 完成):

    import signal
    
    def on_terminate(signum, stack):
        wait_for_current_scp_operation()
    
    signal.signal(signal.SIGTERM, on_terminate)
    

    Here's a tutorial about handling and sending signals

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-01-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-19
      • 1970-01-01
      • 2014-11-19
      • 1970-01-01
      相关资源
      最近更新 更多