【问题标题】:Python multiprocessing with unpicklable objects具有不可提取对象的 Python 多处理
【发布时间】:2012-11-27 23:13:36
【问题描述】:

目标:

  • 使用带有线程或进程的 SQLAlchemy 在数据库中运行约 40 个大型查询,将相应的 SQLA ResultProxies 放入 Queue.Queue(由 multiprocessing.Manager 处理)
  • 同时,将结果写入 .csv 文件,其中包含多个消耗所述队列的进程

当前状态:

  • QueryThread 和 WriteThread 类运行查询和写入数据;由于查询需要一些时间来运行,因此 GIL 处理线程的方式不会造成明显的性能损失
  • 另一方面,写入文件需要很长时间;事实上,尽管最初的想法是运行 WriteThread 类的多个线程,但使用单个线程可以获得最佳性能。

因此产生了使用多处理的想法;我希望能够同时写入输出,而不是受 CPU 限制,而是受 I/O 限制。

撇开背景不谈,这里的问题(本质上是一个设计问题)——multiprocessing library 的工作原理是腌制对象,然后将数据传送到其他衍生的进程;但是我尝试在 WriteWorker Process 中使用的 ResultProxy 对象和共享队列不可提取,这会导致以下消息(不是逐字记录,但足够接近):

pickle.PicklingError: Can't pickle object in WriteWorker.start()

所以对各位有帮助的人来说,有什么想法可以避免这个问题吗?这似乎是一个简单的、经典的生产者-消费者问题,我想出了解决方案很简单,我只是想多了

感谢任何帮助或反馈!谢谢:)

编辑:这里有一些相关的sn-ps代码,如果我可以提供任何其他上下文,请告诉我

来自父类:

#init manager and queues
self.manager = multiprocessing.Manager()
self.query_queue = self.manager.Queue()
self.write_queue = self.manager.Queue()


def _get_data(self):
    #spawn a pool of query processes, and pass them query queue instance
    for i in xrange(self.NUM_QUERY_THREADS):
        qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args)
        qt.daemon = True
        # qt.setDaemon(True)
        qt.start()

    #populate query queue
    self.parse_sql_queries()

    #spawn a pool of writer processes, and pass them output queue instance
    for i in range(self.NUM_WRITE_THREADS):
        wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict)
        wt.daemon = True
        # wt.setDaemon(True)
        wt.start()

    #wait on the queues until everything has been processed
    self.query_queue.join()
    self.write_queue.join()

来自 QueryWorker 类:

def run(self):
    while True:
        #grabs host from query queue
        query_tupe = self.query_queue.get()
        table =  query_tupe[0]
        query = query_tupe[1]
        query_num = query_tupe[2]
        if query and table:
            #grab connection from pool, run the query
            connection = self.engine.connect()
            print 'Running query #' + str(query_num) + ': ' + table
            try:
                result = connection.execute(query)
            except:
                print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: '  + str(sys.exc_info()[1])

            #place result handle tuple into out queue
            self.out_queue.put((table, result))

        #signals to queue job is done
        self.query_queue.task_done()

【问题讨论】:

    标签: python concurrency process multiprocessing pickle


    【解决方案1】:

    简单的答案是避免直接使用 ResultsProxy。而是使用 cursor.fetchall() 或 cursor.fetchmany(number_to_fetch) 从 ResultsProxy 获取数据,然后将数据传递到多处理队列中。

    【讨论】:

    • 只要没有太多结果就可以了。您还可以设置一个 feeder/consumer 系统,其中带有光标的进程将数据以块的形式发送给子进程,当子进程完成第一个但很多时,它会请求更多。
    • 感谢您的建议,我实施了这种方法,并且运行良好。我调用 fetchmany(),用结果填充一个 chunk_queue,然后消费者进程基于队列写入
    猜你喜欢
    • 1970-01-01
    • 2023-01-31
    • 1970-01-01
    • 2020-05-01
    • 2017-03-07
    • 1970-01-01
    • 2017-02-07
    • 2011-08-27
    • 2022-01-10
    相关资源
    最近更新 更多