【问题标题】:How to avoid code blocking in a loop如何避免循环中的代码阻塞
【发布时间】:2018-04-23 16:59:43
【问题描述】:

我正在尝试从远程视频流服务器中提取帧,图像类型为 mjpeg,然后上传到 Google Cloud Storage 中的存储桶。

通过我的测试,我能够始终如一地每秒拉出约 30 张图像并将帧保存在我的机器中。但是,当我尝试使用他们的客户端库将所有单个帧上传到 GCS 时,它会先等待上传过程完成,然后返回并上传新收到的帧。

例如,我希望在 1s 内接收并上传所有0...30 图片,但由于我的代码是异步的,它只上传循环中收到的最新图片。

这是我的代码

class GCStream:

    def __init__(self):
        self.client = storage.Client()
        self.bucket = self.client.get_bucket(BUCKET_NAME)
        self.frame = None
        self.fileName = ""
        self.secondCount = 0
        self.startSecond = datetime.datetime.now().second
        self.vs = VideoStream(src=REMOTE_STREAM_MJPEG).start()


    def uploadFrame(self):
        try:
            blob = self.bucket.blob("{}/{}/{}".format(RPID_FOLDER, BY_DAY, self.fileName))
            blob.upload_from_filename(os.path.abspath(self.fileName))
            print("{} uploaded to {}/{}".format(self.fileName, RPID_FOLDER, BY_DAY))
            os.remove(self.fileName)
        except Exception as e:
            print(e)


    def readFrame(self):
        while True:
            self.frame = self.vs.read()
            self.secondCount = self.secondCount + 1

            if datetime.datetime.now().second != self.startSecond:
                self.startSecond = datetime.datetime.now().second
                self.secondCount = 0

            if self.frame is not None:
                self.frame = imutils.resize(self.frame, 800)
                self.fileName = "{}-{}.png".format(datetime.datetime.now().strftime("%H-%M-%S"), self.secondCount)
                cv2.imwrite(self.fileName, self.frame)

                self.uploadFrame()

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        vs.release()
        cv2.destroyAllWindows() 


if __name__ == "__main__":
    stream = GCStream()
    stream.readFrame()

【问题讨论】:

    标签: python loops asynchronous stream google-cloud-storage


    【解决方案1】:

    经过数小时的反复试验,我终于实现了我想要的。本质是在我的程序中使用线程,这样它就不会阻塞其余代码的处理。

    我的程序现在有单独的线程来分别抓取、处理和上传帧。因此,一个线程能够收集所有图像,而其他线程将能够同时处理和上传图像。

    QUEUE_SIZE = 120
    
    grabbedFrames = Queue(QUEUE_SIZE)
    processFrames = Queue(QUEUE_SIZE)
    
    
    class FrameGraber(threading.Thread):
    
        def __init__(self):
            threading.Thread.__init__(self)
            self.vs = VideoStream(src=REMOTE_STREAM_MJPEG).start()
    
        def run(self):
            global grabbedFrames
    
            while True:
                frame = self.vs.read()
                if frame is not None:
                    grabbedFrames.put(frame)
                    time.sleep(0.05)
    
    
    class GCSUploader(threading.Thread):
    
        def __init__(self):
            threading.Thread.__init__(self)
            self.client = storage.Client()
            self.bucket = self.client.get_bucket(BUCKET_NAME)
    
        def run(self):
            global processFrames
    
            while True:
                if (not processFrames.empty()):
                    fileName = processFrames.get()
                    try:
                        blob = self.bucket.blob("{}/{}/{}".format(RPID_FOLDER, BY_DAY, fileName))
                        blob.upload_from_filename(os.path.abspath(fileName))
                        print("{} uploaded to {}/{}".format(fileName, RPID_FOLDER, BY_DAY))
                        os.remove(fileName)
                    except Exception as e:
                        print(e)
    
    
    class FrameProcessor(threading.Thread):
    
        def __init__(self):
            threading.Thread.__init__(self)
            self.secondCount = 0
            self.startSecond = datetime.datetime.now().second
    
        def run(self):
            global grabbedFrames, processFrames
            while True:
    
                if datetime.datetime.now().second != self.startSecond:
                    self.startSecond = datetime.datetime.now().second
                    self.secondCount = 0
    
                if (not grabbedFrames.empty()):
                    frame = grabbedFrames.get()
                    self.secondCount = self.secondCount + 1
                    frame = imutils.resize(frame, 800)
                    fileName = "{}-{}.png".format(datetime.datetime.now().strftime("%H-%M-%S"), self.secondCount)
                    processFrames.put(fileName)
                    cv2.imwrite(fileName, frame)
    
    
    if __name__ == "__main__":
        uploader = GCSUploader()
        frameGrabber = FrameGraber()
        frameProcessor = FrameProcessor()
    
        uploader.start()
        frameGrabber.start()
        frameProcessor.start()
    
        frameProcessor.join()
        frameGrabber.join()
        uploader.join()
    

    【讨论】:

      猜你喜欢
      • 2017-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-27
      • 1970-01-01
      • 2020-05-15
      • 1970-01-01
      相关资源
      最近更新 更多