【Question title】: Why am I getting such bad results with multiprocessing?
【Posted】: 2021-06-01 10:40:01
【Question】:

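I'm doing some parallelization for a school assignment: converting a picture to grayscale. But ever since I introduced multiprocessing.Array, I've been getting terrible results, and they get worse as the number of processors increases.

When I wrote the results into a numpy array, the speed was fine and as expected (though obviously I didn't get a valid picture). Since I switched to the mp Array it has become sluggish, but I do get a correct grayscale picture out of it.

I also tried implementing it with a process pool executor, but it froze when I passed the mp Array as an argument (it didn't even reach the pid print on the first line of the called function).

Code: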

from PIL import Image
import numpy
import time
from multiprocessing import Process
from multiprocessing import Array

def main2():
    for i in range(3):
        filename = "%d.jpg" % i
        cols, rows, mpx, source_picture, result_array_seq, result_array_par = load_picture(filename)
        print(filename, "%dx%d" % (cols, rows), "%f Mpx" % mpx)
        seq_time, new_image = grayscale_seq(cols, rows, source_picture, result_array_seq)
        new_image.save("%d_gray_seq.jpg" % i)
        print("Seq run:", time.strftime('%H:%M:%S', time.gmtime(seq_time)))
        for p in range(2, 7, 2):  # run on 2, 4 and 6 cores
            par_time, new_image = grayscale_par(cols, rows, source_picture, result_array_par, p)
            new_image.save("%d_gray_par_%d.jpg" % (i, p))
            print("Par run with %d cores" % p, time.strftime('%H:%M:%S', time.gmtime(par_time)))


def grayscale_seq(cols, rows, source_pic, dest_pic):
    time_start = time.time()
    for row in range(rows):
        for col in range(cols):
            r, g, b = source_pic[row, col]
            dest_pic[row, col] = calculate_rgb(r, g, b)
    time_end = time.time()
    time_diff = (time_end - time_start)
    result_array = numpy.array(dest_pic).astype(numpy.uint8)
    new_image = Image.fromarray(result_array)

    return time_diff, new_image


def grayscale_par(cols, rows, source_pic, dest_pic, num_of_cpus):
    tuples = splitIndex(num_of_cpus, rows)
    process_array = []
    for i in range(len(tuples)):
        start_index, end_index = tuples[i]
        picture_slice = source_pic[start_index:end_index, :, :]
        p = Process(target=grayscaleSlice, args=(picture_slice, dest_pic, tuples[i][0], cols))
        process_array.append(p)

    time_start = time.time()
    for process in process_array:
        process.start()
        # process.join()
    for process in process_array:
        process.join()
    time_end = time.time()
    time_diff = (time_end - time_start)
    image = numpy.array(dest_pic).astype(numpy.uint8).reshape(rows, cols)
    new_image = Image.fromarray(image)
    return time_diff, new_image


def splitIndex(numprocessors, height):
    # Split the row range [0, height) into numprocessors (start, end) tuples
    resultarray = [0]
    previousindex = 0
    for i in range(numprocessors - 1):
        result = previousindex + height // numprocessors
        resultarray.append(result)
        previousindex = result
    indextuples = []
    for i in range(numprocessors):
        if i == numprocessors - 1:
            indextuples.append((resultarray[i], height))
            break
        indextuples.append((resultarray[i], resultarray[i + 1]))
    return indextuples


def grayscaleSlice(picture_slice, array, start_index, cols):
    for row in range(len(picture_slice)):
        for col in range(cols):
            R = picture_slice[row, col, 0]
            G = picture_slice[row, col, 1]
            B = picture_slice[row, col, 2]
            result = int(round(0.299 * R + 0.587 * G + 0.114 * B))
            # One write to the shared multiprocessing.Array per pixel
            array[start_index * cols + row * cols + col] = result


def load_picture(filename):
    im = Image.open(filename)
    cols, rows = im.size
    mpx = (cols * rows) / 1000000
    source_picture = numpy.asarray(im)
    result_array_seq = numpy.zeros((rows, cols))
    result_array_par = Array('i', rows * cols)
    return cols, rows, mpx, source_picture, result_array_seq, result_array_par


def calculate_rgb(r, g, b):
    return int(round(0.299 * r + 0.587 * g + 0.114 * b))


# The entry point goes last so that main2 is already defined when it is called
if __name__ == "__main__":
    main2()

Results:

0.jpg 500x375 0.187500 Mpx
Seq run: 00:00:06
Par run with 2 cores 00:00:01
Par run with 4 cores 00:00:01
Par run with 6 cores 00:00:02

1.jpg 1920x1200 2.304000 Mpx
Seq run: 00:00:16
Par run with 2 cores 00:00:12
Par run with 4 cores 00:00:18
Par run with 6 cores 00:00:21

2.jpg 3100x2074 6.429400 Mpx
Seq run: 00:00:47
Par run with 2 cores 00:00:32
Par run with 4 cores 00:00:45
Par run with 6 cores 00:01:03

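【Comments】:

  • You haven't posted the implementations of splitIndex or grayscaleSlice, but as a guess, it looks like each worker feeds its data into the shared array one pixel at a time. Since sending data between processes is probably the most expensive part of the whole job, that will be very slow.
  • Yes, I just realized; it's there now. I expected synchronization to take some processing power away from the parallel computation, but honestly I didn't expect it to have such a huge impact. How should this be solved to take full advantage of multiple processors? Somehow create each slice in its own process and then stitch them together?
  • You can't avoid the overhead of process creation (which is one of the reasons threads were added later). One solution is to keep the processes alive and use shared resources as little as possible.

Tags: python image-processing multiprocessing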


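【Solution 1】:

Every time you access a shared-memory object (such as multiprocessing's Array), some interprocess synchronization has to happen. That is of course slower than a normal memory access, and what's worse, each access tends to lock the array, so your workers spend most of their time locking each other out instead of doing real work.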
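To get a feel for that cost, here is a minimal sketch that compares one write per element with a single slice assignment into a multiprocessing.Array. The helper names `fill_per_element` and `fill_batched` and the array size are made up for illustration, and the sketch assumes the default `Array(..., lock=True)` behaviour, where every indexed access goes through a lock. It runs in a single process, so it shows only the per-access overhead, not contention between workers.

```python
import time
from multiprocessing import Array


def fill_per_element(shared, values):
    # Hypothetical helper: one indexed write per value, so the array's
    # lock is acquired and released once for every element.
    for i, v in enumerate(values):
        shared[i] = v


def fill_batched(shared, values):
    # Hypothetical helper: a single slice assignment, so the lock is
    # taken once for the whole batch.
    shared[:] = values


if __name__ == "__main__":
    n = 200_000  # arbitrary size chosen for the demonstration
    values = [i % 256 for i in range(n)]
    for fill in (fill_per_element, fill_batched):
        shared = Array('i', n)
        start = time.perf_counter()
        fill(shared, values)
        print(fill.__name__, "%.3f s" % (time.perf_counter() - start))
```

The absolute timings depend on the machine, so none are quoted here; the point is only that the two functions produce the same array contents while touching the lock a very different number of times.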

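You can speed things up by batching the results instead of sending one pixel at a time. For example, you could switch to a model in which each worker builds a list of the pixels that need updating, and the main process does the actual work of updating the destination array: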

from multiprocessing import Process, Queue


def grayscale_par(cols, rows, source_pic, dest_pic, num_of_cpus):
    # dest_pic is now a numpy array, just like in grayscale_seq
    tuples = splitIndex(num_of_cpus, rows)
    process_array = []
    # A queue for the workers to store result sets into
    queue = Queue()
    for i in range(len(tuples)):
        start_index, end_index = tuples[i]
        picture_slice = source_pic[start_index:end_index, :, :]
        p = Process(target=grayscaleSlice, args=(queue, picture_slice, tuples[i][0], cols))
        process_array.append(p)

    time_start = time.time()
    for process in process_array:
        process.start()
    # Go ahead and apply the result set from each worker once it's ready
    # We want to do this once for each process we launched, so 
    # loop through that array.  The value is ignored, so just use an
    # underscore here.
    for _ in process_array:
        # Now, for each process, call queue.get() to get its result
        # array.  This will block till it returns.  Once it returns
        # we iterate through that array, one item at a time.
        # Each item is just a tuple telling us which pixel to
        # update to which value
        for row, col, value in queue.get():
            # So, finally, we update each pixel in turn
            dest_pic[row, col] = value
    for process in process_array:
        process.join()
    time_end = time.time()
    time_diff = (time_end - time_start)
    image = numpy.array(dest_pic).astype(numpy.uint8)
    new_image = Image.fromarray(image)
    return time_diff, new_image


def grayscaleSlice(queue, picture_slice, start_index, cols):
    # Just store the result in an array
    result = []
    for row in range(len(picture_slice)):
        for col in range(cols):
            R = picture_slice[row, col, 0]
            G = picture_slice[row, col, 1]
            B = picture_slice[row, col, 2]
            value = int(round(0.299 * R + 0.587 * G + 0.114 * B))
            result.append((start_index + row, col, value))
    # All done, go ahead and send the result set along to the main process
    queue.put(result)

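This makes much more efficient use of the worker processes. On my machine, with a sample image, it outputs the following, showing a clear speedup as more cores are added.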

0.jpg 2272x1704 3.871488 Mpx
Seq run: 00:00:29
Par run with 2 cores 00:00:14
Par run with 4 cores 00:00:08
Par run with 6 cores 00:00:06
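
A variation on the same batching idea, closer to the "create each slice in its own process and stitch them together" suggestion from the question's comments, is to let a multiprocessing.Pool hand each worker a block of rows and return the finished block in one piece. The following is only a sketch under assumptions: `gray_rows`, `split_rows` and `grayscale_pool` are hypothetical names, and plain nested lists of (r, g, b) tuples stand in for the NumPy image so that the example needs nothing beyond the standard library.

```python
from multiprocessing import Pool


def gray_rows(rows):
    # Hypothetical worker: convert a block of rows, where each row is a
    # list of (r, g, b) tuples, and return the whole block at once.
    return [[int(round(0.299 * r + 0.587 * g + 0.114 * b)) for r, g, b in row]
            for row in rows]


def split_rows(pixels, parts):
    # Hypothetical helper: cut the image into at most `parts` blocks of
    # consecutive rows, using ceiling division for the block size.
    size = max(1, -(-len(pixels) // parts))
    return [pixels[i:i + size] for i in range(0, len(pixels), size)]


def grayscale_pool(pixels, num_of_cpus):
    chunks = split_rows(pixels, num_of_cpus)
    with Pool(num_of_cpus) as pool:
        # map() keeps the blocks in order, so stitching is a simple concat
        results = pool.map(gray_rows, chunks)
    return [row for chunk in results for row in chunk]


if __name__ == "__main__":
    pixels = [[(255, 0, 0), (0, 255, 0)],
              [(0, 0, 255), (255, 255, 255)]]
    print(grayscale_pool(pixels, 2))
```

Each worker sends back a single result per block, so the parent never shares a locked array with the workers; whether this beats the Queue version above would have to be measured on real images.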

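【Comments】:

  • Awesome, thanks. I'm having a hard time understanding this part: "for _ in process_array:", then "for row, col, value in queue.get():", then "dest_pic[row, col] = value". I assume it waits on the queue until some value is available? Like a stream listener in Dart?
  • I think you've basically got it, but it's a fair question. I've fleshed that part out with more comments, which will hopefully walk you through each step in detail.
  • Hi @Anon, I have a question about running multiprocessing.Pool on a Windows laptop here. I hope you can spare some time to take a look at it. Thank you very much for your help!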