【问题标题】:How to fix the repetitive outputs from multiprocessing a method?如何修复多处理方法的重复输出?
【发布时间】:2019-10-22 11:15:14
【问题描述】:

我正在尝试在 OD 模型(用于定位对象)之上运行分类器模型。为了减少延迟,我对 OD 和 Classifier 模型都使用了多处理。输出是正确的,但我得到了重复的结果。

我有一台 8 核的机器,所以我使用 pool=mp.Pool(8) 进行多处理 我正在使用map_async 并有一个可迭代的图像路径列表。 为了获得列表中的结果,我使用.get()

一开始我没有在pool.close()之后实现pool.join(),这是我在浏览了几个站点后发现的。我得到的输出错误是因为chunksize 我传递给pool.map_async()。相同输出的重复次数与块大小相同。但是根据我对 chunksize 的理解,它应该只创建与 chunksize 大小相同的批次,并将每个批次传递给一个进程。

return_stuff_classifier=[]

def label_it(image_path):
    file_name = image_path
    image_name=image_path.split('/')[-1]
    frame_id=image_name.split('_')[0]
    object_id=image_name.split('_')[1].split('.')[0]
    label="gt"
    result="0.86" #Here I have explicitly mentioned this, not to go through the
                  #classifier model prediction
    return_stuff_classifier.append((frame_id,object_id,label,result))
    return return_stuff_classifier

def multiprocessor():
    m_class = mp.Manager()
    queue_class = m_class.Queue()
    pool_class=mp.Pool(8)

    # Here cropped_image_no 24 -> chunk_size_class=3
    chunk_size_class=round(cropped_images_no/8)

    results_class=pool_class.map_async(label_it,cropped_images,chunk_size_class).get()
    #label_it is the method to be multiprocessed
    #cropped_images is the list of all image paths to be multiprocessed

    pool_class.close()
    pool_class.join()
    final_results.append(results_class)

输出:

[[['443', '10', 'ugt', '0.85964435'],
  ['443', '11', 'ugut', '0.48011008'],
  ['443', '4', 'gut', '0.50242084']],
 [['443', '10', 'ugt', '0.85964435'],
  ['443', '11', 'ugut', '0.48011008'],
  ['443', '4', 'gut', '0.50242084']],
 [['443', '10', 'ugt', '0.85964435'],
  ['443', '11', 'ugut', '0.48011008'],
  ['443', '4', 'gut', '0.50242084']],
 [['443', '2', 'ugut', '0.8623834'],
  ['443', '6', 'gt', '0.95684755'],
  ['443', '1', 'gut', '0.683893']],
 [['443', '2', 'ugut', '0.8623834'],
  ['443', '6', 'gt', '0.95684755'],
  ['443', '1', 'gut', '0.683893']],
 [['443', '2', 'ugut', '0.8623834'],
  ['443', '6', 'gt', '0.95684755'],
  ['443', '1', 'gut', '0.683893']]]

预期输出:

[[['443', '10', 'ugt', '0.85964435'],
  ['443', '11', 'ugut', '0.48011008']
  ['443', '4', 'gut', '0.50242084']],
 [['443', '2', 'ugut', '0.8623834'],
  ['443', '6', 'gt', '0.95684755'],
  ['443', '1', 'gut', '0.683893']]]

【问题讨论】:

  • 您没有提供minimal reproducible example,当我自己编造一个时,我无法重复您所说的结果。建议您提供自己的可重现问题的可运行代码。
  • 感谢@matineau 的回复。我遇到的问题是由于 pool.map_async() 方法中的 chunksize 参数。一旦我删除该论点,问题就会得到解决。但我想知道是否有任何方法可以用 chunksize 解决这个问题。以及为什么即使调用 pool.join() 也会出现这个问题。
  • 文件名是 - './/frame_1.jpg' 并且列表索引是根据这个符号。
  • 我理解你所说的关于 chunksize 的内容——并且做了同样的事情。将尝试使用您添加的代码再次尝试。
  • 谢谢..请告诉我。

标签: python multiprocessing


【解决方案1】:

我认为问题在于您有 label_it() 函数在每次执行时将结果附加到 return_stuff_classifier 列表,然后返回整个列表 - 从而返回一个累积上一个结果的值来电。这种情况发生的次数由chunksize 控制。

幸运的是,这很容易解决 - 只需返回您附加到列表中的元组即可。如果你这样做了,就根本不需要列表了。

请注意,我必须在代码中添加一个if __name__ == '__main__': 保护,以便它可以在我运行 Windows 的计算机上运行,​​因为子进程的创建方式与它们在类 unix 操作系统上的创建方式不同。它仍然应该对它们起作用,因此这样做是可移植的。在multiprocessing 模块的Programming guidelines 中标题为主模块的安全导入 小节的文档中需要执行此操作。

另一个更改是将get() 调用移到pool_class.join() 之后,因为此时所有子进程都已结束。在这种情况下不需要这样做,因为主进程实际上没有更多事情要做,但这是从map_async() 检索结果的规范方式——可能是因为它允许主进程同时执行其他任务,如果它有任何事情要做。

import multiprocessing as mp
from pprint import pprint

cropped_images = [f'./image_directory_path/frame_{i}.jpg' for i in range(1, 25)]
#return_stuff_classifier = []  # No longer needed.

def label_it(image_path):
    file_name = image_path
    image_name = image_path.split('/')[-1]
    frame_id = image_name.split('_')[0]
    object_id = image_name.split('_')[1].split('.')[0]
    label = "gt"
    result = "0.86" # Here I have explicitly mentioned this, not to go through the
                    # classifier model prediction

#    return_stuff_classifier.append((frame_id, object_id, label, result))
#    return return_stuff_classifier

    return (frame_id, object_id, label, result)  # Just return the results.


if __name__ == '__main__':
    def multiprocessor():
        m_class = mp.Manager()
        queue_class = m_class.Queue()
        pool_class = mp.Pool(8)
        final_results = []

        # Here cropped_image_no == 24 -> chunk_size_class=3
        chunk_size_class = round(len(cropped_images) / 8)
        print(f'{chunk_size_class=}')

        results_class = pool_class.map_async(label_it, cropped_images, chunk_size_class)
        # label_it is the method to be multiprocessed
        # cropped_images is the list of all image paths to be multiprocessed

        pool_class.close()
        pool_class.join()
        final_results.append(results_class.get())

        pprint(final_results)

    multiprocessor()

以下是显示现在没有重复的打印件:

chunk_size_class=3
[[('frame', '1', 'gt', '0.86'),
  ('frame', '2', 'gt', '0.86'),
  ('frame', '3', 'gt', '0.86'),
  ('frame', '4', 'gt', '0.86'),
  ('frame', '5', 'gt', '0.86'),
  ('frame', '6', 'gt', '0.86'),
  ('frame', '7', 'gt', '0.86'),
  ('frame', '8', 'gt', '0.86'),
  ('frame', '9', 'gt', '0.86'),
  ('frame', '10', 'gt', '0.86'),
  ('frame', '11', 'gt', '0.86'),
  ('frame', '12', 'gt', '0.86'),
  ('frame', '13', 'gt', '0.86'),
  ('frame', '14', 'gt', '0.86'),
  ('frame', '15', 'gt', '0.86'),
  ('frame', '16', 'gt', '0.86'),
  ('frame', '17', 'gt', '0.86'),
  ('frame', '18', 'gt', '0.86'),
  ('frame', '19', 'gt', '0.86'),
  ('frame', '20', 'gt', '0.86'),
  ('frame', '21', 'gt', '0.86'),
  ('frame', '22', 'gt', '0.86'),
  ('frame', '23', 'gt', '0.86'),
  ('frame', '24', 'gt', '0.86')]]

【讨论】:

  • 非常感谢。问题已经解决了。正如您正确指出的那样,我犯的错误是在 pool.join() 之前附加结果。非常感谢。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-12-05
  • 2018-02-13
  • 1970-01-01
  • 1970-01-01
  • 2017-09-29
  • 1970-01-01
  • 2013-09-12
相关资源
最近更新 更多