import collections
from multiprocessing import Process, Queue as mpq
def worker(qIn, qOut):
    """Apply ``process_line`` to every record on *qIn* until a ``None`` arrives.

    A record is a tuple whose LAST element is the line text; all leading
    elements (e.g. ``(i,)`` or ``(fpath, i)``) form a key that is passed
    through unchanged. For each record, ``(*key, process_line(line))`` is
    put on *qOut*, so the caller can match results to inputs.

    ``process_line`` is not defined in this snippet; it is assumed to be
    provided elsewhere in the module.
    """
    # Star-unpacking accepts both 2-tuples (i, line) and 3-tuples
    # (fpath, i, line); the previous fixed 3-way unpack crashed on the
    # 2-tuples that enumerate() produces.
    for *key, line in iter(qIn.get, None):
        qOut.put((*key, process_line(line)))
def main(infilepath, outfilepath, numWorkers=1):
    """Process *infilepath* in worker processes; write results in line order.

    Each input line is sent to the workers as ``(infilepath, index, line)``
    and comes back as ``(infilepath, index, processed_line)``. With more than
    one worker, results may arrive out of order. They are therefore buffered
    until every earlier line has been written, so *outfilepath* keeps the
    input order.

    Note: the whole input file is queued before any output is written.

    Args:
        infilepath: path of the text file to read.
        outfilepath: path of the file to (over)write with processed lines.
        numWorkers: number of worker processes to start (default 1).

    Raises:
        ValueError: if *numWorkers* is less than 1 (nothing could drain the
            input queue and the loop below would block forever).
    """
    if numWorkers < 1:
        raise ValueError("numWorkers must be at least 1")
    qIn, qOut = mpq(), mpq()
    workers = [Process(target=worker, args=(qIn, qOut)) for _ in range(numWorkers)]
    for proc in workers:
        proc.start()
    with open(infilepath) as infile, open(outfilepath, 'w') as outfile:
        numLines = 0
        for i, line in enumerate(infile):
            qIn.put((infilepath, i, line))
            numLines += 1
        for _ in range(numWorkers):
            qIn.put(None)  # one end-of-input sentinel per worker
        pending = {}  # line index -> processed text, waiting for earlier lines
        nextIdx = 0   # index of the next line that may be written
        for _ in range(numLines):
            _fpath, i, line = qOut.get()
            pending[i] = line
            # Write every line that is now contiguous with what was written.
            while nextIdx in pending:
                outfile.write(pending.pop(nextIdx))
                nextIdx += 1
    # All results have been consumed above, so joining cannot deadlock on
    # unflushed queue data.
    for proc in workers:
        proc.join()
当然，这种写法要等到整个输入文件都读完之后才开始写输出文件，这是一个效率瓶颈。我会改成下面这样：
def procWorker(qIn, qOut, numWriteWorkers, numReadWorkers=1):
    """Apply ``process_line`` to each ``(fpath, i, line)`` record on *qIn*.

    Results are forwarded to *qOut* as ``(fpath, i, process_line(line))``.

    Sentinel protocol: every read worker puts one ``None`` per process worker
    on *qIn*. Each process worker therefore stops after receiving
    *numReadWorkers* sentinels. Each reader's lines precede its own sentinels
    on the queue, so by the time the last sentinel is consumed every line has
    been taken by some worker. When done, this worker puts *numWriteWorkers*
    sentinels on *qOut* (one per write worker).

    ``process_line`` is not defined in this snippet; it is assumed to be
    provided elsewhere in the module.
    """
    for _ in range(numReadWorkers):
        for fpath, i, line in iter(qIn.get, None):
            qOut.put((fpath, i, process_line(line)))
    for _ in range(numWriteWorkers):
        qOut.put(None)


def readWorker(qIn, qOut, numProcWorkers):
    """Read each file path from *qIn* (until ``None``) and emit its lines.

    Every line is put on *qOut* as ``(infilepath, index, line)`` with a
    0-based index. When finished, *numProcWorkers* ``None`` sentinels are
    put on *qOut*.
    """
    for infilepath in iter(qIn.get, None):
        with open(infilepath) as infile:
            # enumerate supplies the per-file line index (previously an
            # undefined name `i` was used here).
            for i, line in enumerate(infile):
                qOut.put((infilepath, i, line))
    for _ in range(numProcWorkers):
        qOut.put(None)


def writeWorker(qIn, qOut, numProcWorkers=1, outfilepaths=None):
    """Append processed lines to their output files in original line order.

    Reads ``(fpath, i, line)`` records from *qIn*. A line is written as soon
    as all lower-indexed lines of the same file have been written; otherwise
    it is buffered. For every line written, ``(fpath, i)`` is put on *qOut*.
    The worker stops after *numProcWorkers* ``None`` sentinels and then puts
    a single ``None`` on *qOut*.

    Ordering is tracked only for the lines this worker itself receives. With
    several write workers, lines of one file may be split among them and the
    file's order is then not guaranteed.

    Output files are opened in append mode, so existing content is kept.

    Args:
        outfilepaths: dict mapping input paths to output paths; defaults to
            ``{"test1.in": "test1.out"}``. An input path missing from it
            raises KeyError.
    """
    if outfilepaths is None:
        # dict that maps input filepaths to corresponding output filepaths
        outfilepaths = {"test1.in": "test1.out"}
    pending = collections.defaultdict(dict)  # fpath -> {index: text} not yet written
    nextIdx = collections.defaultdict(int)   # fpath -> index of next line to write

    def flush(fpath):
        """Write every buffered line of *fpath* that is now contiguous."""
        buffered = pending[fpath]
        with open(outfilepaths[fpath], 'a') as outfile:
            while nextIdx[fpath] in buffered:
                i = nextIdx[fpath]
                outfile.write(buffered.pop(i))
                qOut.put((fpath, i))
                nextIdx[fpath] += 1

    for _ in range(numProcWorkers):
        for fpath, i, line in iter(qIn.get, None):
            pending[fpath][i] = line
            if i == nextIdx[fpath]:
                flush(fpath)

    # Anything still buffered sits behind a gap (an earlier index never
    # arrived); write it in index order rather than dropping it.
    for fpath, buffered in pending.items():
        if not buffered:
            continue
        with open(outfilepaths[fpath], 'a') as outfile:
            for i in sorted(buffered):
                outfile.write(buffered[i])
                qOut.put((fpath, i))
    qOut.put(None)


def main(infilepaths):
    """Run the read -> process -> write pipeline over *infilepaths*.

    Prints one message per line reported written by the write workers, and
    returns after every worker process has finished.
    """
    # Instantiate four distinct queues (the bare class object was used before).
    readqIn, readqOut, procqOut, writeqOut = [mpq() for _ in range(4)]
    numReadWorkers = 1  # fiddle to taste
    numWriteWorkers = 1  # fiddle to taste; >1 may reorder lines within a file
    numProcWorkers = 1  # fiddle to taste

    # Each downstream stage is told how many upstream workers feed it, so it
    # waits for one sentinel per upstream worker before stopping.
    workers = (
        [Process(target=readWorker, args=(readqIn, readqOut, numProcWorkers))
         for _ in range(numReadWorkers)]
        + [Process(target=procWorker,
                   args=(readqOut, procqOut, numWriteWorkers, numReadWorkers))
           for _ in range(numProcWorkers)]
        + [Process(target=writeWorker, args=(procqOut, writeqOut, numProcWorkers))
           for _ in range(numWriteWorkers)]
    )
    for proc in workers:
        proc.start()

    for infilepath in infilepaths:
        readqIn.put(infilepath)
    for _ in range(numReadWorkers):
        readqIn.put(None)  # one stop signal per read worker

    # Each write worker sends exactly one None when it has finished.
    writeStops = 0
    while writeStops < numWriteWorkers:
        msg = writeqOut.get()
        if msg is None:
            writeStops += 1
        else:
            fpath, i = msg
            print("line #%d was written to file %s" % (i, fpath))

    # Every queue has been drained by its consumer, so joining is safe.
    for proc in workers:
        proc.join()
请注意，这种结构允许同时使用多个读取进程和写入进程。通常这样做没有意义，因为一块硬盘只有一个磁头。但是，如果您使用的是某种分布式文件系统，或者您的文件分布在多块硬盘上，就可以增加读/写工作进程的数量来提高效率。假设 process_line 函数的开销可以忽略不计，numReadWorkers + numWriteWorkers 应该等于所有硬盘上的磁头总数。您可以像 RAID 那样把文件均衡地分布到各块硬盘上以获得最佳效果，但具体效果在很大程度上取决于文件大小、读写速度、缓存等因素。另外要注意，每个 writeWorker 只跟踪它自己收到的那些行的顺序；如果使用多个写入进程，同一个文件的行可能被分给不同的进程，输出顺序就无法保证。
实际上，您最先能获得的加速应该来自调整 numProcWorkers：它应当带来接近线性的效率提升，当然，上限是您机器上的逻辑 CPU 核心数。