用Python手撕LSTM

RNN存在环路,虽然能够记忆一些过去的信息,但是实际的效果并不是特别好,在许多情况下RNN都无法很好地学习到时序数据的长期依赖关系,这是因为RNN在反向传播时会出现梯度消失梯度爆炸的问题。为了能让神经网络学习到时序数据的长期依赖关系,便发展出了LSTM(Long ShortTerm Memory,长短期记忆网络),LSTM在RNN的基础上增加了“门”结构,从而可以学习到时序数据的长期依赖关系,本文先介绍一下RNN存在的问题,然后分别介绍了LSTM的原理和实现,最后介绍了LSTM的使用。

RNN存在的问题

梯度消失和梯度爆炸

假设RNN的任务是根据已经出现的词来预测下一个将要出现的词,如下图所示,要预测?中该填入哪个单词

通过分析?中应该填入的词是Tom,如果让RNN来预测的话,它必须从头到尾记住这句话的全部信息才能预测出Tom,因此在RNN的反向传播中,要学习到长距离的依赖关系时,梯度要流经很远的距离,如下图所示

理论上,在梯度从尾到头传递一遍之后便记住了那些应该学到的有意义的信息,但是RNN反向传播的距离越长,梯度在传播的过程中就会变得越弱,即远离尾部的RNN层的权重将不会被更新,随着时间的回溯,RNN 不可避免地会发生梯度变小(梯度消失)或者梯度变大(梯度爆炸)的命运。

梯度消失和梯度爆炸的原因

我们知道了RNN反向传播的距离越长,梯度在传播的过程中就会变得越弱,那么具体会发生怎样的变化的呢?我们将RNN在水平方向上的反向传播拎出来,如下图所示

可以看到反向传播的运算主要有tanh、加法和矩阵乘积(MatMul)三个运算,加法的反向传播将上游传来的梯度原样的传给下游,不会造成梯度的变化。tanh运算的导数为1y21-y^2,tanh及其导数的图像如下所示

图中虚线即为tanh的导数,可以看到,tanh导数的最大值是1,远离0都会使tanh得值便小,因此每一次经过tanh运算时,梯度都会越来越小,从而导致梯度消失的问题。

那么在RNN水平方向上的反向传播中矩阵乘积运算会怎样影响梯度呢?以一个实验来展示矩阵乘积对梯度的影响,代码如下

import numpy as np
import matplotlib.pyplot as plt

N = 2 # mini-batch的大小
H = 3 # 隐藏状态向量的维数
T = 20 # 时序数据的长度

dh = np.ones((N, H))
np.random.seed(3) # 为了复现,固定随机数种子
Wh = np.random.randn(H, H)

norm_list = []
for t in range(T):
dh = np.dot(dh, Wh.T)
norm = np.sqrt(np.sum(dh**2)) / N
norm_list.append(norm)

这段代码就是模拟梯度进行矩阵乘积运算,dh使用了L2范数,即对所有元素的平方和求平方根,将代码执行完norm_list的结果画在图上就如下图所示

可以看到梯度的大小随着时间步长呈指数级增加,这会导致梯度爆炸,梯度爆炸会引起数值溢出,使得神经网络的学习无法进行。实验还没完,我们改动其中的一行代码Wh = np.random.randn(H, H) * 0.5,只是将权重变小,norm_list的结果便会大相径庭,如下图所示

这次梯度呈指数级减小,这会导致梯度消失,如果发生梯度消失,梯度将迅速变小。一旦梯度变小,权重梯度不能被更新,模型就会无法学习长期的依赖关系。总的来说,因为矩阵 Wh 被反复乘了 T 次(水平方向有有多少个RNN层),当 Wh 大于 1 时(假设为标量),梯度呈指数级增加,可能会导致梯度爆炸;当 Wh 小于 1 时,梯度呈指数级减小,可能会导致梯度消失。

LSTM

为了解决RNN出现的梯度消失和梯度爆炸问题,需要对RNN做一些改进,关于梯度爆炸的问题,RNN中可以采用梯度裁剪的方法来规避,这个方法非常简单,就是设置一个阈值,当梯度的L2范数大于或等于阈值时,那么就将其强制限制在某个范围之内。而为了解决梯度消失的问题,需要对RNN进行些改进,由此便诞生出了LSTM,LSTM与RNN的不同在于多了记忆单元,如下图所示

图中路径ct1c_{t-1}流经ctc_t的单元即为记忆单元,记忆单元的特点是,仅在 LSTM 层内部接收和传递数据。也就是说,记忆单元在 LSTM 层内部结束工作,不向其他层输出,记忆单元的作用就是记住时序数据的长期依赖关系,LSTM中实现记忆单元的组件就是门结构。

门结构就像门打开或合上一样,控制数据的流动,并且能够控制数据流动的大小,即开合度,开合度也是一个超参数,可以通过学习来获得,在LSTM中主要有输出门遗忘门输入门三种门结构。

输出门

LSTM有记忆单元ctc_t,这个ctc_t存储了时刻t时LSTM的记忆,可以认为其中保存了从过去到时刻 t的所有必要信息(或者以此为目的进行了学习),然后,数据流经记忆单元,基于这个充满必要信息的记忆,向外部的层(和水平方向上下一时刻的 LSTM)输出隐藏状态hth_t,大致如下图所示

输出门呢就是对tanh(ct)tanh(c_t)施加门,针对tanh(ct)tanh(c_t)的各个元素,调整它们作为下一时刻的隐藏状态的重要程度。输出门的开合度(流出比例)根据输入xtx_t和上一个状态ht1h_{t-1}求得,开合度用到了sigmoid函数,用σ()\sigma()表示,输出门开合度的计算公式如下所示

o=σ(xtWx(o)+ht1Wh(o)+b(o))o=\sigma(x_tW_x^{(o)}+h_{t-1}W_h^{(o)}+b^{(o)})

输入xtx_t的权重为Wx(o)W_x^{(o)},上一时刻的状态ht1h_{t-1}的权重为Wh(o)W_h^{(o)},偏置为b(o)b^{(o)}。将输出门的开合度与tanh(ct)tanh(c_t)对应元素的乘积作为hth_t输出,注意这里的乘积的对应元素乘积,即哈达玛积,用符号()\odot()表示,计算输出的公式为

ht=otanh(ct)h_t=o\odot tanh(c_t)

添加输出门后的结构如下图所示

遗忘门

遗忘是为了更好的铭记,LSTM不仅要一股脑的把之前的信息记住,还需要忘记一些信息,忘记的这一些信息可能是下一个时刻LSTM层不需要的信息,因此需要为记忆单元ct1c_{t-1}添加一个忘记不必要记忆的门,即遗忘门,遗忘门的开合度也是通过学习到的,通俗的来说遗忘门要学习一个合适的开合度,使得丢弃一些信息之后,将学习后得到的损失降到最低,遗忘门开合度和输出门类似,计算公式如下所示

f=σ(xtWx(f)+ht1Wh(f)+b(f))f=\sigma(x_tW_x^{(f)}+h_{t-1}W_h^{(f)}+b^{(f)})

遗忘门放置得位置如下图所示

根据上图,经过遗忘门得输出为

ct=ftanh(ct1)c_t=f\odot tanh(c_{t-1})

输入门

遗忘门从上一时刻的记忆单元中删除了应该忘记的东西,但是这样一来,记忆单元只会忘记信息,现在我们还想向这个记忆单元添加一些应当记住的新信息,为此我们添加新的 tanh 节点,如下图所示

这个记忆节点只需要把计算结果加到上一时刻的记忆单元ctt1c_t{t-1}上,因此只需要用到tanh运算,tanh运算的作用不是门,它无法控制数据流经的开合度,只是将新的信息添加到记忆单元中,记忆节点的计算公式如下所示

g=tanh(xtWx(g)+ht1Wh(g)+b(g))g=tanh(x_tW_x^{(g)}+h_{t-1}W_h^{(g)}+b^{(g)})

最后,需要给记忆节点g添加门,用来表示需要记住多少信息,这个门就是输入门,输入门判断经由记忆节点g新增的信息中各个元素的价值有多大,输入门不会不经考虑就添加新信息,而是要利用门的开合度对要添加的信息进行取舍,输入门会添加加权后的新信息,输入门的计算图如下所示

同理,输入门开合度的计算公式如下

i=σ(xtWx(i)+ht1Wh(i)+b(i))i=\sigma(x_tW_x^{(i)}+h_{t-1}W_h^{(i)}+b^{(i)})

门结构计算总结

介绍完了输出门、遗忘门和输入门之后,对LSTM门结构的计算进行下总结,相较于RNN,LSTM多了以下六步计算

f=σ(xtWx(f)+ht1Wh(f)+b(f))g=tanh(xtWx(g)+ht1Wh(g)+b(g))i=σ(xtWx(i)+ht1Wh(i)+b(i))ct=fct1+giht=otanh(ct)f=\sigma(x_tW_x^{(f)}+h_{t-1}W_h^{(f)}+b^{(f)}) \\ g=tanh(x_tW_x^{(g)}+h_{t-1}W_h^{(g)}+b^{(g)}) \\ i=\sigma(x_tW_x^{(i)}+h_{t-1}W_h^{(i)}+b^{(i)}) \\ c_t=f\odot c_{t-1} + g\odot i \\ h_t=o\odot tanh(c_t)

LSTM为什么可以避免梯度消失和梯度爆炸

现在回到开始的那个问题,LSTM可以有效的避免梯度消失问题,那么加了这几个门结构就能够避免梯度消失吗?为什么呢?我们可以观察记忆单元cc的反向传播来解释,记忆单元cc的反向传播如下图所示

从图中可以看到,记忆单元的运算只有加法和哈达玛积这两个运算,哈达玛积不同于矩阵乘积,哈达玛积只是矩阵对应元素的乘积,这是很重要的一点,LSTM 的反向传播进行的不是矩阵乘积计算,而是对应元素的乘积计算,而且每次都会基于不同的门值进行对应元素的乘积计算,这就是LSTM不会发生梯度消失和梯度爆炸的原因。

LSTM的实现

和之前的神经网络一样,LSTM也是搭积木的过程,只是在上一篇文章RNN和Time RNN层的基础上增加了几个门结构的计算。

单层LSTM的实现

单层LSTM的实现代码如下

class LSTM:
def __init__(self, Wx, Wh, b):
'''
Parameters
----------
Wx: 输入`x`用的权重参数(整合了4个权重)
Wh: 隐藏状态`h`用的权重参数(整合了4个权重)
b: 偏置(整合了4个偏置)
'''
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cache = None

def forward(self, x, h_prev, c_prev):
Wx, Wh, b = self.params
N, H = h_prev.shape

A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b

f = A[:, :H]
g = A[:, H:2*H]
i = A[:, 2*H:3*H]
o = A[:, 3*H:]

f = sigmoid(f)
g = np.tanh(g)
i = sigmoid(i)
o = sigmoid(o)

c_next = f * c_prev + g * i
h_next = o * np.tanh(c_next)

self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
return h_next, c_next

def backward(self, dh_next, dc_next):
Wx, Wh, b = self.params
x, h_prev, c_prev, i, f, g, o, c_next = self.cache

tanh_c_next = np.tanh(c_next)

ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)

dc_prev = ds * f

di = ds * g
df = ds * c_prev
do = dh_next * tanh_c_next
dg = ds * i

di *= i * (1 - i)
df *= f * (1 - f)
do *= o * (1 - o)
dg *= (1 - g ** 2)

dA = np.hstack((df, dg, di, do))

dWh = np.dot(h_prev.T, dA)
dWx = np.dot(x.T, dA)
db = dA.sum(axis=0)

self.grads[0][...] = dWx
self.grads[1][...] = dWh
self.grads[2][...] = db

dx = np.dot(dA, Wx.T)
dh_prev = np.dot(dA, Wh.T)

return dx, dh_prev, dc_prev

需要注意的是在反向传播中用到了np.hstack()函数,np.hstack()函数在水平方向上将参数中给定的数组(矩阵)拼接起来。

Time LSTM层的实现

Time LSTM层的实现代码如下

class TimeLSTM:
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None

self.h, self.c = None, None
self.dh = None
self.stateful = stateful

def forward(self, xs):
Wx, Wh, b = self.params
N, T, D = xs.shape
H = Wh.shape[0]

self.layers = []
hs = np.empty((N, T, H), dtype='f')

if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype='f')
if not self.stateful or self.c is None:
self.c = np.zeros((N, H), dtype='f')

for t in range(T):
layer = LSTM(*self.params)
self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
hs[:, t, :] = self.h

self.layers.append(layer)

return hs

def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D = Wx.shape[0]

dxs = np.empty((N, T, D), dtype='f')
dh, dc = 0, 0

grads = [0, 0, 0]
for t in reversed(range(T)):
layer = self.layers[t]
dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
dxs[:, t, :] = dx
for i, grad in enumerate(layer.grads):
grads[i] += grad

for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh
return dxs

def set_state(self, h, c=None):
self.h, self.c = h, c

def reset_state(self):
self.h, self.c = None, None

LSTM的使用

Time LSTM和Time RNN的使用方法基本一致,如下图所示

加深神经网络的层数往往能够提高精度,这里使用两个LSTM层(垂直方向),使用两层LSTM层的RNNLM如下图所示

利用Dropout抑制过拟合

通过叠加 LSTM 层,可以期待能够学习到时序数据的复杂依赖关系。换句话说,通过加深层,可以创建表现力更强的模型,但是这样的模型往往会发生过拟合问题,过拟合是指过度学习了训练数据的状态,也就是说,过拟合是一种缺乏泛化能力的状态。我们想要的是一个泛化能力强的模型,因此必须基于训练数据和验证数据的评价差异,判断是否发生了过拟合,并据此来进行模型的设计。

抑制过拟合常见的方法有两种:一是增加训练数据;二是降低模型的复杂度。除此之外,对模型复杂度给予惩罚的正则化也很有效。比如,L2 正则化会对过大的权重进行惩罚。但在这里我们采用Dropout的方式抑制过拟合,Dropout在训练时随机忽略层的一部分(比如 50 %)神经元,也可以被视为一种正则化,Dropout的原理如下图所示

那么,Dropout层应该插入到哪里呢?我们知道LSTM网络的数据流向有垂直和水平两个方向,Dropout最好垂直方向的层上插入,因为如果在时序方向上插入 Dropout,那么当模型学习时,随着时间的推移,信息会渐渐丢失,插入Dropout层后的RNNLM结构如下所示

Dropout的代码实现如下

class TimeDropout:
def __init__(self, dropout_ratio=0.5):
self.params, self.grads = [], []
self.dropout_ratio = dropout_ratio
self.mask = None
self.train_flg = True

def forward(self, xs):
if self.train_flg:
flg = np.random.rand(*xs.shape) > self.dropout_ratio
scale = 1 / (1.0 - self.dropout_ratio)
self.mask = flg.astype(np.float32) * scale

return xs * self.mask
else:
return xs

def backward(self, dout):
return dout * self.mask

双层LSTM的RNNLM结构的实现

相较于RNN那篇文章中的RNNLM,双层LSTM的RNNLM效果更好,因此命名为BetterRnnlm类,BetterRnnlm的实现代码如下所示

import sys
class BetterRnnlm(BaseModel):
def __init__(self, vocab_size=10000, wordvec_size=650,
hidden_size=650, dropout_ratio=0.5):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn

embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx1 = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
lstm_Wh1 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b1 = np.zeros(4 * H).astype('f')
lstm_Wx2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_Wh2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b2 = np.zeros(4 * H).astype('f')
affine_b = np.zeros(V).astype('f')

self.layers = [
TimeEmbedding(embed_W),
TimeDropout(dropout_ratio),
TimeLSTM(lstm_Wx1, lstm_Wh1, lstm_b1, stateful=True),
TimeDropout(dropout_ratio),
TimeLSTM(lstm_Wx2, lstm_Wh2, lstm_b2, stateful=True),
TimeDropout(dropout_ratio),
TimeAffine(embed_W.T, affine_b) # 权重共享!!
]
self.loss_layer = TimeSoftmaxWithLoss()
self.lstm_layers = [self.layers[2], self.layers[4]]
self.drop_layers = [self.layers[1], self.layers[3], self.layers[5]]
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads

def predict(self, xs, train_flg=False):
for layer in self.drop_layers:
layer.train_flg = train_flg
for layer in self.layers:
xs = layer.forward(xs)
return xs

def forward(self, xs, ts, train_flg=True):
score = self.predict(xs, train_flg)
loss = self.loss_layer.forward(score, ts)
return loss

def backward(self, dout=1):
dout = self.loss_layer.backward(dout)
for layer in reversed(self.layers):
dout = layer.backward(dout)
return dout

def reset_state(self):
for layer in self.lstm_layers:
layer.reset_state()

需要注意的是,BetterRnnlm用到了权重共享的小技巧,权重共享即共享Embedding 层和 Affine 层的权重,通过在这两个层之间共享权重,可以大大减少学习的参数数量,并且仍能提高精度,抑制过拟合。

双层LSTM的RNNLM结构的学习

还是利用ptb语料库演示一下BetterRnnlm网络的学习流程,学习的代码如下所示

import sys
# 在用GPU运行时,请打开下面的注释(需要cupy)
# ==============================================
# config.GPU = True
# ==============================================
from common.optimizer import SGD
from common.trainer import RnnlmTrainer
from common.util import eval_perplexity
from dataset import ptb
from better_rnnlm import BetterRnnlm

# 设定超参数
batch_size = 20
wordvec_size = 650
hidden_size = 650
time_size = 35
lr = 20.0
max_epoch = 40
max_grad = 0.25
dropout = 0.5

# 读入训练数据
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_val, _, _ = ptb.load_data('val')
corpus_test, _, _ = ptb.load_data('test')

vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

model = BetterRnnlm(vocab_size, wordvec_size, hidden_size, dropout)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)
best_ppl = float('inf')
for epoch in range(max_epoch):
trainer.fit(xs, ts, max_epoch=1, batch_size=batch_size,
time_size=time_size, max_grad=max_grad)

model.reset_state()
ppl = eval_perplexity(model, corpus_val)
print('valid perplexity: ', ppl)

if best_ppl > ppl:
best_ppl = ppl
model.save_params()
else:
lr /= 4.0
optimizer.lr = lr
model.reset_state()
print('-' * 50)

需要注意的是学习的过程使用RnnlmTrainer类进行学习,并用RnnlmTrainer类的fit()方法求模型的梯度,更新模型的参数,代码如下

class RnnlmTrainer:
def __init__(self, model, optimizer):
self.model = model
self.optimizer = optimizer
self.time_idx = None
self.ppl_list = None
self.eval_interval = None
self.current_epoch = 0

def get_batch(self, x, t, batch_size, time_size):
batch_x = np.empty((batch_size, time_size), dtype='i')
batch_t = np.empty((batch_size, time_size), dtype='i')

data_size = len(x)
jump = data_size // batch_size
offsets = [i * jump for i in range(batch_size)] # mini-batch的各笔样本数据的开始位置

for time in range(time_size):
for i, offset in enumerate(offsets):
batch_x[i, time] = x[(offset + self.time_idx) % data_size]
batch_t[i, time] = t[(offset + self.time_idx) % data_size]
self.time_idx += 1
return batch_x, batch_t

def fit(self, xs, ts, max_epoch=10, batch_size=20, time_size=35,
max_grad=None, eval_interval=20):
data_size = len(xs)
max_iters = data_size // (batch_size * time_size)
self.time_idx = 0
self.ppl_list = []
self.eval_interval = eval_interval
model, optimizer = self.model, self.optimizer
total_loss = 0
loss_count = 0

start_time = time.time()
for epoch in range(max_epoch):
for iters in range(max_iters):
batch_x, batch_t = self.get_batch(xs, ts, batch_size, time_size)

# 计算梯度,更新参数
loss = model.forward(batch_x, batch_t)
model.backward()
params, grads = remove_duplicate(model.params, model.grads) # 将共享的权重整合为1个
if max_grad is not None:
clip_grads(grads, max_grad)
optimizer.update(params, grads)
total_loss += loss
loss_count += 1

# 评价困惑度
if (eval_interval is not None) and (iters % eval_interval) == 0:
ppl = np.exp(total_loss / loss_count)
elapsed_time = time.time() - start_time
print('| epoch %d | iter %d / %d | time %d[s] | perplexity %.2f'
% (self.current_epoch + 1, iters + 1, max_iters, elapsed_time, ppl))
self.ppl_list.append(float(ppl))
total_loss, loss_count = 0, 0

self.current_epoch += 1

def plot(self, ylim=None):
x = numpy.arange(len(self.ppl_list))
if ylim is not None:
plt.ylim(*ylim)
plt.plot(x, self.ppl_list, label='train')
plt.xlabel('iterations (x' + str(self.eval_interval) + ')')
plt.ylabel('perplexity')
plt.show()
Author: Hongyi Guo
Link: https://guohongyi.com/2020/12/07/用Python手撕LSTM/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.