用Python手撕RNN

前面介绍的几个神经网络都是前馈型神经网络,前馈的意思是网络的传播方向是单向的。但是在NLP任务中,输入可能是一句话,词在句子中具有位置信息,即句子是时序数据,前馈神经网络无法充分学习时序数据的性质,而RNN(Recurrent Neural Network,循环神经网络)则可以较好的处理时序数据。

RNN

神经网络的循环

循环的意思就是数据的流通形成一个环路,使得数据在神经网络中不断的循环,从而一边记住过去的数据,一遍更新最新的数据,RNN的特征就是拥有这样的一个环路,下图展现了RNN的这个环路

在上图中,左边是数据在整体RNN层循环的大体示意,右边是RNN层的展开,不同的时刻对应的不同的RNN层,时刻t的输入数据是xtx_t,时序数据(x0,x1,,xt,)(x_0, x_1, \dots, x_t, \dots)会被输入到RNN层中,输出为(h0,h1,,ht,)(h_0, h_1, \dots, h_t, \dots)。可以看到,xtx_t的数据中包含有最开始的x0x_0的数据,各个时刻的 RNN 层接收传给该层的输入和前一个 RNN 层的输出,然后据此计算当前时刻的输出,时刻t的输出计算公式为

ht=tanh(ht1Wh+xtWx+b)h_t=tanh(h_{t-1}W_h+x_tW_x+b)

在上式中,RNN有两个权重,分别是将输入x转化为输出h的权重WxW_x和将前一个RNN层的输出转化为当前时刻的输出权重WhW_h,此外还有偏置b。在该式中,首先执行矩阵的乘积计算,然后使用tanh函数变换他们的和,因此hth_t的结果是由ht1h_{t-1}计算得来的,可以说记忆了前面的计算结果,即RNN具有储存记忆的功能。

基于时间的反向传播

上一节中介绍了RNN的正向传播,与前馈神经网络不同的是,RNN可视为在水平方向上延伸的神经网络,数据有两个方向的传递,一个是向垂直方向上的下一层传递,一个是向水平方向上不同时刻的RNN层进行传递。与RNN正向传播的过程类似,RNN的反向传播也是按时间顺序展开的反向传播,称为Backpropagation Through Time(基于时间的反向传播),简称 BPTT,如下图所示

在计算BPTT时有个需要解决的问题,那就是随着时序的时间跨度增大时,BPTT的计算量会呈指数级的增大,同时BPTT的梯度也会不稳定,因此需要对学习长时序的数据进行截断,即Truncated BPTT(截断的 BPTT),具体来说,就是将时间轴方向上过长的网络在合适的位置进行截断,从而创建多个小型网络,然后对截出来的小型网络执行误差反向传播法,需要注意的是截断的只是反向传播,正向传播的连接依然被维持,Truncated BPTT数据的处理顺序如下图所示

RNN的实现

单层RNN的实现

回顾一下RNN正向传播的数学表达式

ht=tanh(ht1Wh+xtWx+b)h_t=tanh(h_{t-1}W_h+x_tW_x+b)

上式中用到的运算有tanh、乘法和加法,都是之前介绍的几个神经网络中用到的操作,所以正向传播和反向传播的代码也和之前的神经网络差不多,代码如下

class RNN:
def __init__(self, Wx, Wh, b):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cache = None

def forward(self, x, h_prev):
Wx, Wh, b = self.params
t = np.dot(h_prev, Wh) + np.dot(x, Wx) + b
h_next = np.tanh(t)

self.cache = (x, h_prev, h_next)
return h_next

def backward(self, dh_next):
Wx, Wh, b = self.params
x, h_prev, h_next = self.cache

dt = dh_next * (1 - h_next ** 2)
db = np.sum(dt, axis=0)
dWh = np.dot(h_prev.T, dt)
dh_prev = np.dot(dt, Wh.T)
dWx = np.dot(x.T, dt)
dx = np.dot(dt, Wx.T)

self.grads[0][...] = dWx
self.grads[1][...] = dWh
self.grads[2][...] = db

return dx, dh_prev

RNN 的初始化方法接收两个权重参数和一个偏置参数。这里,将通过函数参数传进来的模型参数设置为列表类型的成员变量 params。然后,以各个参数对应的形状初始化梯度,并保存在 grads 中。最后,使用 None 对反向传播时要用到的中间数据 cache 进行初始化,正向传播的 forward(x, h_prev) 方法接收两个参数:从下方输入的 x 和从左边输入的 h_prev,反向传播的计算图如下所示,结合代码就很容易理解了。

Time RNN的实现

前面已经介绍过了,RNN可视为在水平方向上延伸的神经网络,在水平方向上由连续多个单层RNN组合起来的网络,我们称之为Time RNN,如下图所示

Truncated BPTT的处理过程就是将每一个截断的隐藏状态h保存在成员变量中,在一个截断的传播中会用到这个成员变量,如下图所示

TimeRNN的实现代码如下

class TimeRNN:
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None

self.h, self.dh = None, None
self.stateful = stateful

def set_state(self, h):
self.h = h

def reset_state(self):
self.h = None

def forward(self, xs):
Wx, Wh, b = self.params
N, T, D = xs.shape
D, H = Wx.shape

self.layers = []
hs = np.empty((N, T, H), dtype='f')

if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype='f')

for t in range(T):
layer = RNN(*self.params)
self.h = layer.forward(xs[:, t, :], self.h)
hs[:, t, :] = self.h
self.layers.append(layer)

return hs

def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D, H = Wx.shape

dxs = np.empty((N, T, D), dtype='f')
dh = 0
grads = [0, 0, 0]
for t in reversed(range(T)):
layer = self.layers[t]
dx, dh = layer.backward(dhs[:, t, :] + dh) # 求和后的梯度
dxs[:, t, :] = dx

for i, grad in enumerate(layer.grads):
grads[i] += grad

for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh

return dxs

下面来分别介绍这段实现代码,初始化方法的参数有权重、偏置和布尔型(True/False)的 stateful。一个成员变量 layers 在列表中保存多个 RNN 层,另一个成员变量,h 保存调用 forward() 方法时的最后一个 RNN 层的隐藏状态。参数中的 stateful 是“有状态”的意思,当 statefulTrue 时,无论时序数据多长,Time RNN 层的正向传播都可以不中断地进行,而当 statefulFalse 时,每次调用 Time RNN 层的 forward() 时,第一个 RNN 层的隐藏状态都会被初始化为零矩阵(所有元素均为 0 的矩阵)。

正向传播的 forward(xs) 方法从下方获取输入 xsxs 囊括了T个时序数据。因此,如果批大小是 N,输入向量的维数是 D,则 xs 的形状为 (N,T,D)

TimeRNN的反向传播backward(dhs)结合下图来进行理解,在反向传播中,将从上游(输出侧的层)传来的梯度记为dhs,将流向下游的梯度记为 dxs,因为这里我们进行的是 Truncated BPTT,所以不需要流向这个块上一时刻的反向传播,只需要将流向上一时刻的隐藏状态的梯度存放在成员变量 dh 中。

将RNN用于处理时序数据

RNNLM

之前介绍的word2vec其实是一种语言模型,通过语言模型能够给出词语序列发生的概率,也就是说使用概率来评估一个词语序列发生的可能性,即在多大程度上是自然的词语序列。介绍完了单层RNN和TimeRNN的实现过程,接下来还需要像搭积木一样来搭建完整的用于处理时序数据的RNN语言模型,我们这里称之为RNNLM(RNN Language Model)

RNNLM同样用到了之前介绍过的Affine层、Embedding 层、Softmax层,RNNLM的全貌图如下所示,右图展示了在时间轴上展开的形式。

经过第一层的 Embedding 层可以将语料库中的词语ID转化为词语的分布式表示(词向量),然后将这个词向量输入到RNN层中,RNN向下一层输出隐藏状态,同时也向下一时刻的单层RNN输出隐藏状态,最终RNN向下一层输出的隐藏状态经过Affine层,传给Softmax层。

RNNLM的实现

RNNLM中的Affine层、Embedding 层、Softmax层和之前介绍的神经网络的各层不太一样,因为要处理时序数据,同样也要使用Time Embedding 层、Time Affine 层、Time Softmax层等来实现整体处理时序数据的层,如下图所示

Time Embedding 层、Time Affine 层、Time Softmax层的处理比Time RNN层要简单许多,因为不需要在水平方向上传递数据,只需要别处理各个时刻的数据即可,下面是Time Embedding 层、Time Affine 层、Time Softmax层的实现代码

class TimeEmbedding:
def __init__(self, W):
self.params = [W]
self.grads = [np.zeros_like(W)]
self.layers = None
self.W = W

def forward(self, xs):
N, T = xs.shape
V, D = self.W.shape

out = np.empty((N, T, D), dtype='f')
self.layers = []

for t in range(T):
layer = Embedding(self.W)
out[:, t, :] = layer.forward(xs[:, t])
self.layers.append(layer)

return out

def backward(self, dout):
N, T, D = dout.shape

grad = 0
for t in range(T):
layer = self.layers[t]
layer.backward(dout[:, t, :])
grad += layer.grads[0]

self.grads[0][...] = grad
return None


class TimeAffine:
def __init__(self, W, b):
self.params = [W, b]
self.grads = [np.zeros_like(W), np.zeros_like(b)]
self.x = None

def forward(self, x):
N, T, D = x.shape
W, b = self.params

rx = x.reshape(N*T, -1)
out = np.dot(rx, W) + b
self.x = x
return out.reshape(N, T, -1)

def backward(self, dout):
x = self.x
N, T, D = x.shape
W, b = self.params

dout = dout.reshape(N*T, -1)
rx = x.reshape(N*T, -1)

db = np.sum(dout, axis=0)
dW = np.dot(rx.T, dout)
dx = np.dot(dout, W.T)
dx = dx.reshape(*x.shape)

self.grads[0][...] = dW
self.grads[1][...] = db

return dx


class TimeSoftmaxWithLoss:
def __init__(self):
self.params, self.grads = [], []
self.cache = None
self.ignore_label = -1

def forward(self, xs, ts):
N, T, V = xs.shape

if ts.ndim == 3: # 在监督标签为one-hot向量的情况下
ts = ts.argmax(axis=2)

mask = (ts != self.ignore_label)

# 按批次大小和时序大小进行整理(reshape)
xs = xs.reshape(N * T, V)
ts = ts.reshape(N * T)
mask = mask.reshape(N * T)

ys = softmax(xs)
ls = np.log(ys[np.arange(N * T), ts])
ls *= mask # 与ignore_label相应的数据将损失设为0
loss = -np.sum(ls)
loss /= mask.sum()

self.cache = (ts, ys, mask, (N, T, V))
return loss

def backward(self, dout=1):
ts, ys, mask, (N, T, V) = self.cache

dx = ys
dx[np.arange(N * T), ts] -= 1
dx *= dout
dx /= mask.sum()
dx *= mask[:, np.newaxis] # 与ignore_label相应的数据将梯度设为0

dx = dx.reshape((N, T, V))

return dx

这里需要介绍一下TimeSoftmaxWithLoss这个类,在这个类中一并实现了Time Softmax 层和 Time Cross Entropy Error 层,称之为Time Softmax with Loss 层,如下图所示

计算完每个时序数据的损失后,将它们加在一起取个平均就行了,得到的值最为最终的损失,计算的式子如下

L=1T(L0+L0++LT1)L=\frac{1}{T}(L_0+L_0+\dots+L_{T-1})

介绍完了Time Embedding 层、Time Affine 层、Time Time Softmax with Loss 层之后,只需将它们拼在一起就行了,将RNNLM的实现定义为SimpleRnnlm类,实现代码如下

import numpy as np

class SimpleRnnlm:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn

# 初始化权重
embed_W = (rn(V, D) / 100).astype('f')
rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f')
rnn_Wh = (rn(H, H) / np.sqrt(H)).astype('f')
rnn_b = np.zeros(H).astype('f')
affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
affine_b = np.zeros(V).astype('f')

# 生成层
self.layers = [
TimeEmbedding(embed_W),
TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful=True),
TimeAffine(affine_W, affine_b)
]
self.loss_layer = TimeSoftmaxWithLoss()
self.rnn_layer = self.layers[1]

# 将所有的权重和梯度整理到列表中
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads

def forward(self, xs, ts):
for layer in self.layers:
xs = layer.forward(xs)
loss = self.loss_layer.forward(xs, ts)
return loss

def backward(self, dout=1):
dout = self.loss_layer.backward(dout)
for layer in reversed(self.layers):
dout = layer.backward(dout)
return dout

def reset_state(self):
self.rnn_layer.reset_state()

这段代码值得注意的地方是权重和偏置初始化的部分,在上面的初始化代码中,RNN 层和 Affine 层使用了Xavier 初始值,Xavier 初始值的含义是在上一层的节点数是n的情况下,使用标准差为1n\frac{1}{\sqrt{n}}的分布作为初始值。

RNNLM的学习

学习的代码使用了 PTB 数据集的前1000个单词(训练数据)进行学习,PTB数据集很容易的可以获取到,学习的代码如下所示

import matplotlib.pyplot as plt
import numpy as np
from dataset import ptb

# 设定超参数
batch_size = 10
wordvec_size = 100
hidden_size = 100 # RNN的隐藏状态向量的元素个数
time_size = 5 # Truncated BPTT的时间跨度大小
lr = 0.1
max_epoch = 100

# 读入训练数据(缩小了数据集)
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_size = 1000
corpus = corpus[:corpus_size]
vocab_size = int(max(corpus) + 1)

xs = corpus[:-1] # 输入
ts = corpus[1:] # 输出(监督标签)
data_size = len(xs)
print('corpus size: %d, vocabulary size: %d' % (corpus_size, vocab_size))

# 学习用的参数
max_iters = data_size // (batch_size * time_size)
time_idx = 0
total_loss = 0
loss_count = 0
ppl_list = []

# 生成模型
model = SimpleRnnlm(vocab_size, wordvec_size, hidden_size)
optimizer = SGD(lr)

# ❶ 计算读入mini-batch的各笔样本数据的开始位置
jump = (corpus_size - 1) // batch_size
offsets = [i * jump for i in range(batch_size)]

for epoch in range(max_epoch):
for iter in range(max_iters):
# ❷ 获取mini-batch
batch_x = np.empty((batch_size, time_size), dtype='i')
batch_t = np.empty((batch_size, time_size), dtype='i')
for t in range(time_size):
for i, offset in enumerate(offsets):
batch_x[i, t] = xs[(offset + time_idx) % data_size]
batch_t[i, t] = ts[(offset + time_idx) % data_size]
time_idx += 1

# 计算梯度,更新参数
loss = model.forward(batch_x, batch_t)
model.backward()
optimizer.update(model.params, model.grads)
total_loss += loss
loss_count += 1

# ❸ 各个epoch的困惑度评价
ppl = np.exp(total_loss / loss_count)
print('| epoch %d | perplexity %.2f' % (epoch+1, ppl))
ppl_list.append(float(ppl))
total_loss, loss_count = 0, 0

这段代码有三处需要注意

  • 利用mini-batch分批训练数据
  • 使用了SGD进行优化
  • 采用困惑度作为评价标准

前两点之前的文章中介绍有,这里介绍一下为什么RNNLM这个语言模型要选取困惑度作为评价标准。困惑度简单来说就是预测将要出现词语的概率的倒数,假如**you say goodbye and i say hello.**这句话,语言模型预测you的下一个单词是say的概率为0.8,则困惑度就是这个概率的倒数,即10.8=1.25\frac{1}{0.8}=1.25,困惑度越接近1则表明语言模型预测的越准确。

Author: Hongyi Guo
Link: https://guohongyi.com/2020/11/30/用Python手撕RNN/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.