【问题标题】:Python: parallelize for loop for database queryPython:为数据库查询并行化for循环
【发布时间】:2019-07-28 20:55:23
【问题描述】:

我将把这个问题分成两部分。

我有类似的代码

for data in data_list:
  rslt = query in databae where data == 10 # just some pseudo database query to give example but this single query usually takes around 30-50seconds.
  if rslt.property == 'some condition here':
     return rslt

这里的条件是

  1. 我们必须在查询后返回data_list中与条件匹配的第一个元素。
  2. 每个元素的每个数据库查询大约需要 30-40 秒。
  3. data_list 通常很大,大约 15-20k 个元素
  4. 很遗憾,我们无法对整个 data_list 进行单个数据库查询。我们必须循环执行此操作,或者一次执行一个元素。

现在我的问题是,

  1. 如何优化此过程。目前,整个过程大约需要 3-4 小时。
  2. 我阅读了有关 python 线程和多处理的文章,但我不知道在这种情况下哪一个更合适。

【问题讨论】:

    标签: python multithreading parallel-processing multiprocessing python-multiprocessing


    【解决方案1】:

    您可以考虑使用多处理 Pool。然后,您可以使用map 将您的iterable 块发送给Pool 的工作人员,以根据给定的函数 进行工作。因此,假设您的查询是一个函数,例如query(data)

    def query(data):
        rslt = query in databae where data == 10
        if rslt.property == 'some condition here':
            return rslt
    

    我们将像这样使用池:

    from multiprocessing import Pool
    
    with Pool() as pool:
        results = pool.map(query, data_list)
    

    现在根据您的要求,我们将找到第一个:

    print(next(filter(None, results)))
    

    注意,以这种方式使用函数query意味着results将是rslts和Nones的列表,我们正在寻找第一个非None的结果。


    一些注意事项:

    1. 请注意,Pool 的构造函数的第一个参数是 processes,它允许您选择池将容纳多少进程:

      如果进程是None,则使用os.cpu_count() 返回的数字。

    2. 请注意,map 也有 chunksize 参数,默认为 1,并允许选择传递给工作人员的块的大小:

      此方法将可迭代对象分割成多个块,将它们作为单独的任务提交给进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。

    3. 继续使用mapdocs 建议将imap 用于具有特定块的大型迭代以提高效率:

      请注意,对于非常长的迭代,它可能会导致高内存使用。考虑使用带有显式块大小选项的imap()imap_unordered() 以提高效率。

      还有来自imapdocs

      chunksize 参数与map() 方法使用的参数相同。对于非常长的迭代,使用较大的 chunksize 值可以使作业完成比使用默认值 1 快得多

      所以我们实际上可以更高效地做:

      chunksize = 100
      processes = 10
      
      with Pool(processes=processes) as pool:
          print(next(filter(None, pool.imap(query, data_list, chunksize=chunksize))))
      

      在这里,您可以使用chunksize 甚至processes(从Pool 返回),看看哪种组合会产生最佳效果。

    4. 如果您有兴趣,只需更改导入即可轻松切换到 threads 而不是 processes声明:

      from multiprocessing.dummy import Pool
      

      正如docs 所说:

      multiprocessing.dummy 复制了 multiprocessing 的 API,但只是 threading 模块的包装器。


    希望这对您有任何帮助

    【讨论】:

    • 您的方法循环遍历完整列表,如果第一个 rslt 不是 None,我该如何打破循环。如果我得到任何结果,那么我不想继续循环。
    • 正如我在注释 (3) 中提到的,您可以使用 imap,它是 map 的惰性版本。在我的示例中,我们用filter 包装它,这也是一个迭代器,并且通过在上面执行next,我们只处理将产生第一个结果的最小数量的块。而且顺便说一句,没有实际的循环
    • @asdfkjasdfjk 你试过这些方法吗?它改善了时机吗?
    猜你喜欢
    • 2019-08-31
    • 2017-03-17
    • 2020-06-25
    • 2022-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多