【问题标题】:Properly Designing a Multiprocessing.Manager Custom Object正确设计 Multiprocessing.Manager 自定义对象
【发布时间】:2015-07-23 07:27:12
【问题描述】:

我想使用 multiprocessing.Manager() 对象,这样我就可以将信息从工作人员异步发送到管理器以将信息发送到服务器。我拥有大约 10 个将 PDF 写入磁盘的实例。然后我想使用多处理包中的管理器对象将该数据发送到我的 S3 存储桶,因为我不想阻止本地内容生成。

所以我想知道如果我创建一个自定义管理器对象,这是正确的方法吗?提交给管理器对象的每个进程都会排队吗?或者如果我调用多个上传,经理会放弃一些调用吗?

下面是我想做的示例代码:

from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # do stuff
        return results

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    upload = manager.uploads()
    # do this wait for completion or do they perform this async
    print upload.upload(r"< path >", {...}, "some url")
    print upload.upload(r"< path >", {...}, "some url")

【问题讨论】:

  • 澄清一下:您想要有十个不同的进程(这些是同一个 python 脚本的唯一实例,还是只是在一个脚本中产生的 multiprocessing.Process 实例?),它们都将 PDF 写入磁盘。一旦完成写入,每个实例都会将文件的路径发送到单个multiprocessing.Manager,它应该一次上传一个文件(意味着没有并行上传)。对吗?
  • 另外,您是否关心从上传过程中获取结果?还是您只是想在后台启动上传并忘记它?
  • @dano - 从进程中取回某种消息以确保进程正常运行会很有帮助。

标签: python python-2.7 amazon-s3 multiprocessing


【解决方案1】:

直接回答您的一些问题:

提交给管理器对象的每个进程都会排队吗?

Manager 服务器会生成一个新线程来处理每个传入的请求,因此您的所有请求都会立即开始处理。你可以在multiprocessing/managers.py里面看到这个:

def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()

如果我调用多个上传,经理会放弃一些调用吗?

不,不会掉线。

# do this wait for completion or do they perform this async
print upload.upload(r"< path >", {...}, "some url")
print upload.upload(r"< path >", {...}, "some url")

upload.upload 的两个调用都是同步的;在UploadClass.upload 完成之前,他们不会返回。但是,如果您有多个脚本/线程/进程同时调用 upload.upload,则每个唯一调用都将同时发生在 Manager 服务器进程中其自己的线程内。

还有你最重要的问题:

这是正确的做法吗?

如果我正确理解了这个问题,我会说不。如果您只有一个脚本,然后在该脚本中生成十个 multiprocessing.Process 实例来写出 PDF,那么您应该只使用另一个 multiprocessing.Process 来处理上传:

def upload(self, q):
    for payload in iter(q.get, None):  # Keep getting from the queue until a None is found
        filePath, params, destUrl = payload
        # do stuff

def write_pdf(pdf_file_info, q):
   # write a pdf to disk here
   q.put((filepath, params, destUrl))  # Send work to the uploader
   # Move on with whatever comes next.

if __name__ == '__main__':
    pdf_queue = multiprocessing.Queue()

    # Start uploader
    upload_proc = multiprocessing.Process(upload, args=(pdf_queue,))
    upload_proc.start()

    # Start pdf writers
    procs = []
    for pdf in pdfs_to_write: 
         p = multiprocessing.Process(write_pdf, args=(pdf, pdf_queue))
         p.start()
         p.append(procs)

    # Wait for pdf writers and uploader to finish.
    for p in procs:
        p.join()
    pdf_queue.put(None) # Sending None breaks the for loop inside upload
    upload_proc.join()

如果您实际上对并发上传没问题,那么根本不需要单独的upload 进程 - 只需直接从 pdf 编写进程上传即可。

不过,从您的问题中很难判断这是否正是您正在做的事情。澄清后,我将调整最后一部分以适合您的特定用例。

【讨论】:

  • 为什么要生成进程将数据放入队列而不是直接从主进程放入?
  • @Sir_FZ OP 表示他有多个编写 PDF 的实例:“我所拥有的大约是 10 个将 PDF 写入磁盘的实例”。所以有多个工人最终将项目并行放入队列中。
  • 好点。但是在使用Manager时,OP的优势在于可以同时处理多个上传(因为Manager会为每个请求分叉一个线程),并且由于涉及IO,因此适用并发。在您的解决方案中,您只有一个进程按顺序处理上传。我建议在上传过程中使用 ThreadPool 来异步处理放入队列的上传请求。
  • @Sir_FZ 我的印象是 OP 不希望上传同时运行。我不明白他为什么还要使用Manager 而不是仅仅产生一个线程或类似的东西。如果并发上传真的没问题,那么在上传过程中运行一个线程池肯定是有意义的。您甚至可以为每个工作进程创建一个线程池,以避免将数据传递到单独的上传进程的 IPC 开销,尽管这会使优化正在运行的上传线程的数量变得更加困难。
  • 完全同意。我猜他更喜欢根据他的示例以及请求是按顺序运行还是异步运行的问题来异步完成上传。这应该在问题中更清楚。
猜你喜欢
  • 2010-11-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-03-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-12-08
相关资源
最近更新 更多