【问题标题】:Python: parallel processing while yieldingPython:在产生的同时进行并行处理
【发布时间】:2021-11-03 08:18:38
【问题描述】:

我正在创建一个文件列表中的行生成器,我的方法类似于:

def load(f_name: str):
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list):
    for f in f_names:
        for line in load(f):
            yield line

如果可能且有用的话,我想做的是加载下一个文件,同时屈服于另一个文件。 作为多处理的新手,我尝试了以下方法:

cache = dict()

def load(f_name: str, id: int):
    global cache
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    cache[id] = lines

def iter_list(arr):
    for x in arr:
        yield x

def iter_files(f_names: list):
    global cache
    num_files = len(f_names)
    load(f_names[0], 0)
    for n in range(num_files - 1):
        current = mp.Process(target=iter_list, args=(cache[n],))
        next = mp.Process(target=load, args=(f_names[n + 1], n + 1))
        current.start()
        next.start()
        current.join()
        next.join()
        del cache[n]
    iter_list(cache[num_files - 1])
    del cache[num_files - 1]

但除了看起来过于复杂之外,它还不起作用。

首先,如果我不将主代码放入 'if __name__ == "__main__":' 中(我不希望这是强制性的),我会收到以下错误:

RuntimeError:
      An attempt has been made to start a new process before the
      current process has finished its bootstrapping phase.

但是即使我这样做了文件也不会添加到缓存中:

current = mp.Process(target=iter_list, args=(cache[n],))
KeyError: 1

是否有可能实现我想要做的事情?我做错了什么?

谢谢大家

【问题讨论】:

  • “如果可能和有用的话,我想做什么”。可能,当然。有用?这取决于你用这些线做什么。如果处理量与 I/O 相比较小(可能很容易慢 100 倍),那么您不会看到额外的复杂性带来任何显着的加速。
  • @Thomas 文件非常小(平均 50kB),但它们的编码使得每个字节或多或少对应于已处理列表的条目,所以我猜加载算法的缓慢部分是“#一些计算”,而不是文件的实际变红。

标签: python multiprocessing yield


【解决方案1】:

multiprocessing.Queue 类非常适合这种情况。您 put 一端(子进程)中的行和 get 另一端(主进程)中的行。不幸的是,没有内置方法可以将队列标记为“已完成”,因此我们需要 put 一个类似 None 的标记值来指示所有行都已处理完毕。

import multiprocessing as mp

def load(f_name: str):
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list, queue: mp.Queue):
    for f in f_names:
        for line in load(f):
            queue.put(line)
    queue.put(None)

def iter_files_process(f_names: list):
    queue = mp.Queue()
    process = mp.Process(target=iter_files, args=(f_names, queue))
    process.start()
    while True:
        line = queue.get()
        if line is None:  # End-of-queue value.
            break
        yield line
    process.join()  # Wait for the process to be completely finished.

if __name__ == "__main__":
    for line in iter_files_process(['a.txt', 'b.txt']):
        print(line, end='')

【讨论】:

  • 当我执行 queue.get() 时,值会从队列中删除,对吧?另外,以这种方式,该进程会尝试加载所有文件,有没有办法将其限制为仅下一个文件(如果尚未在队列中,则为当前文件)?
  • 是的,queue.get() 删除并返回下一项。您可以将最大队列大小传递给 Queue() 构造函数,以防止文件读取器进程超前(例如 queue = mp.Queue(1))。
【解决方案2】:

我相信 Thomas 提供的解决方案是一种有趣的方法,但是:

  1. Pipe 虽然不如Queue 灵活,但这里只需要一个发送者和一个接收者,而且它的性能要高得多。
  2. 我使用多线程和多处理进行了测试,多线程也快得多。
from multiprocessing import Pipe
from multiprocessing.connection import Connection
import threading

def load(f_name: str):
    with open(f_name, "r", encoding="utf8") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list, send_conn: Connection):
    for f in f_names:
        for line in load(f):
            send_conn.send(line)
    send_conn.send(None)

def generate_lines(f_names: list):
    recv_conn, send_conn = Pipe(False)
    threading.Thread(target=iter_files, args=(f_names, send_conn), daemon=True).start()
    for line in iter(recv_conn.recv, None):
        yield line

if __name__ == "__main__":
    import time

    t = time.time()
    lines = list(generate_lines(['irv.py', 'waitList.py', 'send_mail_async.py', '../ajaxtcr.js', '../browser_detection.php']))
    elapsed = time.time() - t
    print(len(lines), elapsed)

打印:

6537 0.18297886848449707

请注意,Thomas 的代码未经修改,除了将 encoding 参数添加到 open 调用中,此文件列表需要 0.24300265312194824 秒。

但与原代码对比:

def load(f_name: str):
    with open(f_name, "r", encoding="utf8") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list):
    for f in f_names:
        for line in load(f):
            yield line

import time
t = time.time()
lines = list(iter_files(['irv.py', 'waitList.py', 'send_mail_async.py', '../ajaxtcr.js', '../browser_detection.php']))
elapsed = time.time() - t
print(len(lines), elapsed)

打印:

6537 0.07400083541870117

所以这种方法并没有提供真正的改进。另一种方法是同时处理所有文件:

from multiprocessing.pool import ThreadPool

def load(f_name: str):
    with open(f_name, "r", encoding='utf8') as f:
        lines = f.readlines()
    # some calculations
    return lines

def generate_lines(f_names: list):
    with ThreadPool(len(f_names)) as pool:
        for lines in pool.imap(load, f_names):
            for line in lines:
                yield line

if __name__ == "__main__":
    import time

    t = time.time()
    lines = list(generate_lines(['irv.py', 'waitList.py', 'send_mail_async.py', '../ajaxtcr.js', '../browser_detection.php']))
    elapsed = time.time() - t
    print(len(lines), elapsed)

打印:

6537 0.010999441146850586

重要提示

承认在同时处理多个文件时可能会出现争用,而在最后一个基准测试中性能如此良好的原因是因为正在读取的所有或大部分数据可能都在缓存中,因为这些文件已经从所有这些测试中读取了很多次。使用管道(或队列)的解决方案是使用相同的缓存,显然性能比原始代码差,所以唯一真正的问题是线程池版本是否会在未缓存的数据上表现更好。

我现在必须等待大量时间(或重新启动?)以确保缓存已被刷新,然后重新运行线程池示例以获得更准确的读数。或者用线程池代码新建一个文件列表,然后运行原代码,这样会有缓存比较的好处。

更新

我用另一个文件列表重新运行它,线程池解决方案的性能比原始代码差。 我会坚持使用原始代码,并避免尝试结合多线程、多处理或异步来提高性能。

根据函数load 中提到的“一些计算”是什么,您可能会考虑:

def load(f_name: str):
    with open(f_name, "r" as f:
        for line in f:
            # some calculations
            yield line

def iter_files(f_names: list):
    for f in f_names:
        yield from load(f)

这样可以节省内存资源。

【讨论】:

    猜你喜欢
    • 2018-08-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-09
    • 2014-01-17
    • 1970-01-01
    • 2020-05-16
    相关资源
    最近更新 更多