【问题标题】:Parallel processing in Foreach loop [closed]Foreach 循环中的并行处理
【发布时间】:2019-03-15 17:27:28
【问题描述】:

您好,我有一种情况,我正在调用一些 API 来获取电影列表。对于列表中的每条记录,我调用另一个 API。我想让for 循环并行以获得更好的性能。以下是我的示例代码。

movies = []

for movie in collection:
    tmdb_movie = tmdb.get_movie(movie['detail']['ids']['tmdb_id'])
    movies.append(tmdb_movie)

return tmdb_movie

所以我使用多处理的解决方案如下:

pool = Pool()
output = pool.map(tmdb.get_movie, [movie['detail']['ids']['tmdb_id'] for movie in collection])

但是当我执行这段代码时,我得到了以下错误

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

如果有人可以帮助我在 python 2.7 中并行实现此功能,我将不胜感激。

【问题讨论】:

  • 我们不实现功能,我们帮助解决现有代码中的错误和其他问题。有一个library 可以使东西并行。
  • 你必须使用 Python 2.7 吗?最好将 asyncio 模块用于此类 I/O 绑定任务,但它仅适用于 Python 3.5+
  • 是的,我的问题是我仅限于 Python 2.7

标签: python python-2.7


【解决方案1】:

最好的选择是使用线程。 Python 中的线程不能并行使用 CPU,但它们可以在存在阻塞操作时并发执行。进程虽然可以真正并行运行,但启动和通信速度很慢,并且更适合 CPU 受限的大型工作负载。此外,正如您在问题中指出的那样,流程有时可能难以启动。

您可以使用有些秘密(即未记录但实际上众所周知)multiprocessing.pool.ThreadPool 类。如果您要多次这样做,您可以在开始时创建一个池并重用它。您只需要确保在程序退出时调用pool.close()pool.join()

from multiprocessing.pool import ThreadPool

# Global/class variables    
NUM_THREADS = 5
pool = ThreadPool(NUM_THREADS)

# Inside some function/method
return pool.map(lambda movie: tmdb.get_movie(movie['detail']['ids']['tmdb_id']), movies)

# On exit
pool.close()  # Prevents more jobs to be submitted
pool.join()  # Waits until all jobs are finished

【讨论】:

  • 非常感谢。这个 pool.map 函数是否尊重集合的初始排序/序列? (我的例子中的电影收藏)
  • @nafr1 是的,它应该以原始顺序为您提供映射序列。
【解决方案2】:

您的问题非常广泛,并且遗漏了许多细节,因此这里概述了需要完成的工作。为了避免PicklingError,在每个进程中打开数据库——这可以通过使用initializer函数(在下面的示例代码中命名为start_process())来完成。

注意:由于初始化数据库以执行一次查询所涉及的开销,@jdehesa 的多线程方法在这种情况下可能是更好的策略(线程通常使共享全局变量的成本更低)。或者,您可以让get_movie() 接口函数在每次调用时处理多个id(即它们的“批次”)。

class Database:
    """ Mock implementation. """
    def __init__(self, *args, **kwargs):
        pass  # Open/connect to database.

    def get_movie(self, id):
        return 'id_%s_foobar' % id


import multiprocessing as mp

def start_process(*args):
    global tmdb
    tmdb = Database(*args)

def get_movie(id):
    tmdb_movie = tmdb.get_movie(id)
    return tmdb_movie

if __name__ == '__main__':

    collection = [{'detail': {'ids': {'tmdb_id': 1}}},
                  {'detail': {'ids': {'tmdb_id': 2}}},
                  {'detail': {'ids': {'tmdb_id': 3}}}]

    pool_size = mp.cpu_count()
    with mp.Pool(processes=pool_size, initializer=start_process,
                 initargs=('init info',)) as pool:
        movies = pool.map(get_movie, (movie['detail']['ids']['tmdb_id']
                                        for movie in collection))

    print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']

允许数据库在一定程度上由多个进程共享而不每次都连接到它的多处理替代方法是定义一个自定义multiprocessing.Manager(),它打开数据库一次,并为其提供一个接口以获取给定其 ID 的一部电影(或多部电影)的信息。这也在在线文档的Sharing state between processes 部分(在服务器进程 小节中)进行了讨论。内置的Manager 支持多种容器类型,lists、dicts 和Queues。

下面的示例代码展示了如何为数据库创建您自己的自定义管理器。如果取消注释对print() 的调用,您会看到只创建了一个Database 实例:

class Database:
    """ Mock implementation. """
    def __init__(self, *args, **kwargs):
#        print('Database.__init__')
        pass  # Open/connect to database.

    def get_movie(self, id):
        return 'id_%s_foobar' % id


from functools import partial
import multiprocessing as mp
from multiprocessing.managers import BaseManager


class DB_Proxy(object):
    """ Shared Database instance proxy class. """
    def __init__(self, *args, **kwargs):
        self.database = Database(*args, **kwargs)

    def get_movie(self, id):
#        print('DB_Proxy.get_movie')
        tmdb_movie = self.database.get_movie(id)
        return tmdb_movie


class MyManager(BaseManager): pass  # Custom Manager

MyManager.register('DB_Proxy', DB_Proxy)


if __name__ == '__main__':

    collection = [{'detail': {'ids': {'tmdb_id': 1}}},
                  {'detail': {'ids': {'tmdb_id': 2}}},
                  {'detail': {'ids': {'tmdb_id': 3}}}]

    manager = MyManager()
    manager.start()

    db_proxy = manager.DB_Proxy('init info')

    pool_size = mp.cpu_count()
    with mp.Pool(pool_size) as pool:
        movies = pool.map(db_proxy.get_movie,
                          (movie['detail']['ids']['tmdb_id']
                            for movie in collection))

    print(movies)  # -> ['id_1_foobar', 'id_2_foobar', 'id_3_foobar']

【讨论】:

  • 非常感谢您的回复。我最终使用了来自 multiprocessing.pool 的 ThreadPool,它在 Windows 上运行得非常好。但是,我的目标是我的 Android 应用程序,我猜 Android 不支持多处理。您能想到的任何其他解决方案吗?也许直接使用threading.Thread?
  • 抱歉,我对 Android 不太了解。无论如何,线程可以在没有ThreadPool 的情况下使用,根据其文档字符串,它是一个“支持将函数应用于参数的异步版本的类”,并且派生自multipprocessing.Pool。无论如何,是的,您可以直接使用threading.Thread。您可以使用Queue.Queue 在主线程和后台线程之间进行通信,该线程不断从输入Queue 获取内容,查找它们,并将获得的结果放入输出Queue,然后可以通过以下方式检索主线程。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-09-30
  • 2023-03-24
  • 2011-10-19
  • 1970-01-01
  • 2020-05-27
  • 1970-01-01
  • 2019-05-13
相关资源
最近更新 更多