在我看来,真正的问题是需要什么样的处理来计算字典的条目以及有多少条目。
这种处理对于了解multiprocessing 是否可以显着加快字典的创建至关重要。如果您的计算受 I/O 限制,则应使用多线程,而如果受 CPU 限制,则应使用多处理。你可以找到更多关于这个here的信息。
假设每个条目的值可以独立计算并且该计算受 CPU 限制,让我们对单进程和多进程实现之间的差异进行基准测试(基于 multiprocessing 库)。
以下代码用于在某些场景下测试这两种方法,改变每个条目所需的计算复杂度和条目数(对于多进程实现,使用了 7 个进程)。
import timeit
import numpy as np
def some_fun(s, d, n=1):
"""A function with an adaptable complexity"""
a = s * np.ones(np.random.randint(1, 10, (2,))) / (d + 1)
for _ in range(n):
a += np.random.random(a.shape)
return a
# Code to create dictionary with only one process
setup_simple = "from __main__ import some_fun, n_first_level, n_second_level, complexity"
code_simple = """
data_dict = {}
for s in range(n_first_level):
data_dict[s] = {}
for d in range(n_second_level):
data_dict[s][d] = some_fun(s, d, n=complexity)
"""
# Code to create a dictionary with multiprocessing: we are going to use all the available cores except 1
setup_mp = """import numpy as np
import multiprocessing as mp
import itertools
from functools import partial
from __main__ import some_fun, n_first_level, n_second_level, complexity
n_processes = mp.cpu_count() - 1
# Uncomment if you want to know how many concurrent processes are you going to use
# print(f'{n_processes} concurrent processes')
"""
code_mp = """
with mp.Pool(processes=n_processes) as pool:
dict_values = pool.starmap(partial(some_fun, n=complexity), itertools.product(range(n_first_level), range(n_second_level)))
data_dict = {
k: dict(zip(range(n_second_level), dict_values[k * n_second_level: (k + 1) * n_second_level]))
for k in range(n_first_level)
}
"""
# Time the code with different settings
print('Execution time on 10 repetitions: mean [std]')
for label, complexity, n_first_level, n_second_level in (
("TRIVIAL FUNCTION", 0, 10, 10),
("TRIVIAL FUNCTION", 0, 500, 500),
("SIMPLE FUNCTION", 5, 500, 500),
("COMPLEX FUNCTION", 50, 100, 100),
("HEAVY FUNCTION", 1000, 10, 10),
):
print(f'\n{label}, {n_first_level * n_second_level} dictionary entries')
for l, t in (
('Single process', timeit.repeat(stmt=code_simple, setup=setup_simple, number=1, repeat=10)),
('Multiprocess', timeit.repeat(stmt=code_mp, setup=setup_mp, number=1, repeat=10)),
):
print(f'\t{l}: {np.mean(t):.3e} [{np.std(t):.3e}] seconds')
这些是结果:
Execution time on 10 repetitions: mean [std]
TRIVIAL FUNCTION, 100 dictionary entries
Single process: 7.752e-04 [7.494e-05] seconds
Multiprocess: 1.163e-01 [2.024e-03] seconds
TRIVIAL FUNCTION, 250000 dictionary entries
Single process: 7.077e+00 [7.098e-01] seconds
Multiprocess: 1.383e+00 [7.752e-02] seconds
SIMPLE FUNCTION, 250000 dictionary entries
Single process: 1.405e+01 [1.422e+00] seconds
Multiprocess: 2.858e+00 [5.742e-01] seconds
COMPLEX FUNCTION, 10000 dictionary entries
Single process: 1.557e+00 [4.330e-02] seconds
Multiprocess: 5.383e-01 [5.330e-02] seconds
HEAVY FUNCTION, 100 dictionary entries
Single process: 3.181e-01 [5.026e-03] seconds
Multiprocess: 1.171e-01 [2.494e-03] seconds
如您所见,假设您有一个 CPU 受限计算,多进程方法在大多数情况下都能获得更好的结果。仅当您对每个条目的计算量非常轻和/或条目数量非常有限时,才应该首选单进程方法。
另一方面,multiprocessing 提供的改进是有代价的:例如,如果您对每个条目的计算使用大量内存,则可能会引发OutOfMemory 错误,这意味着您必须改进您的代码并使其更复杂以避免它,在内存占用和减少执行时间之间找到适当的平衡。如果您环顾四周,会有很多问题询问如何解决因非最佳使用 multiprocessing 而导致的内存问题。换句话说,这意味着您的代码将不太容易阅读和维护。
总而言之,你应该判断执行时间的改进是否值得,即使有可能。