【问题标题】:Python multiprocessing returns queue is empty when actually it is notPython多处理返回队列实际上不是空的
【发布时间】:2018-11-29 13:04:05
【问题描述】:

在这个程序中,经过一些迭代后,所有进程都会终止,这意味着 input_queue 根据目标函数中的条件为空。但是当我打印 input_queue 时返回主函数后,该队列中仍有项目,那么为什么这些多个进程首先终止?

import cv2
import timeit
import face_recognition
import queue
from multiprocessing import Process, Queue
import multiprocessing
import os

s = timeit.default_timer()

def alternative_process_target_func(input_queue, output_queue):

    while not input_queue.empty():
        try:
            frame_no, small_frame, face_loc = input_queue.get(False)  # or input_queue.get_nowait()
            print('Frame_no: ', frame_no, 'Process ID: ', os.getpid(), '----', multiprocessing.current_process())

        except queue.Empty:
            print('___________________________________ Breaking __________________________________________________')
            break  # stop when there is nothing more to read from the input


def alternative_process(file_name):
    start = timeit.default_timer()
    cap = cv2.VideoCapture(file_name)
    frame_no = 1
    fps = cap.get(cv2.CAP_PROP_FPS)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print('Frames Per Second: ', fps)
    print('Total Number of frames: ', length)
    print('Duration of file: ', int(length / fps))
    processed_frames = 1
    not_processed = 1
    frames = []
    process_this_frame = True
    frame_no = 1
    Input_Queue = Queue()
    while (cap.isOpened()):
        ret, frame = cap.read()
        if not ret:
            print('Size of input Queue: ', Input_Queue.qsize())
            print('Total no of frames read: ', frame_no)
            end1 = timeit.default_timer()
            print('Time taken to fetch useful frames: ', end1 - start)
            threadn = cv2.getNumberOfCPUs()
            Output_Queue = Queue(maxsize=Input_Queue.qsize())
            process_list = []
            #quit = multiprocessing.Event()
            #foundit = multiprocessing.Event()

            for x in range((threadn - 1)):
                # print('Process No : ', x)
                p = Process(target=alternative_process_target_func, args=(Input_Queue, Output_Queue))#, quit, foundit
                p.daemon = True
                #print('I am a new process with process id of: ', os.getpid())
                p.start()
                process_list.append(p)
                #p.join()

            i = 1
            for proc in process_list:
                print('I am hanged here and my process id is : ', os.getpid())
                proc.join()
                print('I have been joined and my process id is : ', os.getpid())
                i += 1

            for value in range(Output_Queue.qsize()):
                print(Output_Queue.get())

            end = timeit.default_timer()
            print('Time taken by face verification: ', end - start)
            print('--------------------------------------------------------------------------------------------------')

            #Here I am again printing the Input Queue which should be empty logically.
            for frame in range(Input_Queue.qsize()):
                frame_no, _, _ = Input_Queue.get()
                print(frame_no)

            break

        if process_this_frame:
            print(frame_no)
            small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
            rgb_small_frame = small_frame[:, :, ::-1]
            face_locations = face_recognition.face_locations(rgb_small_frame)
            # frames.append((rgb_small_frame, face_locations))
            Input_Queue.put((frame_no, rgb_small_frame, face_locations))
            frame_no += 1

        if processed_frames < 5:
            processed_frames += 1
            not_processed = 1

        else:
            if not_processed < 15:
                process_this_frame = False
                not_processed += 1
            else:

                processed_frames = 1
                process_this_frame = True
                print('-----------------------------------------------------------------------------------------------')

    cap.release()
    cv2.destroyAllWindows()

#chec_queues()
#compare_images()
#fps_finder()
alternative_process('user_verification_2.avi')#'hassan_checking.avi'

【问题讨论】:

标签: python parallel-processing queue multiprocessing


【解决方案1】:

您的代码包含while not input_queue.empty()。我猜在工作期间input_queue 变为空,while 循环停止,然后您将其他内容添加到input_queue 以处理其他内容。但此时为时已晚。

通常你使用这样的队列:

while True:
    element = my_queue.get()
    ...

要停止此循环,您可以计算处理元素的数量,使用timeout 参数或在某些条件下终止进程。另一种选择是使用multiprocessing.Poolconcurrent.futures.ProcessPoolExecutor

【讨论】:

  • 不,在目标函数的while循环之后,我不会在输入队列中添加任何额外的东西。但是我能够通过使用 Manager.queue 来解决这个问题,但我仍然不知道为什么之前的那个不起作用以及为什么使用管理器队列可以解决错误。
  • @Muhammadhassan 查看代码我看到你在Input_Queue.put((frame_no, rgb_small_frame, face_locations)) 之前做了p.start()
  • 不,“if not ret”条件确保在进程启动后不会将任何项目添加到输入队列中。
  • @Muhammadhassan 代码背后的逻辑对我来说是模糊的。我看到的是以下内容 - 如果ret 在一段时间后变为False,那么您可能会在input_queue 中获得无限数量的帧,因为每20 步一次process_this_frameTrue(或20 个中有5 个? )。请理解自己代码的逻辑。我想你应该重新组织代码。
猜你喜欢
  • 1970-01-01
  • 2018-06-30
  • 2022-12-09
  • 2011-10-29
  • 2012-02-23
  • 2021-12-07
  • 1970-01-01
  • 2012-07-11
  • 1970-01-01
相关资源
最近更新 更多