【问题标题】:Is the class generator (inheriting Sequence) thread safe in Keras/Tensorflow?Keras/Tensorflow 中的类生成器(继承序列)线程安全吗?
【发布时间】:2019-03-26 17:01:18
【问题描述】:

为了加快模型的训练速度,在 CPU 上填充/生成批次并在 GPU 上并行运行模型训练似乎是一种好习惯。为此,可以用 Python 编写一个继承 Sequence 类的生成器类。

这里是文档的链接: https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence

文档中重要的一点是:

Sequence 是一种更安全的多处理方式。这种结构 保证网络只会在每个样本上训练一次 生成器不是这样的纪元。

并给出了一个简单的代码示例如下:

from skimage.io import imread
from skimage.transform import resize
import numpy as np
import math

# Here, `x_set` is list of path to the images
# and `y_set` are the associated classes.

class CIFAR10Sequence(Sequence):

    def __init__(self, x_set, y_set, batch_size):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) *
        self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) *
        self.batch_size]

        return np.array([
            resize(imread(file_name), (200, 200))
               for file_name in batch_x]), np.array(batch_y)

据我了解,理想情况下需要在模型中创建此生成器类的实例并将其提供给fit_generator(...) 函数。

gen = CIFAR10Sequence(x_set, y_set, batch_size)
# Train the model
model.fit_generator(generator=gen,
                    use_multiprocessing=True,
                    workers=6)

这是来自 Keras 文档的引用:

使用keras.utils.Sequence保证订购和保证 每个时期每个输入的单一使用使用时 use_multiprocessing=True.

在这个形状中,我假设这个设置是线程安全的。 问题 1) 我的假设正确吗?

一个令人困惑的事情是,参数use_multiprocessing 在 Windows 10 上可能没有设置为 True。Keras 不允许这样做。似乎它只能在 Linux 上设置为 True 。 (我不知道在其他平台上是怎样的。)但是workers参数仍然可以设置为大于0的值。

我们来看看这2个参数的定义:

workers: 整数。使用时启动的最大进程数 基于进程的线程。如果未指定,workers 将默认为 1。如果 0,将在主线程上执行生成器。

use_multiprocessing: 布尔值。如果为 True,则使用基于进程的线程。如果 未指定,use_multiprocessing 将默认为 False。注意 因为这个实现依赖于多处理,你不应该 将不可腌制的参数传递给生成器,因为它们无法传递 易于子进程。

因此,通过使用workers 参数,似乎可以创建多个进程来加速训练,而与use_multiprocessing 是否为真无关。

如果想要使用继承Sequence 的生成器类(在 Windows 10 上),他/她必须将 use_multiprocessing 设置为 False,如下所示:

gen = CIFAR10Sequence(x_set, y_set, batch_size)
# Train the model
model.fit_generator(generator=gen,
                    use_multiprocessing=False,  # CHANGED
                    workers=6)

而且这里还有多个进程在运行,因为workers = 6。

问题 2) 在将 use_multiprocessing 参数设置为 False 后,此设置仍然是线程安全的还是线程安全特性现在丢失了?我无法根据文档说清楚。

问题 3) 还是和这个话题有关...当以这种方式进行训练时,数据由 CPU 生成并在 GPU 上训练,如果正在训练的模型很浅,由于 GPU 一直在等待来自 CPU 的数据,因此 GPU 利用率最终会非常低,而 CPU 利用率会显着提高。在这种情况下,有没有办法利用一些 GPU 资源来生成数据?

【问题讨论】:

  • +1 @edn 我和你有同样的问题。您在这件事上找到任何有用的答案/资源吗?
  • @AaronDT,感谢您的提醒。我会尽快提供答案。

标签: python tensorflow keras generator


【解决方案1】:

在看过这篇文章的人中,似乎没有人有最终的答案,所以我想给出适合我的答案。由于该领域缺乏文档,我的回答可能会遗漏一些相关细节。请随意添加更多我在这里没有提及的信息。

看起来,Windows 不支持用 Python 编写继承 Sequence 类的生成器类。 (你似乎可以让它在 Linux 上工作。)为了让它工作,你需要设置参数use_multiprocessing=True(使用类方法)。但如前所述,它在 Windows 上不起作用,因此您必须将 use_multiprocessing 设置为 False(在 Windows 上)。 然而,这并不意味着多处理在 Windows 上不起作用。即使您设置了use_multiprocessing=False,当使用以下设置运行代码时,仍然可以支持多处理,您只需将workers 参数设置为大于1 的任何值。

例子:

history = \
   merged_model.fit_generator(generator=train_generator,
                              steps_per_epoch=trainset_steps_per_epoch,
                              epochs=300,
                              verbose=1,
                              use_multiprocessing=False,
                              workers=3,
                              max_queue_size=4)

此时,让我们再次记住 Keras 文档:

keras.utils.Sequence 的使用保证了排序和保证 使用时每个 epoch 的每个输入的单次使用 use_multiprocessing=True。

据我了解,如果use_multiprocessing=False,则生成器不再是线程安全的,这使得编写继承Sequence生成器类变得困难。

为了解决这个问题,我自己编写了一个生成器,我手动使线程安全。这是一个示例伪代码:

import tensorflow as tf
import threading

class threadsafe_iter:
    """Takes an iterator/generator and makes it thread-safe by
    serializing call to the `next` method of given iterator/generator.
    """
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self): # Py3
        return next(self.it)

    #def next(self):     # Python2 only
    #    with self.lock:
    #        return self.it.next()

def threadsafe_generator(f):
    """A decorator that takes a generator function and makes it thread-safe.
    """
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g


@threadsafe_generator
def generate_data(tfrecord_file_path_list, ...):

    dataset = tf.data.TFRecordDataset(tfrecord_file_path_list)

    # example proto decode
    def _parse_function(example_proto):
      ...
      return batch_data

    # Parse the record into tensors.
    dataset = dataset.map(_parse_function)  

    dataset = dataset.shuffle(buffer_size=100000)

    # Repeat the input indefinitly
    dataset = dataset.repeat()  

    # Generate batches
    dataset = dataset.batch(batch_size)

    # Create an initializable iterator
    iterator = dataset.make_initializable_iterator()

    # Get batch data
    batch_data = iterator.get_next()

    iterator_init_op = iterator.make_initializer(dataset)

    with tf.Session() as sess:

        sess.run(iterator_init_op)

        while True:            
            try:
                batch_data = sess.run(batch_data)
            except tf.errors.OutOfRangeError:
                break
            yield batch_data

好吧,可以讨论以这种方式这样做是否真的很优雅,但它似乎工作得很好。

总结一下:

  • 如果在 Windows 上编写程序,请将 use_multiprocessing 设置为 False
  • (截至今天,据我所知)在 Windows 上编写代码时,不支持编写继承 Sequence 的生成器类。 (我猜这是一个 Tensorflow/Keras 问题)。
  • 要解决此问题,请编写一个普通的生成器,使您的生成器线程安全,并将 workers 设置为大于 1 的数字。

重要提示:在此设置中,生成器在 CPU 上运行,而训练在 GPU 上完成。我可以观察到的一个问题是,如果您正在训练的模型足够浅,那么 GPU 的利用率仍然很低,而 CPU 利用率却很高。如果模型很浅并且数据集足够小,那么将所有数据存储在内存中并在 GPU 上运行所有内容可能是一个不错的选择。它应该显着加快训练速度。如果出于任何原因,您想同时使用 CPU 和 GPU,我的适度建议是尝试使用 Tensorflow 的 tf.data API,它可以显着加快数据预处理和批处理准备。如果生成器只用 Python 编写,GPU 会一直等待数据继续训练。关于 Tensorflow/Keras 文档的一切都可以说,但它确实是高效的代码!

任何对 API 有更完整知识并看到此帖子的人,请随时在此处纠正我,以防我误解任何内容或更新 API 以解决 Windows 上的问题。

【讨论】:

  • 我现在遇到了同样的问题。你有进步吗?你还在用这个方法吗?
  • 是的,我仍在使用相同的解决方案。
【解决方案2】:

我提出了一个其他人可能感兴趣的“改进”解决方案。请注意,这是来自我使用 Tensorflow 1.15 的经验(我还没有使用版本 2)。

TL;DR

在 Windows 上安装 wsl 第 2 版,在此处在 Linux 环境(例如 Ubuntu)中安装 Tensorflow,然后将 use_multiprocessing 设置为 True 以使其工作。

注意: Windows Subshel​​l for Linux (WSL) 版本 2 仅适用于 Windows 10 版本 1903 Build 18362 或更高版本。 请务必在 Windows Update 中升级您的 Windows 版本以使其正常工作。

Install Tensorflow-GPU on WSL2

长答案

对于multitaskingmultithreading(即parallelismconcurrency),我们必须考虑两个操作:

  • forking = 父进程创建自己的副本(子进程),其中包含它使用的所有内存段的精确副本
  • spawning = 父进程创建一个不共享其内存的全新子进程,父进程必须等待子进程完成才能继续

Linux 支持forking,但 Windows 不支持。 Windows 仅支持spawning

使用use_multiprocessing=True 时Windows 挂起的原因是Python 的threading 模块对Windows 使用spawn。因此,父进程永远等待子进程完成,因为父进程无法将其内存转移给子进程,因此子进程不知道该做什么。

答案 2: 不是threadsafe在 Windows 上,如果您曾经尝试使用数据生成器或序列,您可能已经看到像这样的错误

ValueError: Using a generator with use_multiprocessing=True is not supported on Windows 
(no marshalling of generators across process boundaries). Instead, use single 
thread/process or multithreading.

marshalling的意思是“将一个对象的内存表示转换成适合传输的数据格式”。错误是说,与使用 fork 的 Linux 不同,use_multiprocessing=True 在 Windows 上不起作用,因为它使用 spawn` 并且无法将其数据传输到子线程。

此时,您可能会问自己:

“等等...Python 全局解释器锁 (GIL) 怎么样?.. 如果 Python 一次只允许一个线程运行,为什么它甚至还有 threading 模块,为什么我们要在 TensorFlow 中关心这个??!”

答案就在于CPU-bound tasksI/O-bound tasks的区别:

  • CPU-bound tasks = 等待处理数据的人
  • I/O-bound tasks = 等待其他进程输入或输出的进程(即数据传输)

在编程中,当我们说两个任务是concurrent 时,我们的意思是它们可以在重叠的时间内启动、运行和完成。当我们说它们是 parallel 时,我们的意思是它们实际上是同时运行的。

因此,GIL 阻止线程并行运行,而不是并发。这对 Tensorflow 很重要的原因是,并发 都是关于 I/O 操作(数据传输)的。 Tensorflow 中一个好的数据流管道应该尽量使用concurrent,这样当数据在 CPU、GPU 和/或 RAM 之间传输时没有延迟时间,并且训练完成得更快。 (与其让一个线程坐下来等待它从其他地方取回数据,我们可以让它执行图像预处理或其他操作,直到数据取回。)


重要说明:GIL 是用 Python 制作的,因为 Python 中的一切都是对象。 (这就是为什么你可以用“dunder/magic”方法做“奇怪”的事情,比如(5).__add__(3) 得到8 注意: 在上面,5 周围需要括号,因为5.float,因此我们需要通过使用括号来利用运算顺序。 Python 通过计算对单个对象的所有引用来处理内存和垃圾回收。当计数变为 0 时,Python 删除该对象。如果两个线程试图同时访问同一个对象,或者如果一个线程比另一个线程完成得更快,您可以获得race condition 并且对象将被“随机”删除。我们可以在每个线程上放置一个lock,但是我们将无法阻止deadlocks。 Guido(以及我自己,虽然这当然可以争论)认为失去parallel线程执行是一个小损失,因为我们仍然维持I/O并发操作,并且任务仍然可以在parallel中运行,通过在不同的平台上运行它们cpu 内核(即multiprocessing)。因此,这就是为什么 Python 同时具有 threadingmultiprocessing 模块的原因之一。

现在回到threadsafe。在运行concurrent/parallel 任务时,您必须注意其他事项。两个大的是:

  1. race conditions - 每次程序运行时,运算的计算时间并不完全相同(为什么使用timeit,我们会平均多次运行)。由于线程会根据运行在不同时间完成,因此每次运行都会得到不同的结果。

  2. deadlock - 如果两个线程试图同时访问同一个内存,你会得到一个错误。为了防止这种情况,我们在线程中添加lockmutex(互斥),以防止其他线程在运行时访问同一内存。但是,如果两个线程需要访问同一个内存,都被锁定,并且每个线程都依赖另一个线程完成才能执行,那么程序就会挂起。

我提出这个是因为 Tensorflow 需要能够pickle Python 对象以使代码运行得更快。 (pickling 正在将对象和数据转换为字节码,这与整个程序的源代码在 Windows 上转换为exe 的方式非常相似)。 Tensorflow Iterator.__init__() 方法锁定线程并包含threading.Lock()

def __init__(self, n, batch_size, shuffle, seed):
    ...
    self.lock = threading.Lock()
    ...

问题是 Python 不能在 Windows 上 pickle 线程锁定对象(即 Windows 不能 marshall 线程锁定到 child 线程)。

如果您尝试使用生成器并将其传递给 fit_generator,您将收到错误消息(请参阅 GitHub 问题 #10842

TypeError: can't pickle _thread.lock objects

因此,虽然use_multiprocessing=True 在 Linux 上是线程安全的,但在 Windows 上却不是。

解决方案: 2020 年 6 月左右,Microsoft 推出了适用于 Linux 的 Windows Subshel​​l 第 2 版 (wsl)。这很重要,因为它启用了 GPU 硬件加速。第 1 版“简单地”是 Windows NT 和 Linux 之间的驱动程序,而 wsl 现在实际上是一个内核。因此,您现在可以在 Windows 上安装 Linux,从命令提示符打开 bash shell,并且(最重要的是)访问硬件。因此,现在可以在wsl 上安装tensorflow-gpu。此外,您现在可以使用fork

**因此,我推荐

  1. 在 Windows 上安装 wsl 第 2 版并添加所需的 Linux 环境
  2. 在此处的wsl Linux 环境中的虚拟环境中安装tensorflow-gpu
  3. 重试use_multiprocessing=True 看看它是否有效。**

CAVEAT:我尚未对此进行测试以验证其是否有效,但据我所知,我认为应该可以。

在此之后,回答问题3应该是一个简单的问题,用并行度来调整并发量,我推荐TensorflowDev 2018峰会视频Training Performance: A user’s guide to converge faster看看如何做到这一点.

【讨论】:

    猜你喜欢
    • 2017-05-02
    • 2010-11-11
    • 2017-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多