【问题标题】:Using a queue for synchronous multiprocessing使用队列进行同步多处理
【发布时间】:2014-04-01 15:08:39
【问题描述】:

我有一个 python 程序,它从文件中读取一行,处理该行,然后将其写入一个新文件。它对文件中的所有行重复此操作。本质上:

for i in range(nlines):
    line = read_line(line_number = i)
    processed_line = process_line(line)
    write_line(line)

我想多处理它,以便一个进程负责读取和写入,另一个进程负责处理:

read line 1 -> read line 2 -> write line 1 -> read line 3 -> write line 2 --> etc
              process line 1 --------------> process line 2 ----------------> etc

我想我需要使用两个队列来来回传递数据,尽管我真的不知道如何在实践中实现这一点。您对我如何使用多处理将这个问题拆分到 2 个进程有任何想法吗?

【问题讨论】:

  • 输出文件是否需要按照输入文件中未处理行包含的顺序包含已处理行?
  • 是的,没错;为了完全正确,它需要保留它们被读取的顺序

标签: python python-2.7 multiprocessing


【解决方案1】:
from multiprocessing import Process, Queue as mpq

def worker(qIn, qOut):
    for fpath,i,line in iter(qIn.get, None):
        qOut.put((fpath,i, process_line(line)))

def main(infilepath, outfilepath):
    qIn, qOut = mpq(), mpq()
    Process(target=worker, args=(qIn, qOut)).start()
    with open(infilepath) as infile, open(outfilepath, 'w') as outfile:
        numLines = 0
        for tup in enumerate(infile):
            qIn.put(tup)
            numLines += 1
        qIn.put(None)
        retlines = {}
        top = -1
        for _ in range(numLines)
            i,line = qOut.get))
            retlines[i] = line
            if i+1 in retlines:
                outfile.write(retlines.pop(i+1))
                i += 1

当然,这会在开始写入输出文件之前等待完成对输入文件的读取,这是一个效率瓶颈。我会这样做:

def procWorker(qIn, qOut, numWriteWorkers):
    for fpath,i,line in iter(qIn.get, None):
        qOut.put((fpath,i, process_line(line)))
    for _ in range(numWriteWorkers):
        qOut.put(None)

def readWorker(qIn, qOut, numProcWorkers):
    for infilepath in iter(qIn.get, None):
        with open(infilepath) as infile:
            for line in infile:
                qOut.put((infilepath, i, line))
    for _  in range(numProcWorkers):
        qOut.put(None)

def writeWorker(qIn, qOut):
    outfilepaths = {"test1.in" : "test1.out"}  # dict that maps input filepaths to corresponding output filepaths
    lines = collections.defaultdict(dict)
    inds = collections.defaultdict(lamnda : -1)
    for fpath,i,line in iter(qIn.get, None):
        if i == inds[fpath] + 1:
            inds[fpath] += 1
            with open(outfilepaths[fpath], 'a') as outfile:
                outfile.write(line)
                qOut.put((fpath, i))
        else:
            lines[fpath][i] = line
    for fpath in lines:
        with open(outfilepaths[fpath], 'a') as outfile:
            for i in sorted(fpath):
                outfile.write(lines[fpath][i])
                qOut.put((fpath, i))
    qOut.put(None)

def main(infilepaths):
    readqIn, readqOut, procqOut, writeqOut = [Queue for _ in range(4)]
    numReadWorkers = 1  # fiddle to taste
    numWriteWorkers = 1  # fiddle to taste
    numProcWorkers = 1  # fiddle to taste

    for _ in range(numReadWorkers):
        Process(target=readWorker, args=(readqIn, readqOut, numProcWorkers)).start()
    for infilepath in infilepaths:
        readqIn.put(infilepath)
    for _ in range(numReadWorkers):
        readqIn.put(None)

    for _ in range(numProcWorkers):
        Process(target=procWorker, args=(readqOut, procqOut, numWriteWorkers)).start()

    for _ in range(numWriteWorkers):
        Process(target=writeWorker, args=(procqOut, writeqOut)).start()

    writeStops = 0
    while True:
        if writeStops == numWriteWorkers:
            break
        msg = writeqOut.get()
        if msg == None:
            writeStops += 1
        else:
            fpath, i = msg
            print("line #%d was written to file %s" %(i, fpath))

请注意,这允许多个读取器和写入器的可能性。通常,这是没有意义的,因为硬盘驱动器上只有头部。但是,如果您在某个分布式文件系统上或者您的文件驻留在多个硬盘驱动器上,那么您可以增加读/写工作人员的数量以提高效率。假设一个微不足道的process_line 函数,numReadWorkers + numWriteWorkers 应该等于您所有硬盘驱动器上的磁头数量。您可以平衡驱动器上的文件(a la raid)以达到最佳效果,但很大程度上取决于文件大小、读/写速度、缓存等。

真的,你应该得到的第一个加速是通过摆弄numProcWorkers,这应该会给你一个很好的线性效率提升,当然,受限于你机器上逻辑核心处理器的数量

【讨论】:

  • 谢谢。我会试试这个。关于分布式系统的有趣的 cmets - 我一直认为最好将它分布在多台计算机上并让每台计算机处理一个单独的文件。
  • 尚未设法使其正常工作。您错过了为写入工作者创建进程的机会吗?无论如何,我用来读/写的库似乎正在产生不可腌制的对象,所以我需要一个不同的解决方法
  • @jramm:很抱歉 writeWorker 问题。现在已经修好了。如果您发布有关酸洗问题的新帖子,您可能会得到一些帮助(来自我或其他人)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-10-28
  • 2020-09-26
  • 1970-01-01
  • 2012-04-19
  • 1970-01-01
  • 2019-10-22
  • 2019-11-05
相关资源
最近更新 更多