【问题标题】:Specifying which item is to be consumed by which consumer for a Python multiprocessing Queue为 Python 多处理队列指定哪个消费者将使用哪个项目
【发布时间】:2021-12-23 22:50:24
【问题描述】:

以下代码正在进行中。这是我发布的here 关于另一个问题的升级。它使用管道在 GUI(生产者)和控制应用程序终止的控制器(消费者)之间传输(当前仅关闭事件)数据(就像在以前版本的代码中一样)。基本升级是用队列替换管道(用于在 GUI 和图像处理过程之间传递数据)。

我选择队列的原因是因为它是线程安全的,并且将来我想将每个独立的图像处理任务移动到单独的线程(如果可用)。例如,在从原始图像生成灰度图像后,我可以使用它执行多个独立的步骤 - 高斯和哈里斯角检测(1)、Canny 边缘检测(2,目前使用@987654322 显示结果图像的问题@ 所以忽略)等等。GUI 只是为了可视化,所以它接收单独项目的顺序并不重要(因此是队列)。

from multiprocessing import Process, Pipe, Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor # For future use
import tkinter as tk
from tkinter import filedialog, ttk
from PIL import ImageTk, Image
import atexit
import cv2
import os


class ProcessGUI(object):
    def __init__(self, queue_gui_ip, pipe_gui_co_out):
        print("[ProcessGUI]: PID = " + str(os.getpid()))
        self.queue_gui_ip = queue_gui_ip
        self.pipe_gui_co_out = pipe_gui_co_out

        self.poll_image_data_thread = Thread(target=self.poll_image_data)

        self.setup_gui()
        self.run()

    def setup_gui(self):
        self.app = tk.Tk()
        self.app.protocol("WM_DELETE_WINDOW", self.close_event)

        fr_browse_controls = ttk.Frame(self.app)
        fr_browse_controls.pack(side=tk.TOP, expand=True, fill="x")

        lb_browse = ttk.Label(fr_browse_controls, text="Path to image file:")
        lb_browse.pack(side=tk.LEFT)

        self.ent_browse = ttk.Entry(fr_browse_controls)
        self.ent_browse.pack(side=tk.LEFT, expand=True, fill="x")

        btn_browse = ttk.Button(fr_browse_controls, text="Browse", command=self.btn_browse_clicked)
        btn_browse.pack(side=tk.RIGHT)

        self.edge_algs = tk.StringVar(self.app)
        om_edge_algs = ttk.OptionMenu(self.app, self.edge_algs, "canny", "canny", "sobelx", "sobely", "sobelxy")
        om_edge_algs.pack(side=tk.TOP, expand=True, fill="x")

        self.pb_load = ttk.Progressbar(self.app, orient=tk.HORIZONTAL, mode="determinate")
        self.pb_load.pack(side=tk.TOP, expand=True, fill="x")

        tw_images = ttk.Notebook(self.app)
        tw_images.pack(side=tk.BOTTOM, expand=True, fill="both")

        tb_original = ttk.Frame(tw_images)
        self.image_original = None
        tw_images.add(tb_original, text="RGB")
        self.lb_image_original = ttk.Label(tb_original, image=None)
        self.lb_image_original.pack(expand=True, fill="both")

        tb_gray = ttk.Frame(tw_images)
        self.image_gray = None
        tw_images.add(tb_gray, text="Grayscale")
        self.lb_image_gray = ttk.Label(tb_gray, image=None)
        self.lb_image_gray.pack(expand=True, fill="both")

        tb_gaussian = ttk.Frame(tw_images)
        self.image_gaussian = None
        tw_images.add(tb_gaussian, text="Gaussian")
        self.lb_image_gaussian = ttk.Label(tb_gaussian, image=None)
        self.lb_image_gaussian.pack(expand=True, fill="both")

        tb_edges = ttk.Frame(tw_images)
        self.image_edges = None
        tw_images.add(tb_edges, text="Edges")
        self.lb_image_edges = ttk.Label(tb_edges, image=None)
        self.lb_image_edges.pack(expand=True, fill="both")

        tb_corners = ttk.Frame(tw_images)
        self.image_corners = None
        tw_images.add(tb_corners, text="Harris corners")
        self.lb_image_corners = ttk.Label(tb_corners, image=None)
        self.lb_image_corners.pack(expand=True, fill="both")

    def run(self):
        try:
            self.poll_image_data_thread.start()
            self.app.mainloop()
        except KeyboardInterrupt:
            self.close_event()

    def poll_image_data(self):
        while True:
            request = self.queue_gui_ip.get()
            method = request[0]
            args = request[1:]

            print("------------[ProcessGUI]------------")
            print("Method: " + method)
            print("------------------------------------")

            try:
                getattr(self, method + "_callback")(*args)
            except AttributeError as ae:
                print("Unknown callback received from pipe", str(ae))

    def display_image_dims_callback(self, height, width, channels):
        print("[ProcessGUI]")
        print("Height: " + str(height))
        print("Width: " + str(width))
        print("Channels: " + str(channels))

    def display_image_processing_progress_callback(self, progress):
        progress = 0 + ((100 - 0) / (len(ProcessImageProcessing.ProcessingSteps) - 0)) * (progress - 0)
        print("[ProcessGUI]: Updating progress to " + str(progress) + "%")
        self.pb_load["value"] = progress

    def display_image_original_gray_callback(self, image_original, image_gray):
        self.image_original = ImageTk.PhotoImage(Image.fromarray(image_original))
        self.lb_image_original.configure(image=self.image_original)

        self.image_gray = ImageTk.PhotoImage(Image.fromarray(image_gray))
        self.lb_image_gray.configure(image=self.image_gray)

    def display_image_gaussian_callback(self, image_gaussian):
        self.image_gaussian = ImageTk.PhotoImage(Image.fromarray(image_gaussian))
        self.lb_image_gaussian.configure(image=self.image_gaussian)

    def display_image_edges_callback(self, image_edges):
        self.image_edges = ImageTk.PhotoImage(Image.fromarray(image_edges))
        self.lb_image_edges.configure(image=self.image_edges)

    def display_image_corners_callback(self, image_corners):
        self.image_corners = ImageTk.PhotoImage(Image.fromarray(image_corners))
        self.lb_image_corners.configure(image=self.image_corners)

    def btn_browse_clicked(self):
        filename = tk.filedialog.askopenfilename(initialdir=".",
                                                 title="Select image",
                                                 filetypes=(
                                                     ("Portable Network graphics", "*.png"),
                                                     ("All files", "*.*")))
        self.ent_browse.delete(0, tk.END)
        self.ent_browse.insert(0, filename)

        edge_alg = self.edge_algs.get()
        self.queue_gui_ip.put(["process_image", filename, edge_alg])

    def close_event(self):
        print("[ProcessGUI]: Shutting down")
        self.pipe_gui_co_out.send(["close"])
        if self.poll_image_data_thread.is_alive():
            self.poll_image_data_thread.join()

        self.pipe_gui_co_out.close()

        self.app.destroy()


class ProcessImageProcessing(object):
    ProcessingSteps = [
        "rgb",
        "gray",
        "gaussian",
        "edges",
        "corners"
    ]

    def __init__(self, queue_gui_ip):
        print("[ProcessImageProcessing]: PID = " + str(os.getpid()))
        self.queue_gui_ip = queue_gui_ip

        # atexit.register(self.close)
        self.run()

    def run(self):
        while True:
            request = self.queue_gui_ip.get()
            method = request[0]
            args = request[1:]

            print("------[ProcessImageProcessing]------")
            print("Method: " + method)
            print("------------------------------------")

            if "display_" in method:
                # Skip incoming requests that contain methods meant for ProcessGUI class
                continue

            try:
                getattr(self, method + "_callback")(*args)
            except AttributeError as ae:
                print("Unknown callback received from pipe", str(ae))

    def process_image_callback(self, image_path, edge_alg):
        print("[ProcessImageProcessing]: Received file \"" + image_path + "\"")
        try:
            progress = 0
            original_bgr = cv2.imread(image_path)
            original_rgb = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2RGB)
            (height, width, channels) = original_bgr.shape[:3]
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_dims", height, width, channels])
            gray = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2GRAY)
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_original_gray", original_rgb, gray])
            gaussian = self.process_image_gaussian(gray)
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_gaussian", gaussian])
            try:
                edges = self.process_image_edges(gaussian, edge_alg)
                if not edges:
                    print("Unknown edge detection algorithm")
                else:
                    progress += 1
                    self.queue_gui_ip.put(["display_image_processing_progress", progress])
                    self.queue_gui_ip.put(["display_image_edges", edges])
            except:
                pass
            corners = self.process_image_corners(original_rgb, gray)
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_corners", corners])
        except Exception as ex:
            print("Exception: " + str(ex))

    def process_image_gaussian(self, image_gray):
        gaussian = cv2.GaussianBlur(image_gray, (3, 3), cv2.BORDER_DEFAULT)
        return gaussian

    def process_image_edges(self, image_gaussian, edge_alg):
        edges = None

        if edge_alg not in ["canny", "sobelx", "sobely", "sobelxy"]:
            return edges

        # Sobel edge detection
        # Sobel edge detection on the X axis
        if edge_alg == "sobelx":
            print("Sobel X")
            edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=1, dy=0, ksize=5)
        # Sobel edge detection on the Y axis
        elif edge_alg == "sobely":
            print("Sobel Y")
            edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=0, dy=1, ksize=5)
        # Combined X and Y Sobel edge detection
        elif edge_alg == "sobelxy":
            print("Sobel XY")
            edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=1, dy=1, ksize=5)
        # Canny edge detection
        elif edge_alg == "canny":
            print("Canny")
            edges = cv2.Canny(image=image_gaussian, threshold1=100, threshold2=200)

        return edges

    def process_image_corners(self, image_original, image_gray):
        original_with_corners = image_original
        corners = cv2.cornerHarris(image_gray, 2, 3, 0.04)

        # result is dilated for marking the corners, not important
        corners = cv2.dilate(corners, None)

        # Threshold for an optimal value, it may vary depending on the image.
        original_with_corners[corners > 0.01 * corners.max()] = [0, 0, 255]

        return original_with_corners

    def close(self):
        print("[ProcessImageProcessing]: Shutting down")


class Controller(object):
    def __init__(self):
        print("[Controller]: PID = " + str(os.getpid()))
        queue_gui_ip = Queue()
        pipe_gui_co_out, self.pipe_co_in = Pipe()

        self.gui = Process(
            target=ProcessGUI,
            args=(queue_gui_ip, pipe_gui_co_out)
        )

        self.ip = Process(
            target=ProcessImageProcessing,
            args=(queue_gui_ip,)
        )

    def run(self):
        try:
            self.gui.start()
            self.ip.start()

            while self.pipe_co_in.poll(None):
                try:
                    request = self.pipe_co_in.recv()
                    method = request[0]
                    args = request[1:]

                    try:
                        getattr(self, method + "_callback")(*args)
                    except AttributeError as ae:
                        print("Unknown callback received from pipe", str(ae))
                except EOFError:
                    # Raised when nothing to receive from pipe
                    pass
        except KeyboardInterrupt:
            self.close_callback()
        except BrokenPipeError:
            self.close_callback()

    def close_callback(self):
        print("Quitting processes...")
        self.gui.join(1)
        if self.gui.is_alive():
            self.gui.terminate()

        self.ip.join(1)
        if self.ip.is_alive():
            self.ip.terminate()

        print("Finished")


def main():
    c = Controller()
    c.run()


if __name__ == "__main__":
    main()

我的问题在于Queue 的工作方式。与我从未遇到过此问题的双工 Pipe 不同,队列将其项目提供给两个进程 - ProcessGUIProcessImageProcessing。取回物品后,它(自然)被消耗掉。问题是我的主要生产者 - ProcessImageProcessing - 收到了大量已放入队列中的较小生产者 - ProcessGUI 的项目。这会导致商品被错误的消费者消费。

我正在寻找解决此问题的方法。起初我认为我可以消费一个项目,如果它不适合消费它的消费者,则将其放回队列中(因此打印语句和

if "display_" in method:
    continue

ProcessImageProcessing 内部,我正在考虑退回用于ProcessGUI 的项目。这显然有许多缺陷,包括检查每个项目的有效性,更重要的是,没有保证被放回队列的项目下次将被正确的消费者消费。这可能会导致商品被错误的消费者持续消费并无限期退回的情况。

我的下一个想法可能是为传入数据添加一个PipeProcessGUIProcessImageProcessing),并让Queue 用于从单个生产者(ProcessImageProcessing)传输图像数据和一个单一消费者 (ProcessGUI)。这似乎是正确的方法,但我很好奇是否有其他方法可以让我在 Python 中了解更多关于 IPC 和 Pipe/Queue 的信息。


这是更新后的代码(远非完美:D),带有额外的 Pipe 和“单一生产者,单一消费者”Queue

from multiprocessing import Process, Pipe, Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import tkinter as tk
from tkinter import filedialog, ttk
from PIL import ImageTk, Image
import atexit
import cv2
import os


class ProcessGUI(object):
    def __init__(self, queue_ip_gui, pipe_gui_ip_in, pipe_gui_co_out):
        print("[ProcessGUI]: PID = " + str(os.getpid()))
        self.queue_ip_gui = queue_ip_gui
        self.pipe_gui_ip_in = pipe_gui_ip_in
        self.pipe_gui_co_out = pipe_gui_co_out

        self.poll_image_data_thread = Thread(target=self.poll_image_data)

        self.setup_gui()
        self.run()

    def setup_gui(self):
        # SAME AS ABOVE
        # ...

    def run(self):
        try:
            self.poll_image_data_thread.start()
            self.app.mainloop()
        except KeyboardInterrupt:
            self.close_event()

    def poll_image_data(self):
        while True:
            request = self.queue_ip_gui.get()
            method = request[0]
            args = request[1:]

            try:
                getattr(self, method + "_callback")(*args)
            except AttributeError as ae:
                print("Unknown callback received from pipe", str(ae))

    def display_image_dims_callback(self, height, width, channels):
        print("[ProcessGUI]")
        print("Height: " + str(height))
        print("Width: " + str(width))
        print("Channels: " + str(channels))

    def display_image_processing_progress_callback(self, progress):
        progress = 0 + ((100 - 0) / (len(ProcessImageProcessing.ProcessingSteps) - 0)) * (progress - 0)
        print("[ProcessGUI]: Updating progress to " + str(progress) + "%")
        self.pb_load["value"] = progress

    def display_image_original_gray_callback(self, image_original, image_gray):
        self.image_original = ImageTk.PhotoImage(Image.fromarray(image_original))
        self.lb_image_original.configure(image=self.image_original)

        self.image_gray = ImageTk.PhotoImage(Image.fromarray(image_gray))
        self.lb_image_gray.configure(image=self.image_gray)

    def display_image_gaussian_callback(self, image_gaussian):
        self.image_gaussian = ImageTk.PhotoImage(Image.fromarray(image_gaussian))
        self.lb_image_gaussian.configure(image=self.image_gaussian)

    def display_image_edges_callback(self, image_edges):
        self.image_edges = ImageTk.PhotoImage(Image.fromarray(image_edges))
        self.lb_image_edges.configure(image=self.image_edges)

    def display_image_corners_callback(self, image_corners):
        self.image_corners = ImageTk.PhotoImage(Image.fromarray(image_corners))
        self.lb_image_corners.configure(image=self.image_corners)

    def btn_browse_clicked(self):
        filename = tk.filedialog.askopenfilename(initialdir=".",
                                                 title="Select image",
                                                 filetypes=(
                                                     ("Portable Network graphics", "*.png"),
                                                     ("All files", "*.*")))
        self.ent_browse.delete(0, tk.END)
        self.ent_browse.insert(0, filename)

        edge_alg = self.edge_algs.get()
        self.pipe_gui_ip_in.send(["process_image", filename, edge_alg])

    def close_event(self):
        print("[ProcessGUI]: Shutting down")
        self.pipe_gui_co_out.send(["close"])
        if self.poll_image_data_thread.is_alive():
            self.poll_image_data_thread.join()

        self.pipe_gui_co_out.close()

        self.app.destroy()


class ProcessImageProcessing(object):
    ProcessingSteps = [
        "rgb",
        "gray",
        "gaussian",
        "edges",
        "corners"
    ]

    def __init__(self, queue_gui_ip, pipe_gui_ip_out):
        print("[ProcessImageProcessing]: PID = " + str(os.getpid()))
        self.queue_gui_ip = queue_gui_ip
        self.pipe_gui_ip_out = pipe_gui_ip_out
        self.run()

    def run(self):
        try:
            while self.pipe_gui_ip_out.poll(None):
                try:
                    request = self.pipe_gui_ip_out.recv()
                    method = request[0]
                    args = request[1:]
    
                    try:
                        getattr(self, method + "_callback")(*args)
                    except AttributeError as ae:
                        print("Unknown callback received from pipe", str(ae))
    
                except EOFError:
                    # Raised when nothing to receive from pipe
                    pass
        except KeyboardInterrupt:
            self.close()
        except BrokenPipeError:
            # Raised when exiting the process
            self.close()

    def process_image_callback(self, image_path, edge_alg):
        print("[ProcessImageProcessing]: Received file \"" + image_path + "\"")
        try:
            progress = 0
            original_bgr = cv2.imread(image_path)
            original_rgb = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2RGB)
            (height, width, channels) = original_bgr.shape[:3]
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_dims", height, width, channels])
            gray = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2GRAY)
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_original_gray", original_rgb, gray])
            gaussian = self.process_image_gaussian(gray)
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_gaussian", gaussian])
            try:
                edges = self.process_image_edges(gaussian, edge_alg)
                if not edges:
                    print("Unknown edge detection algorithm")
                else:
                    progress += 1
                    self.queue_gui_ip.put(["display_image_processing_progress", progress])
                    self.queue_gui_ip.put(["display_image_edges", edges])
            except:
                pass
            corners = self.process_image_corners(original_rgb, gray)
            progress += 1
            self.queue_gui_ip.put(["display_image_processing_progress", progress])
            self.queue_gui_ip.put(["display_image_corners", corners])
        except Exception as ex:
            print("Exception: " + str(ex))

    def process_image_gaussian(self, image_gray):
        gaussian = cv2.GaussianBlur(image_gray, (3, 3), cv2.BORDER_DEFAULT)
        return gaussian

    def process_image_edges(self, image_gaussian, edge_alg):
        edges = None

        if edge_alg not in ["canny", "sobelx", "sobely", "sobelxy"]:
            return edges

        if edge_alg == "sobelx":
            print("Sobel X")
            edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=1, dy=0, ksize=5)
        elif edge_alg == "sobely":
            print("Sobel Y")
            edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=0, dy=1, ksize=5)
        elif edge_alg == "sobelxy":
            print("Sobel XY")
            edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=1, dy=1, ksize=5)
        elif edge_alg == "canny":
            print("Canny")
            edges = cv2.Canny(image=image_gaussian, threshold1=100, threshold2=200)

        return edges

    def process_image_corners(self, image_original, image_gray):
        original_with_corners = image_original
        corners = cv2.cornerHarris(image_gray, 2, 3, 0.04)
        corners = cv2.dilate(corners, None)
        original_with_corners[corners > 0.01 * corners.max()] = [0, 0, 255]

        return original_with_corners

    def close(self):
        print("[ProcessImageProcessing]: Shutting down")


class Controller(object):
    def __init__(self):
        print("[Controller]: PID = " + str(os.getpid()))
        queue_ip_gui = Queue()
        pipe_gui_ip_in, pipe_gui_ip_out = Pipe()
        pipe_gui_co_out, self.pipe_co_in = Pipe()

        self.gui = Process(
            target=ProcessGUI,
            args=(queue_ip_gui, pipe_gui_ip_in, pipe_gui_co_out)
        )

        self.ip = Process(
            target=ProcessImageProcessing,
            args=(queue_ip_gui, pipe_gui_ip_out)
        )

    def run(self):
        try:
            self.gui.start()
            self.ip.start()

            while self.pipe_co_in.poll(None):
                try:
                    request = self.pipe_co_in.recv()
                    method = request[0]
                    args = request[1:]

                    try:
                        getattr(self, method + "_callback")(*args)
                    except AttributeError as ae:
                        print("Unknown callback received from pipe", str(ae))
                except EOFError:
                    # Raised when nothing to receive from pipe
                    pass
        except KeyboardInterrupt:
            self.close_callback()
        except BrokenPipeError:
            self.close_callback()

    def close_callback(self):
        print("Quitting processes...")
        self.gui.join(1)
        if self.gui.is_alive():
            self.gui.terminate()

        self.ip.join(1)
        if self.ip.is_alive():
            self.ip.terminate()

        print("Finished")


def pipes():
    c = Controller()
    c.run()


if __name__ == "__main__":
    pipes()

【问题讨论】:

    标签: python multiprocessing queue pipe python-multiprocessing


    【解决方案1】:

    我认为最好的方法是为ProcessImageProcessing 进程设置一个输入队列,并为该进程将其结果写入到ProcessGUI 从中读取结果的输出队列。

    我还将使ProcessGui 成为从主进程启动的线程,并摆脱使用Pipe 来传达关闭事件并改用实际的threading.Event 实例。这允许主线程简单地等待 GUI 关闭,而不会在轮询循环中消耗 CPU 周期。最后,通过在ProcessGui 中使用daemon 线程处理队列get 请求,并使ProcessImageProcessing 进程成为daemon 进程,逻辑变得有些简化。

    from multiprocessing import Process, Pipe, Queue
    from threading import Thread, Event
    import tkinter as tk
    from tkinter import filedialog, ttk
    from PIL import ImageTk, Image
    import cv2
    import os
    
    class ProcessGUI(object):
        def __init__(self, queue_gui_ip, queue_gui_op, termination_event):
            print("[ProcessGUI]: PID = " + str(os.getpid()))
            self.queue_gui_ip = queue_gui_ip
            self.queue_gui_op = queue_gui_op
            self.termination_event = termination_event
    
            self.poll_image_data_thread = Thread(target=self.poll_image_data, daemon=True)
    
            self.setup_gui()
            self.run()
    
        def setup_gui(self):
            self.app = tk.Tk()
            self.app.protocol("WM_DELETE_WINDOW", self.close_event)
    
            fr_browse_controls = ttk.Frame(self.app)
            fr_browse_controls.pack(side=tk.TOP, expand=True, fill="x")
    
            lb_browse = ttk.Label(fr_browse_controls, text="Path to image file:")
            lb_browse.pack(side=tk.LEFT)
    
            self.ent_browse = ttk.Entry(fr_browse_controls)
            self.ent_browse.pack(side=tk.LEFT, expand=True, fill="x")
    
            btn_browse = ttk.Button(fr_browse_controls, text="Browse", command=self.btn_browse_clicked)
            btn_browse.pack(side=tk.RIGHT)
    
            self.edge_algs = tk.StringVar(self.app)
            om_edge_algs = ttk.OptionMenu(self.app, self.edge_algs, "canny", "canny", "sobelx", "sobely", "sobelxy")
            om_edge_algs.pack(side=tk.TOP, expand=True, fill="x")
    
            self.pb_load = ttk.Progressbar(self.app, orient=tk.HORIZONTAL, mode="determinate")
            self.pb_load.pack(side=tk.TOP, expand=True, fill="x")
    
            tw_images = ttk.Notebook(self.app)
            tw_images.pack(side=tk.BOTTOM, expand=True, fill="both")
    
            tb_original = ttk.Frame(tw_images)
            self.image_original = None
            tw_images.add(tb_original, text="RGB")
            self.lb_image_original = ttk.Label(tb_original, image=None)
            self.lb_image_original.pack(expand=True, fill="both")
    
            tb_gray = ttk.Frame(tw_images)
            self.image_gray = None
            tw_images.add(tb_gray, text="Grayscale")
            self.lb_image_gray = ttk.Label(tb_gray, image=None)
            self.lb_image_gray.pack(expand=True, fill="both")
    
            tb_gaussian = ttk.Frame(tw_images)
            self.image_gaussian = None
            tw_images.add(tb_gaussian, text="Gaussian")
            self.lb_image_gaussian = ttk.Label(tb_gaussian, image=None)
            self.lb_image_gaussian.pack(expand=True, fill="both")
    
            tb_edges = ttk.Frame(tw_images)
            self.image_edges = None
            tw_images.add(tb_edges, text="Edges")
            self.lb_image_edges = ttk.Label(tb_edges, image=None)
            self.lb_image_edges.pack(expand=True, fill="both")
    
            tb_corners = ttk.Frame(tw_images)
            self.image_corners = None
            tw_images.add(tb_corners, text="Harris corners")
            self.lb_image_corners = ttk.Label(tb_corners, image=None)
            self.lb_image_corners.pack(expand=True, fill="both")
    
        def run(self):
            try:
                self.poll_image_data_thread.start()
                self.app.mainloop()
            except KeyboardInterrupt:
                self.close_event()
    
        def poll_image_data(self):
            while True:
                request = self.queue_gui_op.get()
                method = request[0]
                args = request[1:]
    
                print("------------[ProcessGUI]------------")
                print("Method: " + method)
                print("------------------------------------")
    
                try:
                    getattr(self, method + "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))
    
        def display_image_dims_callback(self, height, width, channels):
            print("[ProcessGUI]")
            print("Height: " + str(height))
            print("Width: " + str(width))
            print("Channels: " + str(channels))
    
        def display_image_processing_progress_callback(self, progress):
            progress = 0 + ((100 - 0) / (len(ProcessImageProcessing.ProcessingSteps) - 0)) * (progress - 0)
            print("[ProcessGUI]: Updating progress to " + str(progress) + "%")
            self.pb_load["value"] = progress
    
        def display_image_original_gray_callback(self, image_original, image_gray):
            self.image_original = ImageTk.PhotoImage(Image.fromarray(image_original))
            self.lb_image_original.configure(image=self.image_original)
    
            self.image_gray = ImageTk.PhotoImage(Image.fromarray(image_gray))
            self.lb_image_gray.configure(image=self.image_gray)
    
        def display_image_gaussian_callback(self, image_gaussian):
            self.image_gaussian = ImageTk.PhotoImage(Image.fromarray(image_gaussian))
            self.lb_image_gaussian.configure(image=self.image_gaussian)
    
        def display_image_edges_callback(self, image_edges):
            self.image_edges = ImageTk.PhotoImage(Image.fromarray(image_edges))
            self.lb_image_edges.configure(image=self.image_edges)
    
        def display_image_corners_callback(self, image_corners):
            self.image_corners = ImageTk.PhotoImage(Image.fromarray(image_corners))
            self.lb_image_corners.configure(image=self.image_corners)
    
        def btn_browse_clicked(self):
            filename = tk.filedialog.askopenfilename(initialdir=".",
                                                     title="Select image",
                                                     filetypes=(
                                                         ("Portable Network graphics", "*.png"),
                                                         ("All files", "*.*")))
            self.ent_browse.delete(0, tk.END)
            self.ent_browse.insert(0, filename)
    
            edge_alg = self.edge_algs.get()
            self.queue_gui_ip.put(["process_image", filename, edge_alg])
    
        def close_event(self):
            print("[ProcessGUI]: Shutting down")
    
            self.app.destroy()
            self.termination_event.set() # show were are terminating
    
    
    class ProcessImageProcessing(object):
        ProcessingSteps = [
            "rgb",
            "gray",
            "gaussian",
            "edges",
            "corners"
        ]
    
        def __init__(self, queue_gui_ip, queue_gui_op):
            print("[ProcessImageProcessing]: PID = " + str(os.getpid()))
            self.queue_gui_ip = queue_gui_ip
            self.queue_gui_op = queue_gui_op
    
            self.run()
    
        def run(self):
            while True:
                request = self.queue_gui_ip.get()
                method = request[0]
                args = request[1:]
    
                print("------[ProcessImageProcessing]------")
                print("Method: " + method)
                print("------------------------------------")
    
                if "display_" in method:
                    # Skip incoming requests that contain methods meant for ProcessGUI class
                    continue
    
                try:
                    getattr(self, method + "_callback")(*args)
                except AttributeError as ae:
                    print("Unknown callback received from pipe", str(ae))
    
        def process_image_callback(self, image_path, edge_alg):
            print("[ProcessImageProcessing]: Received file \"" + image_path + "\"")
            try:
                progress = 0
                original_bgr = cv2.imread(image_path)
                original_rgb = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2RGB)
                (height, width, channels) = original_bgr.shape[:3]
                progress += 1
                self.queue_gui_op.put(["display_image_processing_progress", progress])
                self.queue_gui_op.put(["display_image_dims", height, width, channels])
                gray = cv2.cvtColor(original_bgr, cv2.COLOR_BGR2GRAY)
                progress += 1
                self.queue_gui_op.put(["display_image_processing_progress", progress])
                self.queue_gui_op.put(["display_image_original_gray", original_rgb, gray])
                gaussian = self.process_image_gaussian(gray)
                progress += 1
                self.queue_gui_op.put(["display_image_processing_progress", progress])
                self.queue_gui_op.put(["display_image_gaussian", gaussian])
                try:
                    edges = self.process_image_edges(gaussian, edge_alg)
                    if not edges:
                        print("Unknown edge detection algorithm")
                    else:
                        progress += 1
                        self.queue_gui_op.put(["display_image_processing_progress", progress])
                        self.queue_gui_op.put(["display_image_edges", edges])
                except:
                    pass
                corners = self.process_image_corners(original_rgb, gray)
                progress += 1
                self.queue_gui_op.put(["display_image_processing_progress", progress])
                self.queue_gui_op.put(["display_image_corners", corners])
            except Exception as ex:
                print("Exception: " + str(ex))
    
        def process_image_gaussian(self, image_gray):
            gaussian = cv2.GaussianBlur(image_gray, (3, 3), cv2.BORDER_DEFAULT)
            return gaussian
    
        def process_image_edges(self, image_gaussian, edge_alg):
            edges = None
    
            if edge_alg not in ["canny", "sobelx", "sobely", "sobelxy"]:
                return edges
    
            # Sobel edge detection
            # Sobel edge detection on the X axis
            if edge_alg == "sobelx":
                print("Sobel X")
                edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=1, dy=0, ksize=5)
            # Sobel edge detection on the Y axis
            elif edge_alg == "sobely":
                print("Sobel Y")
                edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=0, dy=1, ksize=5)
            # Combined X and Y Sobel edge detection
            elif edge_alg == "sobelxy":
                print("Sobel XY")
                edges = cv2.Sobel(src=image_gaussian, ddepth=cv2.CV_64F, dx=1, dy=1, ksize=5)
            # Canny edge detection
            elif edge_alg == "canny":
                print("Canny")
                edges = cv2.Canny(image=image_gaussian, threshold1=100, threshold2=200)
    
            return edges
    
        def process_image_corners(self, image_original, image_gray):
            original_with_corners = image_original
            corners = cv2.cornerHarris(image_gray, 2, 3, 0.04)
    
            # result is dilated for marking the corners, not important
            corners = cv2.dilate(corners, None)
    
            # Threshold for an optimal value, it may vary depending on the image.
            original_with_corners[corners > 0.01 * corners.max()] = [0, 0, 255]
    
            return original_with_corners
    
        def close(self):
            print("[ProcessImageProcessing]: Shutting down")
    
    
    class Controller(object):
        def __init__(self):
            print("[Controller]: PID = " + str(os.getpid()))
            queue_gui_ip = Queue()
            queue_gui_op = Queue()
    
            # A Thread!
            self.termination_event = Event()
            self.gui = Thread(
                target=ProcessGUI,
                args=(queue_gui_ip, queue_gui_op, self.termination_event)
            )
    
            # Make it a daemon process:
            self.ip = Process(
                target=ProcessImageProcessing,
                args=(queue_gui_ip, queue_gui_op),
                daemon=True
            )
    
        def run(self):
    
            try:
                self.gui.start()
                self.ip.start()
                self.termination_event.wait() # wait for termination
                self.close_callback()
            except KeyboardInterrupt:
                self.close_callback()
    
        def close_callback(self):
            print("Quitting processes...")
            self.gui.join(1)
            if self.gui.is_alive():
                self.gui.terminate()
    
            print("Finished")
    
    
    def main():
        c = Controller()
        c.run()
    
    
    if __name__ == "__main__":
        main()
    

    【讨论】:

    • 这不能解决您的问题还是我遗漏了什么?
    • 抱歉稍后回复。虽然您提供了改进(顺便说一句,我已将其包含在我的初始代码的不同版本中),但我仍然对替代方案感兴趣,其中多个进程共享同一个队列并且每个“知道”是否要处理一个项目过程或独自留给另一个人。我认为这需要某种代理,它接收所有项目,然后将这些项目重新发布到仅位于它和作为实际目标的进程之间的队列。
    猜你喜欢
    • 1970-01-01
    • 2019-10-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-09
    • 1970-01-01
    • 2010-11-17
    相关资源
    最近更新 更多