您的问题非常广泛,并且遗漏了许多细节,因此这里概述了需要完成的工作。为了避免PicklingError,在每个进程中打开数据库——这可以通过使用initializer函数(在下面的示例代码中命名为start_process())来完成。
注意:由于初始化数据库以执行一次查询所涉及的开销,@jdehesa 的多线程方法在这种情况下可能是更好的策略(线程通常使共享全局变量的成本更低)。或者,您可以让get_movie() 接口函数在每次调用时处理多个id(即它们的“批次”)。
class Database:
""" Mock implementation. """
def __init__(self, *args, **kwargs):
pass # Open/connect to database.
def get_movie(self, id):
return 'id_%s_foobar' % id
import multiprocessing as mp
def start_process(*args):
global tmdb
tmdb = Database(*args)
def get_movie(id):
tmdb_movie = tmdb.get_movie(id)
return tmdb_movie
if __name__ == '__main__':
collection = [{'detail': {'ids': {'tmdb_id': 1}}},
{'detail': {'ids': {'tmdb_id': 2}}},
{'detail': {'ids': {'tmdb_id': 3}}}]
pool_size = mp.cpu_count()
with mp.Pool(processes=pool_size, initializer=start_process,
initargs=('init info',)) as pool:
movies = pool.map(get_movie, (movie['detail']['ids']['tmdb_id']
for movie in collection))
print(movies) # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']
允许数据库在一定程度上由多个进程共享而不每次都连接到它的多处理替代方法是定义一个自定义multiprocessing.Manager(),它打开数据库一次,并为其提供一个接口以获取给定其 ID 的一部电影(或多部电影)的信息。这也在在线文档的Sharing state between processes 部分(在服务器进程 小节中)进行了讨论。内置的Manager 支持多种容器类型,lists、dicts 和Queues。
下面的示例代码展示了如何为数据库创建您自己的自定义管理器。如果取消注释对print() 的调用,您会看到只创建了一个Database 实例:
class Database:
""" Mock implementation. """
def __init__(self, *args, **kwargs):
# print('Database.__init__')
pass # Open/connect to database.
def get_movie(self, id):
return 'id_%s_foobar' % id
from functools import partial
import multiprocessing as mp
from multiprocessing.managers import BaseManager
class DB_Proxy(object):
""" Shared Database instance proxy class. """
def __init__(self, *args, **kwargs):
self.database = Database(*args, **kwargs)
def get_movie(self, id):
# print('DB_Proxy.get_movie')
tmdb_movie = self.database.get_movie(id)
return tmdb_movie
class MyManager(BaseManager): pass # Custom Manager
MyManager.register('DB_Proxy', DB_Proxy)
if __name__ == '__main__':
collection = [{'detail': {'ids': {'tmdb_id': 1}}},
{'detail': {'ids': {'tmdb_id': 2}}},
{'detail': {'ids': {'tmdb_id': 3}}}]
manager = MyManager()
manager.start()
db_proxy = manager.DB_Proxy('init info')
pool_size = mp.cpu_count()
with mp.Pool(pool_size) as pool:
movies = pool.map(db_proxy.get_movie,
(movie['detail']['ids']['tmdb_id']
for movie in collection))
print(movies) # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']