【问题标题】:Input pipeline w/ keras.utils.Sequence object or tf.data.Dataset?带有 keras.utils.Sequence 对象或 tf.data.Dataset 的输入管道?
【发布时间】:2020-06-12 15:01:45
【问题描述】:

我目前正在使用tf.keras.utils.Sequence 对象为 CNN 生成图像批次。我正在为模型使用 Tensorflow 2.2 和 Model.fit 方法。当我拟合模型时,当我在tf.keras.model.fit(...) 中设置use_multiprocessing=True 时,每个时期都会引发以下警告:

WARNING:tensorflow:multiprocessing can interact badly with TensorFlow,
 causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended

模型优化得很好,正如文档中所预期的那样,而且我正在使用基于Sequence 的生成器。但是,如果 use_multiprocessing 将成为一个已弃用的功能来代替 tf.data 对象,我希望使用最新的输入管道。我目前使用以下基于tf.keras.utils.Sequence 的生成器,灵感来自这篇关于大型数据集分区良好实践的文章: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, data_dir, batch_size=32, dim=(128,128), n_channels=1,
             n_classes=2, shuffle=True, **augmentation_kwargs):
    'Initialization'
    self.dim = dim
    self.batch_size = batch_size
    self.labels = labels
    self.list_IDs = list_IDs
    self.data_dir = data_dir
    self.n_channels = n_channels
    self.n_classes = n_classes
    self.shuffle = shuffle
    self.on_epoch_end()
    self.augmentor = keras.preprocessing.image.ImageDataGenerator(**augmentation_kwargs)

def __len__(self):
    'Denotes the number of batches per epoch'
    return int(np.floor(len(self.list_IDs) / self.batch_size))

def __getitem__(self, index):
    'Generate one batch of data'
    # Generate indexes of the batch
    indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

    # Find list of IDs
    list_IDs_temp = [self.list_IDs[k] for k in indexes]

    # Generate data
    X, y = self.__data_generation(list_IDs_temp)

    return X, y

def on_epoch_end(self):
    'Updates indexes after each epoch'
    self.indexes = np.arange(len(self.list_IDs))
    if self.shuffle == True:
        np.random.shuffle(self.indexes)

def __data_generation(self, list_IDs_temp):
    'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
    # Initialization
    X = np.empty((self.batch_size, *self.dim))
    y = np.empty((self.batch_size), dtype=int)

    # Generate data
    for i, ID in enumerate(list_IDs_temp):

        # Store sample
        X[i,] = np.load(self.data_dir +'/{}_stars.npy'.format(ID))

        # Store class
        y[i] = self.labels[ID]

    # Reshape and apply augmentation to sample
    X,y = self.augmentor.flow(X.reshape(self.batch_size,*self.dim,1),y=y,
                              shuffle=False,batch_size=self.batch_size)[0]

    return X, y

所有类的所有数据都在data_dir 目录中,并存储为单独的.npy 文件。 ID 来自字符串列表。类标签取自以 ID 为键的字典——如文章中所述。

我真的很喜欢 Sequence 生成器设置的直觉。我还可以轻松地生成随机批次来检查它的行为是否符合我的预期。但是如何使用tf.data 重现此设置?如何使用tf.data.Datasetinterleaveprefetch 方法重现Sequence 生成器的多处理批处理生成?和/或我可以简单地使用tf.data.Dataset.from_generator() 方法摄取这个基于Sequence 的生成器吗?

非常感谢。

【问题讨论】:

    标签: tensorflow tf.keras tensorflow2.x


    【解决方案1】:

    回答可能为时已晚,但我所做的对我来说效果很好; 1- 我的课就是这样的;

    class DataGen(Sequence):
        def __init__(self, df, sr=8000, seconds=3, batch_size=16, shuffle=True):
            self.files = np.array(df.filepath)
            self.label = np.array(df.label)
            self.batch_size = batch_size
            self.shuffle = shuffle
            self.sr = sr
            self.seconds = seconds
            self.dim = self.sr*self.seconds
            self.on_epoch_end()
        
        def __len__():
            return len(self.label)//self.batch_size
        
        def __getitem__(self, x):
            indexs = self.indexs[np.arange(x, x+self.batch_size)]
            return self.__getBatch__(indexs)
            
        def __getBatch__(self, indexs):
            X, y = [], []
            for i in indexs:
                wav = self.__loadFile__(self.files[i])
                X.append(librosa.feature.mfcc(wav, self.sr).T)
                y.append(self.label[i])
            return tf.convert_to_tensor(X), to_categorical(y, num_classes=2)
            
        def __loadFile__(self, file):
            y, sr = librosa.load(file, sr=8000, mono=True)
            if len(y)>self.dim:
                return y[:self.dim]
            return np.pad(y, (0, self.dim-len(y)), 'constant', constant_values=0)
            
        def on_epoch_end(self):
            self.indexs = np.arange(len(self.label))
            if self.shuffle:
                np.random.shuffle(self.indexs)
    

    2- 比我改成如下函数;

    def gen(sr=8000, seconds=3, batch_size=16, shuffle=True):
        dim = sr*seconds
        def loadFile(file):
            wav, _ = librosa.load(file, sr=sr, mono=True)
            if len(wav)>dim:
                return wav[:dim]
            return np.pad(wav, (0, dim-len(wav)), 'constant', constant_values=0)
        
        while True:
            indexs = np.arange(len(df))
            if shuffle:
                np.random.shuffle(indexs)
            
            for x in range(len(df)//batch_size):
                X, y = [], []
                for i in indexs[np.arange(x*batch_size, (x+1)*batch_size)]:
                    X.append(librosa.feature.mfcc(loadFile(df.filepath[i]), sr).T)
                    y.append(df.label[i])
                    
                yield tf.convert_to_tensor(X), to_categorical(y, num_classes=2)
    

    3- 并且工作正常:

    dataset = tf.data.Dataset.from_generator(gen, (tf.dtypes.float32, tf.dtypes.int32))
    

    【讨论】:

    • 这个比官方文档有用多了
    • 感谢您的回答。我目前面临着类似的问题,虽然我的生成器在没有多处理的情况下工作正常,但速度非常慢。您的解决方案使用安全吗?使用多处理和工作线程时数据重叠没有问题吗?
    • 我刚刚发现有些东西可能会有所帮助,在类中您可以添加一个函数 __call__(self) 在其中您应该将返回数据的代码作为生成器放置,您可以只提供类对象到tf.data.Dataset.from_generator,注意我没测试过,
    • @WalidBousseta 您是否能够使用 __call__(self) 取得任何成功?
    • @Austin 是的,我已经对其进行了测试,它工作正常,在 __call__() 内我正在调用 get_item 函数
    【解决方案2】:

    这是我在 tensorflow 中使用的另一种方法,效果很好:

    class DataGen():
        def __init__(self, df, batch_size=32, shuffle=True):
            self.data = np.array(df)
            self.indexs = np.arange(self.data.shape[0])
            if shuffle:
                np.random.shuffle(self.indexs)
            self.batch_size = batch_size
        
        def __len__(self):
            return self.data.shape[0]//self.batch_size
        
        def get_item(self, x):
            # data preprocessing
            data, label = self.data[x]
            return data, label
        
        def __call__(self):
            for i in self.indexs:
                yield self.get_item(i)
        
    train_gen = DataGen(train_df)
    types = (tf.float32, tf.int32)
    shapes = ((1, 500, 201), (n_classes))
    batch_size = 32
    train_data = Dataset.from_generator(train_gen, output_types=types, output_shapes=shapes)
    train_data = train_data.batch(batch_size)
    
    # test
    X, y = next(iter(train_data))
    print(X.shape, y.shape)
    

    【讨论】:

    • @Austin 在这里我使用了 __call__() 方法并且工作正常
    • @Waild 我们需要让Datagen类继承Sequence吗?
    • @GPatel 不用我没用过
    猜你喜欢
    • 1970-01-01
    • 2023-03-18
    • 1970-01-01
    • 1970-01-01
    • 2021-01-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多