您正在寻找的是生产者/消费者模式
基本线程示例
这是一个使用threading module(而不是多处理)的基本示例
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
您不会与线程共享文件对象。您可以通过向queue 提供数据行来为他们制作工作。然后每个线程会拿起一行,处理它,然后在队列中返回它。
multiprocessing module 中内置了一些更高级的工具来共享数据,例如列表和special kind of Queue。使用多处理与线程需要权衡取舍,这取决于您的工作是受 CPU 限制还是 IO 限制。
基本的 multiprocessing.Pool 示例
这是一个非常基本的多处理池示例
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
A Pool 是一个管理自己的进程的便利对象。由于打开的文件可以遍历其行,因此您可以将其传递给pool.map(),它将遍历它并将行传递给工作函数。 Map 阻塞并在完成后返回整个结果。请注意,这是一个过于简化的示例,pool.map() 将在执行工作之前将您的整个文件一次全部读入内存。如果您希望有大文件,请记住这一点。有更高级的方法来设计生产者/消费者设置。
具有限制和线重新排序的手动“池”
这是Pool.map 的手动示例,但您可以设置队列大小,这样您就可以尽可能快地处理它,而不是一次性消耗整个迭代。我还添加了行号,以便您以后可以跟踪它们并在需要时参考它们。
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)