【问题标题】:Enqueuing a tf.RandomShuffleQueue from multiple processes using multiprocessing使用多处理从多个进程中对 tf.RandomShuffleQueue 进行排队
【发布时间】:2017-05-10 10:32:26
【问题描述】:

我想使用多个进程 (not threads) 进行一些预处理并将结果排入一个 tf.RandomShuffleQueue 队列,我的主图可以使用该队列进行训练。

有没有办法做到这一点?

我的实际问题

我已将我的数据集转换为跨 256 个分片的 TFRecords。我想使用multiprocessing 启动 20 个进程,并让每个进程处理一系列分片。每个进程都应该读取图像,然后对其进行扩充并将它们推送到tf.RandomShuffleQueue,从中可以将输入提供给图形进行训练。

有些人建议我查看tensorflow 中的inception 示例。然而,这是一个非常不同的情况,因为只有数据分片的读取是由多个线程完成的 (not processes),而预处理(例如 - 扩充)发生在主线程中。

【问题讨论】:

  • 您是在 TF 中进行扩充还是使用非 TF 库?
  • 我只使用基于 TF 的增强

标签: python tensorflow multiprocessing


【解决方案1】:

(这旨在解决你的实际问题)

在另一个主题中,有人告诉您 Python 具有全局解释器锁 (GIL),因此除非您使用多个进程,否则多核不会带来速度优势。

这可能是促使您使用multiprocessing 的原因。

但是,对于 TF,Python 通常仅用于构建“图”。实际执行发生在本机代码(或 GPU)中,其中 GIL 不起任何作用。

鉴于此,我建议让 TF 使用多线程。这可以使用intra_op_parallelism_threads 参数来控制,例如:

with tf.Session(graph=graph, 
    config=tf.ConfigProto(allow_soft_placement=True, 
    intra_op_parallelism_threads=20)) as sess:
    # ...

(旁注:如果你有一个 2-CPU、32 核系统,最好的参数可能是intra_op_parallelism_threads=16,这取决于很多因素)

【讨论】:

  • 但是预处理是为 CPU 设计的。使用 multiprocessing 没有意义吗?例如,当我尝试使用 python 的 for 循环进行 N 次数据扩充时,为什么 GIL 不会发挥任何作用(同时在 for 循环中使用 TF 库?
  • @Ujjwal 如果for 循环仅构建图形,GIL 不相关。运行 sess.run 时会发生耗时的实际执行。此时,最终被调用的本机代码甚至不“知道”您是从 Python 调用它的。您可以从 C++ 或 Java 调用它。
  • @Ujjwal 这有一些很好的例子可以效仿:tensorflow.org/programmers_guide/threading_and_queues
【解决方案2】:

评论:TFRecords 的酸洗没那么重要。 我可以传递包含分片 TFRecord 文件范围名称的列表列表。

我必须重新启动决策过程!

评论:我可以将它作为参数传递给 Pool.map()。

验证,如果 multiprocesing.Queue() 可以处理此问题。
张量函数的结果是Tensor object
请尝试以下操作:

tensor_object = func(TFRecord)
q = multiprocessing.Manager().Queue()
q.put(tensor_object)
data = q.get()
print(data)

评论:如何确保所有进程都排入同一个队列?

这很简单enqueue 来自Pool.map(... 的结果 毕竟process 完成了。
或者,我们可以enqueue 并行,queueing 来自all processes 的数据。

但这样做取决于如上所述的 pickleabel 数据。


例如:

import multiprocessing as mp
def func(filename):
    TFRecord = read(filename)
    tensor_obj = tf.func(TFRecord)
    return tensor_obj

def main_Tensor(tensor_objs):
    tf = # ... instantiat Tensor Session
    rsq = tf.RandomShuffleQueue(...)
    for t in tensor_objs:
        rsq.enqueue(t)

if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        tensor_objs = pool.map(func, sharded_TFRecords)
        pool.join()

    main_Tensor(tensor_objs)

【讨论】:

  • 1.在您的 sn-p 之后,我得到了 TFRecord 文件名称的输出。没有产生错误。 2. 我在一个集群上工作,有 64 个内核。您应该阅读@noxdafox 的另一个答案,我以前使用过多处理,但我不知道如何将它与 TensorFlow 一起使用,因为它在 tf.Session() 中有惰性评估的概念
  • TFRecords 的酸洗并不那么重要。我可以传递包含分片 TFRecord 文件范围名称的列表列表。在每个函数中,我可以分别打开 TFRecord 文件并完成工作。然而,每个函数都需要将数据排入tf.RandomShuffleQueue 队列,这不能使用multiprocessing 进行picklable。但是使用pathos 模块,我可以将它作为参数传递给Pool.map()。但是,如何确保所有进程都排入同一个队列?由于 TensorFlow 的惰性执行原理,我无法完全解决这个问题。
【解决方案3】:

似乎推荐的使用multiprocessing 运行TF 的方法是为每个孩子创建一个单独的tf.Session,因为跨进程共享它是不可行的。

你可以看看this example,希望对你有帮助。

[编辑:旧答案]

您可以使用multiprocessing.Pool 并依靠其回调机制在结果准备好后立即将结果放入tf.RandomShuffleQueue

这里有一个非常简单的例子来说明如何做到这一点。

from multiprocessing import Pool


class Processor(object):
    def __init__(self, random_shuffle_queue):
        self.queue = random_shuffle_queue
        self.pool = Pool()

    def schedule_task(self, task):
        self.pool.apply_async(processing_function, args=[task], callback=self.task_done)

    def task_done(self, results):
        self.queue.enqueue(results)

这里假设 Python 2,对于 Python 3,我建议使用 concurrent.futures.ProcessPoolExecutor

【讨论】:

  • 我认为这行不通。 tf.RandomShuffleQueue 没有任何 put() 方法。此外,在 TF 中,执行发生在会话内。 tf.Session()可以在不同进程之间传递或共享吗?
  • 以上代码只是一个例子,我对 TF API 不熟悉。在处理流程时,让有状态的逻辑远离流程本身是很重要的。您在“数据预处理”期间是否使用任何有状态逻辑?请在您的问题中分享预处理逻辑。
  • 我向你保证,我没有否决你的回答。但是,Tensorflow 并不能以这种方式工作,这将允许上述 sn-p 正常工作。
  • 我实际上是在向反对者提问 :)。正如我上面所说,我希望您展示预处理的逻辑。上面的代码 sn-p 在主进程中处理RandomShuffleQueue,因此它将像在正常工作流中一样工作。如果预处理函数使用了一些TF 会话,那么逻辑会变得更加复杂。
  • 现在更清楚了。 tf.Tensor 似乎是一个计算封装单元,稍后将对其进行懒惰评估。因此,即使您将预处理拆分为多个进程,实际处理也可能发生在tf.RandomShuffleQueue 的另一侧。我想看看我是否可以将预处理逻辑与TF 分离并将其运行到单独的进程中(从进程接收原始数据,将它们封装到tf.Tensor 并将其排入tf.RandomShuffleQueue)。如果没有,我猜你无能为力。我不会自己尝试腌制它们。
猜你喜欢
  • 2020-05-18
  • 2014-12-17
  • 2015-10-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-22
  • 1970-01-01
  • 2011-08-28
相关资源
最近更新 更多