【Question title】: Get returncode of a detached subprocess?
【Posted】: 2020-10-04 22:56:09
【Question】:

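I'm trying to write a submitter for a job scheduler. Because I don't know when jobs will arrive or how long they will run, I use multiprocessing to spawn one process per job, which starts the job as a detached subprocess so that the next job can be handled right away. That works fine so far, but I would like to get the return code once a job has finished. Is that possible? I tried several subprocess variants, but the ones that return the RC block the process while the job is running.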

#!/usr/bin/python3
# coding=utf-8
import time
import multiprocessing
import subprocess

JobsList = []

def SubmitJob(jobname):
    """ Submit the next requested job """
    print(f"Starting job {jobname}...")
    JobDir ="/home/xxxxx/Jobs/"
    JobMem = "{}{}.sh".format(JobDir, jobname)
    SysoutFile = "./Sysout/{}.out".format(jobname)
    fh = open(SysoutFile, 'w')
    kwargs = {}
    kwargs.update(start_new_session=True)
    p = subprocess.Popen(JobMem, shell = False, stdout = fh, **kwargs)
    pid = p.pid
    print(f"Job {jobname} pid {pid} submitted...")

def PrepareSubmit():
    """ Create and start one process per job """
    jobs = []

    for Job in JobsList:
        process = multiprocessing.Process(target=SubmitJob, 
                                          args=(Job,))
        jobs.append(process)
        JobsList.remove(Job)

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()

    print("All jobs submitted...")

def main():
    """ Check queue for new job requests """
    number_of_lines = 0
    jobs_list = []

    while 1:
        job_queue = open("/home/xxxxx/Development/Python/#Projects/Scheduler/jobs.que", 'r')
        lines = job_queue.readlines()

        if len(lines) > number_of_lines:
            jobs_list.append(lines[len(lines)-1])
            NewJob = lines[len(lines)-1][:-1]
            JobsList.append(NewJob)
            PrepareSubmit()
            number_of_lines = number_of_lines+1

        time.sleep(1)

if __name__ == "__main__":

    main()

The while loop in main() is only there for testing purposes.

Can anyone tell me whether this is possible and, if so, how? Thanks in advance.


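Here is the code that gives me the return code but does not submit a job until the previous one has finished. So if I have a long-running job, it delays the submission of the following jobs, which is what I call blocking.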

def Submit(job):
    """ Submit the next requested job """
    print(f"Starting job {job}...")
    JobDir ="/home/uwe/Jobs/"
    JobMem = "{}{}.sh".format(JobDir, job)
    SysoutFile = "./Sysout/{}.out".format(job)
    fh = open(SysoutFile, 'w')
    kwargs = {}
    kwargs.update(start_new_session=True)
    p = subprocess.Popen(JobMem, shell = False, stdout = fh, **kwargs)
    pid = p.pid

    while p.poll() == None:
        a = p.poll()
        print(a)
        time.sleep(1)
    else:
        rc = p.returncode
        print(f"PID: {pid} rc: {rc}")

def main(): 
    JobsList = ['JOB90501','JOB00001','JOB00002','JOB00003']

    for Job in JobsList:
        Submit(Job)

Roy, this is my current code after your last hint:

def SubmitJob(jobname):
    """ Submit the next requested job """
    JobDir ="/home/uwe/Jobs/"
    JobMem = "{}{}.sh".format(JobDir, jobname)
    SysoutFile = "./Sysout/{}.out".format(jobname)
    fh = open(SysoutFile, 'w')
    kwargs = {}
    kwargs.update(start_new_session=True)
    p = subprocess.Popen(JobMem, shell = False, stdout = fh, **kwargs)
    ProcessList[p] = p.pid
    print(f"Started job {jobname} - PID: {p.pid}")

def main(): 
    c_JobsList = ['JOB00001','JOB00002','JOB00003']

    for Job in c_JobsList:
        SubmitJob(Job)

    for p, pid in ProcessList.items():
        RcFile = "./Sysout/{}.rc".format(pid)
        f = open(RcFile, 'w')
        while p.poll() == None:
            a = p.poll()
            time.sleep(1)
        else:
            rc = p.returncode
            f.writelines(str(rc))
            print(f"PID: {pid} rc: {rc}")

        f.close()

And the output:

Started job JOB00001 - PID: 5426
Started job JOB00002 - PID: 5427
Started job JOB00003 - PID: 5429
PID: 5426 rc: 0
PID: 5427 rc: 0
PID: 5429 rc: 8

【Question discussion】:

    Tags: python multiprocessing subprocess


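    【Solution 1】:

    Edit (the original answer is kept below for future reference)

    The natural tool for this purpose is Popen.poll, but apparently it does not work in some situations (see https://lists.gt.net/python/bugs/633489). The solution I would propose is to use Popen.wait with a very short timeout, as in the following code sample: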

    import subprocess
    import time
    
    p = subprocess.Popen(["/bin/sleep", "3"])
    print(f"Created process {p.pid}")
    
    count = 0
    while True:
        try:
            # Wait at most 1 ms; returns the exit code if the process has ended.
            ret = p.wait(timeout=0.001)
            print(f"Got a return code {ret}")
            break
        except subprocess.TimeoutExpired:
            # Still running: wait() gave up after the timeout.
            print("..", end="")

        time.sleep(.5)
        print(f"Still waiting, count is {count}")
        count += 1

    print("Done!")
    

    The output I got is:

    Created process 30040
    ..Still waiting, count is 0
    ..Still waiting, count is 1
    ..Still waiting, count is 2
    ..Still waiting, count is 3
    ..Still waiting, count is 4
    ..Still waiting, count is 5
    Got a return code 0
    Done!
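
    The short-timeout wait above can be wrapped in a small helper so that a caller gets either the exit code or None without blocking for long. This is only a sketch: the name returncode_or_none and the stand-in Python child process are assumptions for illustration, not part of the original post.

```python
import subprocess
import sys


def returncode_or_none(proc, timeout=0.001):
    """Return proc's exit code, or None if it has not finished within timeout seconds."""
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        return None


if __name__ == "__main__":
    # Hypothetical stand-in job: a Python child that sleeps briefly, then exits with code 3.
    child = subprocess.Popen(
        [sys.executable, "-c", "import sys, time; time.sleep(0.5); sys.exit(3)"]
    )
    print(returncode_or_none(child))  # very likely still running at this point
    print(child.wait())               # blocks until the child has exited
```

    Calling the helper again after the process has ended simply returns the stored return code, so it is safe to call it repeatedly from a loop that also does other work.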
    

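    Original idea - Popen.poll

    The method you should use is Popen.poll (see its documentation). It returns the exit status of the process, or None if the process is still running.

    To use it, you have to keep the "popen" objects you get when calling subprocess.Popen, and call poll on those objects later.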
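
    A minimal sketch of that idea, assuming all jobs are started from the same process that later asks for their return codes. The helper names start_job and collect_return_codes, and the Python child commands used as stand-in jobs, are hypothetical and only serve as an illustration; a real submitter would start the job scripts instead.

```python
import subprocess
import sys
import time


def start_job(exit_code, delay):
    """Start a stand-in job (hypothetical) that sleeps, then exits with exit_code."""
    script = f"import sys, time; time.sleep({delay}); sys.exit({exit_code})"
    return subprocess.Popen([sys.executable, "-c", script])


def collect_return_codes(procs, interval=0.1):
    """Poll every Popen object until all have finished.

    procs maps a job name to its Popen object; returns {job name: return code}.
    """
    pending = dict(procs)
    results = {}
    while pending:
        for name, proc in list(pending.items()):
            rc = proc.poll()  # None while the process is still running
            if rc is not None:
                results[name] = rc
                del pending[name]
        if pending:
            time.sleep(interval)
    return results


if __name__ == "__main__":
    # All jobs are started first, so no job waits for another one to finish.
    procs = {
        "JOB00001": start_job(0, 0.3),
        "JOB00002": start_job(0, 0.1),
        "JOB00003": start_job(8, 0.2),
    }
    for name, rc in collect_return_codes(procs).items():
        print(f"{name} rc: {rc}")
```

    The important point is that Popen returns immediately, so the starting loop never blocks; only the collecting loop waits, and it checks all processes in each pass instead of waiting on one at a time.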

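    【Discussion】:

    • Roy, thanks for your answer. That is one of the variants I tried. When I call poll() right after Popen, the process blocks and the next job can only start after the previous one has finished. I want to start the next process immediately and pick up the rc later.
    • Hmm. Apparently there is a problem with poll (lists.gt.net/python/bugs/633489). I have another solution that uses wait with a timeout - I'll add the code in a minute.
    • @UweB - does this solve the challenge you are facing?
    • Roy, this also blocks on the subprocess, so multiprocessing cannot submit job2 right after job1.
    • Which operating system and Python version are you using?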