【问题标题】:Print multithread subprocess打印多线程子进程
【发布时间】:2013-07-19 09:47:39
【问题描述】:

美好的一天!

我有一个 python 脚本,它创建一个文件列表并在 multiprocess.Pool.map 和线程函数中处理它。线程函数使用外部可执行文件并通过 subprocess.check_call 调用它。这个外部可执行文件将一些信息打印到标准输出。

所以我在阅读此输出时遇到问题 - 有时它很混乱,我无法从中获得任何有用的信息。我已经阅读了 python 中的打印和多线程,但我认为这不完全是我的问题,因为我没有在我的脚本中明确调用 print 函数。

我该如何解决这个问题?谢谢。

另外,我注意到,如果我将脚本的输出重定向到文件,输出根本不会混乱。

[更新]:

如果我运行脚本,这可以正常工作:python mp.py > mp.log

import time, argparse, threading, sys
from os import getenv
from multiprocessing import Pool

def f(x):
    cube = x*x*x
    print '|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut %d|'%(cube)
    return cube

if __name__ == '__main__':

    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()

    pool = Pool()

    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()

【问题讨论】:

  • 原因是因为不同的进程打印到同一个终端,所以你得到一个线程的一行,而不是第二个线程的一行,而不是第一个线程的另一行,(或者至少就是这样我认为您的问题与“混乱的输出”有关)
  • 我该如何解决这个问题?锁没用。我还尝试将所有 print expr 替换为 sys.stdout.write,它也没有帮助。
  • 在这种情况下,我想解决方案是让您的外部可执行文件将其输出打印到每个线程的 log.txt 中,这样它就可以工作
  • 好的,谢谢,我试试
  • 看看是否能解决问题

标签: python multithreading python-2.7 subprocess


【解决方案1】:

为避免来自多个并发子进程的混合输出,您可以将每个子进程的输出重定向到不同的文件:

from multiprocessing.dummy import Pool # use threads
from subprocess import call

def run(i):
    with open('log%d.txt' % i, 'wb') as file:
        return call(["cmd", str(i)], stdout=file)

return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time

或者收集输出并从代码中的单个线程打印出来:

from functools import partial
from multiprocessing.dummy import Pool, Queue, Process # use threads
from subprocess import Popen, PIPE

def run(i, output):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        output((p.pid, line)) # collect the output 
    p.stdout.close()
    return p.wait()

def print_output(q):
    for pid, line in iter(q.get, None):
        print pid, line.rstrip()

q = Queue()
Process(target=print_output, args=[q]).start() # start printing thread
return_codes = Pool(4).map(partial(run, output=q.put_nowait),
                           range(10)) # run 10 subprocesses, 4 at a time
q.put(None) # exit printing thread

或者你可以使用锁:

from __future__ import print_function
from multiprocessing.dummy import Pool, Lock # use threads
from subprocess import Popen, PIPE

def run(i, lock=Lock()):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        with lock:
            print(p.pid, line.rstrip())
    p.stdout.close()
    return p.wait()

return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time

注意:print() 函数用于解决问题中的问题:Why a script that uses threads prints extra lines occasionally?

为避免混合来自不同子流程的行,您可以一次收集大于单行的单元,具体取决于实际输出。

【讨论】:

  • 非常感谢,锁定解决方案效果很好!我不知道线程函数可以带锁作为参数!
【解决方案2】:

另一个相当通用的解决方案,也使用独特的文件:

import time, argparse, threading, sys
from os import getenv, getcwd, getpid
from os.path import join
from multiprocessing import Pool, cpu_count

logger = None  # Will be set by init() to give a unique logger for each process in the pool
def init(*initargs):
    global logger
    print(initargs)
    lpath = getcwd() if initargs is None or len(initargs) == 0 else initargs[0]
    name = 'log{!s}'.format(str(getpid()))
    logger = open(join(lpath, name), mode='wt')  # Get logger with unique name


def f(x):
    global logger
    cube = x*x*x
    logger.write('|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut {}|\n'.format(cube))
    logger.flush()
    return cube

if __name__ == '__main__':

    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()

    pool = Pool(cpu_count(), init)

    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-05-09
    • 1970-01-01
    • 2021-07-01
    • 1970-01-01
    • 2022-11-02
    • 2011-09-26
    • 2019-06-07
    • 2017-10-09
    相关资源
    最近更新 更多