【问题标题】:How can Python continuously fill multiple threads of subprocess?Python如何连续填满子进程的多个线程?
【发布时间】:2011-01-26 07:06:11
【问题描述】:

我正在 Linux 上运行一个应用程序 foo。在 Bash 脚本/终端提示符下,我的应用程序使用以下命令运行多线程:

$ foo -config x.ini -threads 4 < inputfile

系统监视器和顶部报告 foo 平均大约 380% 的 CPU 负载(四核机器)。我在 Python 2.6x 中重新创建了这个功能:

proc = subprocess.Popen("foo -config x.ini -threads 4", \
        shell=True, stdin=subprocess.PIPE, \
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
mylist = ['this','is','my','test','app','.']
for line in mylist:
    txterr = ''
    proc.stdin.write(line.strip()+'\n')
    while not proc.poll() and not txterr.count('Finished'):
        txterr += subproc.stderr.readline()
    print proc.stdout.readline().strip(),

Foo 运行速度较慢,top 报告 CPU 负载为 100%。 Foo 在 shell=False 时也能正常运行,但仍然很慢:

proc = subprocess.Popen("foo -config x.ini -threads 4".split(), \
        shell=False, stdin=subprocess.PIPE, \
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

有没有办法让 Python 子进程不断填充所有线程?

【问题讨论】:

  • 您是否尝试过使用 python 代码实际启动一个新线程,并在该新线程中执行 subprocess.Popen?
  • not txterr.count('Finished') 确保进程一次不能处理多个输入行。是你想要的吗?
  • @塞巴斯蒂安。我想让 foo 一直忙于所有四个线程。下面的 sarnold 之间的交换显示 foo 确实在运行 4 个线程,但每个线程只运行 25% 的负载。使用 Bash 将行连接到 foo 比 Python 循环更有效。foo 的输出非常结构化,带有 stderr 上的状态消息。输出仅在 stderr 报告“完成”后出现在 stdout 上。如果我不检索 stderr 缓冲区,整个过程会在大约 20-30 行处理后停止。
  • 这不是 Python 循环效率的问题。在 stderr 中遇到'Finished' 之前,您不会向foo 进程写入任何内容。为了避免由于操作系统管道缓冲区填满而导致的死锁,请使用proc.communicate() 或使用我的回答中的线程stackoverflow.com/questions/4802119/…

标签: python multithreading subprocess


【解决方案1】:

当您像这样使用 Popen 调用命令时,无论是从 Python 还是从 shell 调用它都没有关系。启动进程的是“foo”命令,而不是 Python。

所以答案是“是的,从 Python 调用子进程可以是多线程的。”

【讨论】:

    【解决方案2】:

    首先,你猜它是单线程的只是因为它使用了 100% 的 CPU 而不是 400%?

    最好使用top 程序检查它已经启动了多少线程,点击H 键以显示线程。或者,使用ps -eLf 并确保NLWP 列显示多个线程。

    Linux 对 CPU 的亲和力非常敏感;默认情况下,调度程序不会将进程从它使用的最后一个处理器移开。这意味着,如果您的程序的所有四个线程都在单个处理器上启动,它们将永远共享处理器。您必须使用taskset(1) 之类的工具来强制在必须在不同处理器上长时间运行的进程上设置 CPU 亲和性。例如,taskset -p &lt;pid1&gt; -c 0 ; taskset -p &lt;pid2&gt; -c 1 ; taskset -p &lt;pid3&gt; -c 2 ; taskset -p &lt;pid4&gt; -c 3

    您可以使用taskset -p &lt;pid&gt; 检索关联,以了解当前设置的关联。

    (有一天我想知道为什么我的折叠在家进程使用的 CPU 时间比我预期的要少得多,我发现该死的调度程序已将三个 FaH 任务放在一个超线程兄弟上,而将第四个 FaH 任务放在另一个 HT 兄弟上 在同一个核心上。其他三个处理器处于空闲状态。(第一个核心也很热,其他三个核心冷了四五度。呵呵。)

    【讨论】:

    • 谢谢萨诺德。我使用 top -- hit H 并发现 foo 正在运行 4 个线程。在使用 Bash/terminal 的情况下,每个线程都以接近 100% 的负载运行。在 Python 循环的情况下,每个线程仅以 25% 的负载运行。这似乎是 foo 的队列问题,而不是 Python 或 Linux 的线程问题。关于如何在传递给 foo 之前在 Python 中排队几行的任何建议?谢谢。
    • @tahoar 使用taskset -p &lt;pid&gt; 检查每个 25% 的负载进程。我敢打赌,您会发现它们都被塞在一个处理器上,并且如果您将它们强制到自己的处理器上,它们就会满负荷运行。我不确定“在 python 中排队几行”是什么意思..
    • 再次感谢。你是对的,所有进程都以关联掩码 = F 运行。我强迫它们使用特定的 CPU,例如taskset -p 0x0000000x - 并通过 taskset -p 验证所有都在不同的 CPU 上。尽管如此,即使在不同的 CPU 上运行,每个线程的负载也会达到 25%。 RE "queue"... 从 shell 中, foo 程序从管道(队列)收集数据行并将它们解析到线程。我的 Python 代码的行为显然与 shell 不同,它强制每一行在管道下一行数据之前完成。
    • @tahoar 现在我明白了。 :) shell 只是为您的文件设置重定向,并将文件描述符传递给foo。你可以尝试从你的脚本中填充一个文件,并将文件描述符交给foo,让它尽可能快地吞下输入。或者尝试input = "\n".join(mylist) + "\n" ; proc.stdin.write(input) 并避免循环。
    【解决方案3】:

    如果您的 python 脚本没有足够快地提供 foo 进程,那么您可以将读取 stdout、stderr 的任务转移到线程:

    from Queue import Empty, Queue
    from subprocess import PIPE, Popen
    from threading import Thread
    
    def start_thread(target, *args):
        t = Thread(target=target, args=args)
        t.daemon = True
        t.start()
        return t
    
    def signal_completion(queue, stderr):
        for line in iter(stderr.readline, ''):
            if 'Finished' in line:
               queue.put(1) # signal completion
        stderr.close()
    
    def print_stdout(q, stdout):
        """Print stdout upon receiving a signal."""
        text = []
        for line in iter(stdout.readline, ''):
            if not q.empty():
               try: q.get_nowait()               
               except Empty:
                   text.append(line) # queue is empty
               else: # received completion signal              
                   print ''.join(text),
                   text = []
                   q.task_done()
            else: # buffer stdout until the task is finished
                text.append(line)
        stdout.close()
        if text: print ''.join(text), # print the rest unconditionally
    
    queue = Queue()
    proc = Popen("foo -config x.ini -threads 4".split(), bufsize=1,
                 stdin=PIPE, stdout=PIPE, stderr=PIPE)
    threads =  [start_thread(print_stdout, queue, proc.stdout)]
    threads += [start_thread(signal_completion, queue, proc.stderr)]
    
    mylist = ['this','is','my','test','app','.']
    for line in mylist:
        proc.stdin.write(line.strip()+'\n')
    proc.stdin.close()
    proc.wait()
    for t in threads: t.join() # wait for stdout
    

    【讨论】:

    • 谢谢你,塞巴斯蒂安。我正在解决我的工作。我会在系统完成后立即尝试。
    • @Sebastian - 这就像一个冠军。我还了解到我真的不需要队列和信令,但我把它留在了里面。这是一个完美的解决方案,应用程序正在全速运行。谢谢!
    • @tahoar:如果您发现有用的答案,请单击答案左侧的向上箭头。如果您认为答案是可以接受的(它完全回答了您的问题)并且是所有答案中最好的,那么请勾选(接受)。 stackoverflow.com/faq#howtoask
    猜你喜欢
    • 2013-01-10
    • 1970-01-01
    • 2014-01-26
    • 1970-01-01
    • 2020-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-08
    相关资源
    最近更新 更多