1. Serializing the dataset
import os
import cv2
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # silence TensorFlow's C++ info logs
assert tf.__version__.startswith('2.')
train_dir = './faces/'
train_filename = [train_dir + filename for filename in os.listdir(train_dir)]
file_at = 0
writer = None
for i in range(len(train_filename)):
    img = cv2.imread(train_filename[i])  # read the image once instead of three times
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR; convert to RGB so matplotlib displays correctly later
    height, width, channel = img.shape
    image = img.tobytes()  # write raw pixel bytes (not the encoded file) so tf.decode_raw in part 2 can parse them
    feature = {
        'height': tf.train.Feature(int64_list=tf.train.Int64List(value=[height])),
        'width': tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
        'channel': tf.train.Feature(int64_list=tf.train.Int64List(value=[channel])),
        'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image]))  # key must match the reading code in part 2
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    if i % 500 == 0:  # start a new shard every 500 images
        file_at += 1
        tfrecord_file = './TFrecord/train-%.5d.record' % file_at
        if writer is not None:
            writer.close()
        writer = tf.io.TFRecordWriter(tfrecord_file)
        print('Serializing shard {}'.format(file_at))
    writer.write(example.SerializeToString())
writer.close()  # close the last shard
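# To sanity-check the shards, one can parse a record back with tf.data. This is an
# optional verification sketch, not part of the original pipeline; it runs eagerly
# under TF 2.x, and the feature keys must match the ones written above.
raw_ds = tf.data.TFRecordDataset('./TFrecord/train-00001.record')
feature_spec = {
    'height': tf.io.FixedLenFeature([], tf.int64),
    'width': tf.io.FixedLenFeature([], tf.int64),
    'channel': tf.io.FixedLenFeature([], tf.int64),
    'image_raw': tf.io.FixedLenFeature([], tf.string),
}
for raw in raw_ds.take(1):
    parsed = tf.io.parse_single_example(raw, feature_spec)
    print(parsed['height'].numpy(), parsed['width'].numpy(), parsed['channel'].numpy())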
2. Training the network
import numpy as np
import matplotlib.pyplot as plt
import tensorflow.compat.v1 as tf  # the graph-mode APIs below (tf.layers, queues, Session) live under compat.v1 in TF 2.x
tf.disable_v2_behavior()
noises_size = 128  # dimensionality of the noise vector
def gen_deconv(batch_input, out_channels):
    # transposed convolution: 4x4 kernel, stride 2, 'same' padding doubles H and W
    return tf.layers.conv2d_transpose(batch_input, out_channels, 4, 2, padding='same')
def batchnorm(inputs):  # batch normalization over the channel axis
    return tf.layers.batch_normalization(inputs, axis=3, epsilon=1e-5, momentum=0.1,
                                         training=True, gamma_initializer=tf.random_normal_initializer(1.0, 0.02))
def lrelu(x, a):
    with tf.name_scope('lrelu'):
        x = tf.identity(x)
        return 0.5 * (1 + a) * x + 0.5 * (1 - a) * tf.abs(x)
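# The closed form above is algebraically identical to leaky ReLU, max(x, a*x):
# for x >= 0 it reduces to x, and for x < 0 to a*x. A quick numpy check
# (illustrative only; safe to delete):
_x, _a = np.linspace(-3, 3, 7), 0.2
assert np.allclose(0.5 * (1 + _a) * _x + 0.5 * (1 - _a) * np.abs(_x), np.maximum(_x, _a * _x))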
def generator(noises, base=128, output_channels=3):  # generator network
    layers = []
    with tf.variable_scope('Linear_layer'):  # layer 1: a fully connected projection
        W = tf.get_variable('W', [noises_size, 3*3*8*base], tf.float32, tf.random_normal_initializer(stddev=0.02))
        b = tf.get_variable('b', [3*3*8*base])
        output = tf.matmul(noises, W) + b
        output = tf.reshape(output, [-1, 3, 3, base*8])
        output = batchnorm(output)
        print('gen_layer_%d' % (len(layers) + 1))
        print(output.shape)
        layers.append(output)
    layer_specs = [
        (base * 8, 0.5),
        (base * 4, 0.5),
        (base * 2, 0.0),
        (base * 1, 0.0),
    ]
    for (out_channels, dropout) in layer_specs:  # layers 2-5; note the dropout ratios are declared but never applied below
        with tf.variable_scope('deconv_%d' % (len(layers) + 1)):
            temp = tf.nn.relu(layers[-1])
            output = gen_deconv(temp, out_channels)
            output = batchnorm(output)
            print('gen_layer_%d' % (len(layers) + 1))
            print(output.shape)
            layers.append(output)
    with tf.variable_scope('deconv_6'):  # layer 6: final deconv to RGB; tanh maps values to [-1, 1]
        temp = layers[-1]
        output = tf.nn.relu(temp)
        output = gen_deconv(output, output_channels)
        output = tf.tanh(output)
        print('gen_layer_%d' % (len(layers) + 1))
        print(output.shape)
        layers.append(output)
    return layers[-1]
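# Each deconv doubles the spatial size, so the feature maps grow 3 -> 6 -> 12 -> 24 -> 48 -> 96.
# A quick shape sanity check in a throwaway graph (illustrative only; safe to delete):
with tf.Graph().as_default():
    _z = tf.placeholder(tf.float32, [None, noises_size])
    with tf.variable_scope('generator'):
        _fake = generator(_z)
    assert _fake.shape.as_list() == [None, 96, 96, 3]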
def dis_conv(batch_input, out_channels):
    in_channels = int(batch_input.shape[3])
    # stddev=0.02 matches the other initializers in this script
    kernel = tf.get_variable(initializer=tf.random_normal([4, 4, in_channels, out_channels], stddev=0.02), name='kernel')
    return tf.nn.conv2d(batch_input, kernel, strides=[1, 2, 2, 1], padding='SAME')  # stride-2 conv halves H and W: 96->48->24->12->6->3
def discriminator(dis_input, base=128):
    layers = []
    with tf.variable_scope("layer_1"):
        output = dis_conv(dis_input, base)
        output = lrelu(output, 0.2)
        print("layer_1")
        print(output.shape)
        layers.append(output)
    layers_spec = [
        base * 2,
        base * 4,
        base * 8,
        base * 8
    ]
    for out_channels in layers_spec:
        with tf.variable_scope("layer_%d" % (len(layers) + 1)):
            output = dis_conv(layers[-1], out_channels)  # stride-2 convolution
            output = batchnorm(output)  # batch normalization
            output = lrelu(output, 0.2)  # leaky ReLU activation
            print("layer_%d" % (len(layers) + 1))
            print(output.shape)
            layers.append(output)
    with tf.variable_scope("layer_%d" % (len(layers) + 1)):
        output = tf.reshape(layers[-1], [-1, 3 * 3 * base * 8])  # [batch, 3, 3, base*8] -> [batch, 3*3*base*8]
        W = tf.get_variable("w", [3 * 3 * 8 * base, 1], tf.float32, tf.random_normal_initializer(stddev=0.02))
        b = tf.get_variable("b", [1])
        output = tf.matmul(output, W) + b  # a fully connected layer
        output = tf.sigmoid(output)  # sigmoid squashes the output into (0, 1)
        print("layer_%d" % (len(layers) + 1))
        print(output.shape)
        layers.append(output)
    return layers[-1]
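# A matching sanity check for the discriminator: five stride-2 convs reduce
# 96x96 to 3x3 before the final fully connected layer (illustrative only; safe to delete):
with tf.Graph().as_default():
    _imgs = tf.placeholder(tf.float32, [None, 96, 96, 3])
    with tf.variable_scope('discriminator'):
        _p = discriminator(_imgs)
    assert _p.shape.as_list() == [None, 1]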
def create_model(gen_inputs, dis_inputs, learning_rate):  # build the GAN
    EPS = 1e-12
    with tf.variable_scope("generator"):  # generator network
        gen_outputs = generator(gen_inputs)
    with tf.variable_scope("discriminator"):  # discriminator on real images
        predict_real = discriminator(dis_inputs)
    with tf.variable_scope("discriminator", reuse=True):  # discriminator on generated images, variables shared
        predict_fake = discriminator(gen_outputs)
    with tf.name_scope('discriminator_loss'):  # discriminator loss
        dis_loss = tf.reduce_mean(-tf.log(predict_real + EPS) - tf.log(1 - predict_fake + EPS))  # EPS guards against log(0)
    with tf.name_scope('generator_loss'):  # generator loss (non-saturating form)
        gen_loss = tf.reduce_mean(-tf.log(predict_fake + EPS))
    all_var = tf.trainable_variables()
    with tf.name_scope('discriminator_train'):
        dis_var = [var for var in all_var if var.name.startswith('discriminator')]
        dis_optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(dis_loss, var_list=dis_var)
    with tf.name_scope('generator_train'):
        gen_var = [var for var in all_var if var.name.startswith('generator')]
        gen_optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(gen_loss, var_list=gen_var)
    return dis_optimizer, gen_optimizer, dis_loss, gen_loss, gen_outputs
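# A more numerically stable alternative to the hand-written -log losses above is to
# have the discriminator return raw logits (dropping its final sigmoid) and let
# TensorFlow fuse the sigmoid and the log. A sketch only, not used by this script;
# 'gan_losses_from_logits' is a hypothetical helper:
def gan_losses_from_logits(real_logits, fake_logits):
    dis_loss = tf.reduce_mean(
        tf.nn.sigmoid_cross_entropy_with_logits(labels=tf.ones_like(real_logits), logits=real_logits)
        + tf.nn.sigmoid_cross_entropy_with_logits(labels=tf.zeros_like(fake_logits), logits=fake_logits))
    gen_loss = tf.reduce_mean(
        tf.nn.sigmoid_cross_entropy_with_logits(labels=tf.ones_like(fake_logits), logits=fake_logits))
    return dis_loss, gen_loss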
def read():  # read examples back from the serialized shards
    files = tf.train.match_filenames_once("./TFrecord/train-*.record")
    filename_queue = tf.train.string_input_producer(files, shuffle=True)  # push the file list into a queue
    reader = tf.TFRecordReader()
    _, serialize_example = reader.read(filename_queue)  # pop one serialized example from the queue
    features = tf.parse_single_example(
        serialize_example,
        features={
            'height': tf.FixedLenFeature([], tf.int64),
            'width': tf.FixedLenFeature([], tf.int64),
            'channel': tf.FixedLenFeature([], tf.int64),
            'image_raw': tf.FixedLenFeature([], tf.string)
        })
    image_raw = features['image_raw']  # raw pixel bytes; the key matches the writer in part 1
    decoded_image = tf.decode_raw(image_raw, tf.uint8)  # decode the byte string into uint8 pixels
    images = tf.reshape(decoded_image, [96, 96, 3])  # restore the image shape
    return images
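# Note: string_input_producer and tf.train.batch are deprecated queue-based APIs.
# An equivalent tf.data pipeline would look roughly like this sketch;
# 'read_with_tf_data' is a hypothetical helper, not used by main() below:
def read_with_tf_data(batch_size):
    files = tf.data.Dataset.list_files('./TFrecord/train-*.record', shuffle=True)
    dataset = tf.data.TFRecordDataset(files)
    def _parse(serialized):
        feats = tf.parse_single_example(serialized, {'image_raw': tf.FixedLenFeature([], tf.string)})
        image = tf.reshape(tf.decode_raw(feats['image_raw'], tf.uint8), [96, 96, 3])
        return tf.cast(image, tf.float32) / 127.5 - 1  # rescale to [-1, 1], like main() does
    return dataset.map(_parse).shuffle(1000).repeat().batch(batch_size)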
# Training
def main():
    batch_size = 64  # size of one batch
    gen_inputs = tf.placeholder(tf.float32, [None, noises_size])  # generator input
    dis_inputs = tf.placeholder(tf.float32, [None, 96, 96, 3])  # discriminator input
    dis_optimizer, gen_optimizer, dis_loss, gen_loss, gen_output = create_model(gen_inputs, dis_inputs, 0.0002)  # build the model
    gen_images = (gen_output + 1) * 127.5  # map the generator's tanh output back to pixel values
    gen_images = tf.cast(gen_images, tf.int32)
    images = read()  # read the images
    images = tf.cast(images, tf.float32)
    images_input = images / 127.5 - 1  # rescale pixel values to [-1, 1]
    images_batch = tf.train.batch([images_input], batch_size=batch_size, capacity=5000)
    saver = tf.train.Saver()
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.80)  # fraction of GPU memory to occupy
    with tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)) as sess:
        tf.local_variables_initializer().run()
        tf.global_variables_initializer().run()  # initialize variables
        coord = tf.train.Coordinator()
        tf.train.start_queue_runners(sess=sess, coord=coord)  # start the input queue threads; without this, sess.run(images_batch) blocks forever
        steps = 0
        while True:
            cur_batch = sess.run(images_batch)  # fetch one batch of real images
            noises = np.random.uniform(-1, 1, size=(batch_size, noises_size)).astype(np.float32)  # sample a batch of noise
            for i in range(2):  # train the discriminator twice per step
                _, discriminator_loss = sess.run([dis_optimizer, dis_loss],
                                                 feed_dict={gen_inputs: noises, dis_inputs: cur_batch})
            _, generator_loss = sess.run([gen_optimizer, gen_loss], feed_dict={gen_inputs: noises})  # train the generator
            # During training, D tends to drift toward being too weak or too strong. Rebalance by
            # changing how often D is trained per step (ideally, pre-train D to decent parameters
            # first) or by adjusting the learning rate.
            if steps % 20 == 0:
                print("%d steps: gen_loss is %f; dis_loss is %f" % (
                    steps, float(generator_loss), float(discriminator_loss)))  # print the losses every 20 batches
            if steps % 100 == 0:  # save one sample image and a checkpoint every 100 batches
                now_image = sess.run(gen_images, {gen_inputs: noises})
                plt.imshow(now_image[0].astype(np.uint8))  # matplotlib expects uint8 for integer RGB images
                # plt.show()
                plt.savefig("./result/R_%d.png" % steps)
                saver.save(sess, "./Model/model.ckpt")
            steps += 1
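# To sample from a trained checkpoint later without re-training, rebuild the generator
# graph and restore the weights. A minimal sketch, assuming the definitions above;
# 'sample_from_checkpoint' is a hypothetical helper, not part of the original script:
def sample_from_checkpoint(n=16, ckpt='./Model/model.ckpt'):
    with tf.Graph().as_default():
        z = tf.placeholder(tf.float32, [None, noises_size])
        with tf.variable_scope('generator'):  # must match the scope used in create_model
            imgs = tf.cast((generator(z) + 1) * 127.5, tf.uint8)
        saver = tf.train.Saver()  # restores only the generator variables present in this graph
        with tf.Session() as sess:
            saver.restore(sess, ckpt)
            return sess.run(imgs, {z: np.random.uniform(-1, 1, (n, noises_size)).astype(np.float32)})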
if __name__ == '__main__':
    main()