An RNN makes use of the sequential information in the words. The network structure is as follows:
First, the words are passed into an embedding layer. We use an embedding layer because the vocabulary is far too large for one-hot encoding; representing words as dense embedding vectors is much more efficient. One could train such vectors separately with word2vec and load them here, but it is enough to simply add an embedding layer and let the network learn the embedding matrix on its own.
From the embedding layer, the new word representations are passed into the LSTM cells. These are recurrently connected, so the sequential information of the words is carried through the network. Finally, the LSTM cells feed into a sigmoid output layer, whose output predicts whether the sentiment of the text is positive or negative. The output layer has a single sigmoid unit. We only care about the output at the last time step, so the loss is computed only between that final output and the label.
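Before walking through the code, it may help to trace how the tensor shapes change as a batch flows through the network. The sizes below assume the hyperparameters chosen later in this post (batch_size=128, seq_len=200, embed_size=300, lstm_size=256):

# inputs_          (128, 200)       integer word IDs
# embed            (128, 200, 300)  after the embedding lookup
# outputs          (128, 200, 256)  LSTM output at every time step
# outputs[:, -1]   (128, 256)       output of the last time step only
# predictions      (128, 1)         the single sigmoid unit

The full Python code follows.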
First, import the required libraries and open the data files:
import numpy as np
import tensorflow as tf
with open('./data/reviews.txt', 'r') as f:
    reviews = f.read()
with open('./data/labels.txt', 'r') as f:
    labels = f.read()
Data cleaning. To use an embedding layer, the words must be encoded as integers. First we strip out the punctuation, then we handle the '\n' separators between reviews: treating '\n' as a delimiter, we split the text into individual reviews, and then join all the reviews back together into one large string.
from string import punctuation

# Remove all punctuation
all_text = ''.join([c for c in reviews if c not in punctuation])
# Split the text into individual reviews on '\n'
reviews = all_text.split('\n')
all_text = ' '.join(reviews)
# Split the text into a list of individual words
words = all_text.split()
The embedding lookup requires the data fed into the network to be integers. The simplest approach is to build a dictionary mapping {word: integer}, then convert every review into a list of integers before passing it to the network.
from collections import Counter

counts = Counter(words)
# Sort the vocabulary by frequency, most frequent first
vocab = sorted(counts, key=counts.get, reverse=True)
# Build the dictionary {word: integer}; indices start at 1 so that 0 is free for padding
vocab_to_int = {word: ii for ii, word in enumerate(vocab, 1)}

# Convert each review from a list of words to a list of integers
reviews_ints = []
for each in reviews:
    reviews_ints.append([vocab_to_int[word] for word in each.split()])
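To make the mapping concrete, here is a toy illustration (the dictionary below is made up; in the real vocab_to_int the most frequent word gets index 1):

toy_vocab = {'the': 1, 'movie': 2, 'was': 3, 'great': 4}   # hypothetical mapping
print([toy_vocab[word] for word in 'the movie was great'.split()])   # [1, 2, 3, 4]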
Convert the 'positive' and 'negative' labels to numeric values:
labels = labels.split('\n')
labels = np.array([1 if each == 'positive' else 0 for each in labels])
# Inspect the distribution of review lengths
review_lens = Counter([len(x) for x in reviews_ints])
print("Zero-length reviews: {}".format(review_lens[0]))
print("Maximum review length: {}".format(max(review_lens)))
Here we run into a problem: one review has length 0, and the longest review is 2514 words, far too long for this demonstration. So we fix the sequence length at 200: (1) reviews shorter than 200 words are left-padded with zeros; (2) reviews longer than 200 words are truncated to their first 200 words. First, though, we drop the zero-length review.
# Drop the zero-length review and its label
non_zero_idx = [ii for ii, review in enumerate(reviews_ints) if len(review) != 0]
reviews_ints = [reviews_ints[ii] for ii in non_zero_idx]
labels = np.array([labels[ii] for ii in non_zero_idx])

seq_len = 200
# Create a 25000 x 200 all-zero matrix
features = np.zeros((len(reviews_ints), seq_len), dtype=int)
# Copy reviews_ints into features row by row, left-padded and truncated to seq_len
for i, row in enumerate(reviews_ints):
    features[i, -len(row):] = np.array(row)[:seq_len]
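The indexing on the last line deserves a note: features[i, -len(row):] right-aligns each review within its row, so shorter reviews get zeros on the left, while [:seq_len] cuts over-long reviews down to their first 200 words. A toy illustration with a sequence length of 10:

row = [5, 3, 8]                       # a hypothetical 3-word review
padded = np.zeros(10, dtype=int)      # pretend seq_len were 10
padded[-len(row):] = np.array(row)[:10]
print(padded)                         # [0 0 0 0 0 0 0 5 3 8]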
Create the training, validation, and test sets:
split_frac = 0.8
split_idx = int(len(features)*split_frac)
train_x, val_x = features[:split_idx], features[split_idx:]
train_y, val_y = labels[:split_idx], labels[split_idx:]

# Split the remaining 20% in half for validation and test
test_idx = int(len(val_x)*0.5)
val_x, test_x = val_x[:test_idx], val_x[test_idx:]
val_y, test_y = val_y[:test_idx], val_y[test_idx:]
print("\t\t\tFeature Shapes:")
print("Train set: \t\t{}".format(train_x.shape),
"\nValidation set: \t{}".format(val_x.shape),
"\nTest set: \t\t{}".format(test_x.shape))
Now we build the model graph, starting with the hyperparameters:
lstm_size: number of units in the hidden LSTM cells. Generally, bigger is better.
lstm_layers: number of hidden LSTM layers. Start with 1 and increase it if the model underfits.
batch_size: number of reviews fed to the network per training step. Generally as large as memory allows.
learning_rate: the learning rate. This needs tuning to the problem; it should be neither too small nor too large.
lstm_size = 256
lstm_layers = 1
batch_size = 128
learning_rate = 0.001
n_words = len(vocab_to_int) + 1  # + 1 because index 0 is reserved for the padding token
# Create the graph object
graph = tf.Graph()
# Add nodes to the graph
with graph.as_default():
    inputs_ = tf.placeholder(tf.int32, [None, None], name='inputs')
    labels_ = tf.placeholder(tf.int32, [None, None], name='labels')
    keep_prob = tf.placeholder(tf.float32, name='keep_prob')
Add the embedding layer. Feeding one-hot encoded vectors into the network would be very inefficient. Instead of training embeddings separately with word2vec, we create the embedding weight matrix as a variable and let the network learn it during training.
# Size of the embedding vectors (i.e., the number of units in the embedding layer)
embed_size = 300

with graph.as_default():
    embedding = tf.Variable(tf.random_uniform((n_words, embed_size), -1, 1))
    embed = tf.nn.embedding_lookup(embedding, inputs_)
Next we create the LSTM cells, starting with the cell type.
A basic LSTM cell is created with tf.contrib.rnn.BasicLSTMCell.
We then add dropout to the cell with tf.contrib.rnn.DropoutWrapper. This wraps the cell inside another cell, which is equivalent to adding dropout to the cell's inputs and/or outputs.
Generally, more hidden layers give better results: a deeper network can learn more complex relationships. Multiple LSTM layers are stacked with tf.contrib.rnn.MultiRNNCell.
with graph.as_default():
    # Create a basic LSTM cell, wrapped with dropout, for each layer
    # (each layer needs its own cell object in recent TF 1.x releases)
    drops = [tf.contrib.rnn.DropoutWrapper(tf.contrib.rnn.BasicLSTMCell(lstm_size),
                                           output_keep_prob=keep_prob)
             for _ in range(lstm_layers)]
    # Stack the LSTM layers
    cell = tf.contrib.rnn.MultiRNNCell(drops)
    # Initialize the cell state to all zeros
    initial_state = cell.zero_state(batch_size, tf.float32)
To actually run the RNN we use tf.nn.dynamic_rnn, passing it the cell, the embedded inputs, and the initial_state defined above; this is the state carried from one time step to the next. tf.nn.dynamic_rnn does most of the work for us, returning the output at every time step along with the final hidden state.
with graph.as_default():
    outputs, final_state = tf.nn.dynamic_rnn(cell, embed,
                                             initial_state=initial_state)
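Here outputs holds the LSTM output at every time step, while final_state is the state to carry over to the next batch. A quick sanity check of the static shapes (the time dimension stays dynamic):

with graph.as_default():
    print(outputs.get_shape())   # e.g. (128, ?, 256): batch x time steps x lstm_size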
Output. We take the output of the last time step, outputs[:, -1], feed it through a fully connected layer with a single sigmoid unit, and use the mean squared error between the prediction and the label as the cost:
with graph.as_default():
    predictions = tf.contrib.layers.fully_connected(outputs[:, -1], 1, activation_fn=tf.sigmoid)
    cost = tf.losses.mean_squared_error(labels_, predictions)
    optimizer = tf.train.AdamOptimizer(learning_rate).minimize(cost)
Define the accuracy computation, which is called during validation:
with graph.as_default():
    correct_pred = tf.equal(tf.cast(tf.round(predictions), tf.int32), labels_)
    accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
Next we define a function that yields batches from a dataset. It drops the last incomplete batch, then iterates over the x and y arrays, yielding slices of size batch_size.
def get_batches(x, y, batch_size=100):
    n_batches = len(x)//batch_size
    x, y = x[:n_batches*batch_size], y[:n_batches*batch_size]
    for ii in range(0, len(x), batch_size):
        yield x[ii:ii+batch_size], y[ii:ii+batch_size]
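A quick usage check (the shapes assume the hyperparameters above):

x, y = next(get_batches(train_x, train_y, batch_size))
print(x.shape, y.shape)   # (128, 200) (128,)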
The training code:
epochs = 10

with graph.as_default():
    saver = tf.train.Saver()

with tf.Session(graph=graph) as sess:
    sess.run(tf.global_variables_initializer())
    iteration = 1
    for e in range(epochs):
        state = sess.run(initial_state)

        for ii, (x, y) in enumerate(get_batches(train_x, train_y, batch_size), 1):
            feed = {inputs_: x,
                    labels_: y[:, None],
                    keep_prob: 0.5,
                    initial_state: state}
            loss, state, _ = sess.run([cost, final_state, optimizer], feed_dict=feed)

            if iteration%5==0:
                print("Epoch: {}/{}".format(e, epochs),
                      "Iteration: {}".format(iteration),
                      "Train loss: {:.3f}".format(loss))

            if iteration%25==0:
                val_acc = []
                val_state = sess.run(cell.zero_state(batch_size, tf.float32))
                for x, y in get_batches(val_x, val_y, batch_size):
                    feed = {inputs_: x,
                            labels_: y[:, None],
                            keep_prob: 1,
                            initial_state: val_state}
                    batch_acc, val_state = sess.run([accuracy, final_state], feed_dict=feed)
                    val_acc.append(batch_acc)
                print("Val acc: {:.3f}".format(np.mean(val_acc)))
            iteration += 1
    saver.save(sess, "checkpoints/sentiment.ckpt")
Testing:
test_acc = []
with tf.Session(graph=graph) as sess:
    saver.restore(sess, tf.train.latest_checkpoint('checkpoints'))
    test_state = sess.run(cell.zero_state(batch_size, tf.float32))
    for ii, (x, y) in enumerate(get_batches(test_x, test_y, batch_size), 1):
        feed = {inputs_: x,
                labels_: y[:, None],
                keep_prob: 1,
                initial_state: test_state}
        batch_acc, test_state = sess.run([accuracy, final_state], feed_dict=feed)
        test_acc.append(batch_acc)
    print("Test accuracy: {:.3f}".format(np.mean(test_acc)))