找回密码
 立即注册
首页 业界区 业界 大模型基础补全计划(四)---LSTM的实例与测试(RNN的改进) ...

大模型基础补全计划(四)---LSTM的实例与测试(RNN的改进)

旌磅箱 2025-10-1 17:15:46
PS:要转载请注明出处,本人版权所有。

PS: 这个只是基于《我自己》的理解,

如果和你的原则及想法相冲突,请谅解,勿喷。

环境说明

  无
前言

   本文是这个系列第四篇,它们是:

  • 《大模型基础补全计划(一)---重温一些深度学习相关的数学知识》 https://www.cnblogs.com/Iflyinsky/p/18717317
  • 《大模型基础补全计划(二)---词嵌入(word embedding) 》 https://www.cnblogs.com/Iflyinsky/p/18775451
  • 《大模型基础补全计划(三)---RNN实例与测试》 https://www.cnblogs.com/Iflyinsky/p/18967569
   上文我们提到了RNN这种处理序列信息的网络结构,今天我们将会提到RNN的改进版本之一的网络结构:LSTM。注意在transformer结构出来之前,RNN还有很多的改进结构,毕竟这是一个大的研究方向。




LSTM (long short-term memory) 简介

  


LSTM的意义

  我们首先来想一想RNN的结构,很朴素的理解:RNN有两个输入,一个是当前输入,一个是上一次隐藏参数输入。如果我们从时间线来看,对于早期的输入\(X_{t-n}\)来说,由于隐藏参数一层层迭代和传递,对于\(X_t\)的影响非常的弱。此外,相对的,对于\(X_{t-1}\)来说,其对\(X_t\)的影响非常的强,如果\(X_{t-1}\)信息不完整,可能会影响输出。
  为了解决上面RNN结构遇到的问题,提出了LSTM结构。


LSTM的结构介绍

  首先我们来看看其结构图如下:
            
1.png
        注:此图来自于 https://zh.d2l.ai/chapter_recurrent-modern/lstm.html ,若侵权,联系删之。
  其有如下的一些内容:

  • 有三个输入:输入\(X_t\),隐藏参数\(H_{t-1}\),记忆\(C_{t-1}\)。
  • 有三个门:输入门 \(I_t = \sigma(X_tW_{xi} + H_{t-1}W_{hi} + b_i)\) , 遗忘门 \(F_t = \sigma(X_tW_{xf} + H_{t-1}W_{hf} + b_f)\),输出门 \(O_t = \sigma(X_tW_{xo} + H_{t-1}W_{ho} + b_o)\)
  • 有一个候选记忆元 \(\widetilde{C_t} = tanh(X_tW_{xc} + H_{t-1}W_{hc} + b_c)\)。
  • 有一个记忆元\(C_t\),其含义很简单,有多少记忆来自于\(\widetilde{C_t}\) ,然后由输入门 \(I_t\)控制多少候选记忆元进入新记忆中,由 遗忘门 遗忘门 \(F_t\) 来控制多少以前的记忆\(C_{t-1}\)进入新的记忆中。其公式为:\(C_t = F_t \odot C_{t-1} + I_t \odot \widetilde{C_t}\)
  • 有三个输出:输出门 \(O_t\),记忆元 \(C_t\),隐藏态\(H_t = O_t \odot tanh(C_t)\)
  总的来说,就是给隐藏参数加入了记忆参数,并可以通过记忆影响隐藏参数。




基于LSTM训练一个简单的文字序列输出模型

  对于文本预处理、数据集构造、训练框架搭建详见前文《大模型基础补全计划(三)---RNN实例与测试》
  下面是构建LSTM的网络结构,首先我们手动来构建网络:
  1. def get_lstm_params(vocab_size, num_hiddens, device):
  2.     num_inputs = num_outputs = vocab_size
  3.     def normal(shape):
  4.         return torch.randn(size=shape, device=device)*0.01
  5.     def three():
  6.         return (normal((num_inputs, num_hiddens)),
  7.                 normal((num_hiddens, num_hiddens)),
  8.                 torch.zeros(num_hiddens, device=device))
  9.     W_xi, W_hi, b_i = three()  # 输入门参数
  10.     W_xf, W_hf, b_f = three()  # 遗忘门参数
  11.     W_xo, W_ho, b_o = three()  # 输出门参数
  12.     W_xc, W_hc, b_c = three()  # 候选记忆元参数
  13.     # 输出层参数
  14.     W_hq = normal((num_hiddens, num_outputs))
  15.     b_q = torch.zeros(num_outputs, device=device)
  16.     # 附加梯度
  17.     params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc,
  18.               b_c, W_hq, b_q]
  19.     for param in params:
  20.         param.requires_grad_(True)
  21.     return params
  22. def init_lstm_state(batch_size, num_hiddens, device):
  23.     return (torch.zeros((batch_size, num_hiddens), device=device),
  24.             torch.zeros((batch_size, num_hiddens), device=device))
  25. def lstm(inputs, state, params):
  26.     [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c,
  27.      W_hq, b_q] = params
  28.     (H, C) = state
  29.     outputs = []
  30.     for X in inputs:
  31.         I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)
  32.         F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)
  33.         O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)
  34.         C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)
  35.         C = F * C + I * C_tilda
  36.         H = O * torch.tanh(C)
  37.         Y = (H @ W_hq) + b_q
  38.         outputs.append(Y)
  39.     return torch.cat(outputs, dim=0), (H, C)
复制代码
  然后是通过torch框架来设计网络:
  1. lstm_layer = nn.LSTM(num_inputs, num_hiddens)
复制代码
  最后是完整的训练代码:
  1. import osimport randomimport torchimport mathfrom torch import nnfrom torch.nn import functional as Fimport numpy as npimport timeimport visdomimport syssys.path.append('.')import datesetclass Accumulator:    """在n个变量上累加"""    def __init__(self, n):        """Defined in :numref:`sec_softmax_scratch`"""        self.data = [0.0] * n    def add(self, *args):        self.data = [a + float(b) for a, b in zip(self.data, args)]    def reset(self):        self.data = [0.0] * len(self.data)    def __getitem__(self, idx):        return self.data[idx]    class Timer:    """记录多次运行时间"""    def __init__(self):        """Defined in :numref:`subsec_linear_model`"""        self.times = []        self.start()    def start(self):        """启动计时器"""        self.tik = time.time()    def stop(self):        """停止计时器并将时间记录在列表中"""        self.times.append(time.time() - self.tik)        return self.times[-1]    def avg(self):        """返回平均时间"""        return sum(self.times) / len(self.times)    def sum(self):        """返回时间总和"""        return sum(self.times)    def cumsum(self):        """返回累计时间"""        return np.array(self.times).cumsum().tolist()    # 以num_steps为步长,从随机的起始位置开始,返回# x1=[ [random_offset1:random_offset1 + num_steps], ... , [random_offset_batchsize:random_offset_batchsize + num_steps] ]# y1=[ [random_offset1 + 1:random_offset1 + num_steps + 1], ... , [random_offset_batchsize + 1:random_offset_batchsize + num_steps + 1] ]def seq_data_iter_random(corpus, batch_size, num_steps):  #@save    """使用随机抽样生成一个小批量子序列"""    # 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1    corpus = corpus[random.randint(0, num_steps - 1):]    # 减去1,是因为我们需要考虑标签    num_subseqs = (len(corpus) - 1) // num_steps    # 长度为num_steps的子序列的起始索引    # [0, num_steps*1, num_steps*2, num_steps*3, ...]    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))    # 在随机抽样的迭代过程中,    # 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻    random.shuffle(initial_indices)    def data(pos):        # 返回从pos位置开始的长度为num_steps的序列        return corpus[pos: pos + num_steps]    num_batches = num_subseqs // batch_size    for i in range(0, batch_size * num_batches, batch_size):        # 在这里,initial_indices包含子序列的随机起始索引        initial_indices_per_batch = initial_indices[i: i + batch_size]        X = [data(j) for j in initial_indices_per_batch]        Y = [data(j + 1) for j in initial_indices_per_batch]        yield torch.tensor(X), torch.tensor(Y)# 以num_steps为步长,从随机的起始位置开始,返回# x1=[:, random_offset1:random_offset1 + num_steps]# y1=[:, random_offset1 + 1:random_offset1 + num_steps + 1]def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save    """使用顺序分区生成一个小批量子序列"""    # 从随机偏移量开始划分序列    offset = random.randint(0, num_steps)    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size    # 重新根据corpus建立X_corpus, Y_corpus,两者之间差一位。注意X_corpus, Y_corpus的长度是batch_size的整数倍    Xs = torch.tensor(corpus[offset: offset + num_tokens])    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])    # 直接根据batchsize划分X_corpus, Y_corpus    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)    # 计算出需要多少次才能取完数据    num_batches = Xs.shape[1] // num_steps    for i in range(0, num_steps * num_batches, num_steps):        X = Xs[:, i: i + num_steps]        Y = Ys[:, i: i + num_steps]        yield X, Yclass SeqDataLoader:  #@save    """加载序列数据的迭代器"""    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):        if use_random_iter:            self.data_iter_fn = seq_data_iter_random        else:            self.data_iter_fn = seq_data_iter_sequential        self.corpus, self.vocab = dateset.load_dataset(max_tokens)        self.batch_size, self.num_steps = batch_size, num_steps    def __iter__(self):        return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)    def load_data_epoch(batch_size, num_steps,  #@save                           use_random_iter=False, max_tokens=10000):    """返回时光机器数据集的迭代器和词表"""    data_iter = SeqDataLoader(        batch_size, num_steps, use_random_iter, max_tokens)    return data_iter, data_iter.vocabdef get_lstm_params(vocab_size, num_hiddens, device):
  2.     num_inputs = num_outputs = vocab_size
  3.     def normal(shape):
  4.         return torch.randn(size=shape, device=device)*0.01
  5.     def three():
  6.         return (normal((num_inputs, num_hiddens)),
  7.                 normal((num_hiddens, num_hiddens)),
  8.                 torch.zeros(num_hiddens, device=device))
  9.     W_xi, W_hi, b_i = three()  # 输入门参数
  10.     W_xf, W_hf, b_f = three()  # 遗忘门参数
  11.     W_xo, W_ho, b_o = three()  # 输出门参数
  12.     W_xc, W_hc, b_c = three()  # 候选记忆元参数
  13.     # 输出层参数
  14.     W_hq = normal((num_hiddens, num_outputs))
  15.     b_q = torch.zeros(num_outputs, device=device)
  16.     # 附加梯度
  17.     params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc,
  18.               b_c, W_hq, b_q]
  19.     for param in params:
  20.         param.requires_grad_(True)
  21.     return params
  22. def init_lstm_state(batch_size, num_hiddens, device):
  23.     return (torch.zeros((batch_size, num_hiddens), device=device),
  24.             torch.zeros((batch_size, num_hiddens), device=device))
  25. def lstm(inputs, state, params):
  26.     [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c,
  27.      W_hq, b_q] = params
  28.     (H, C) = state
  29.     outputs = []
  30.     for X in inputs:
  31.         I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)
  32.         F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)
  33.         O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)
  34.         C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)
  35.         C = F * C + I * C_tilda
  36.         H = O * torch.tanh(C)
  37.         Y = (H @ W_hq) + b_q
  38.         outputs.append(Y)
  39.     return torch.cat(outputs, dim=0), (H, C)def try_gpu(i=0):    """如果存在,则返回gpu(i),否则返回cpu()    Defined in :numref:`sec_use_gpu`"""    if torch.cuda.device_count() >= i + 1:        return torch.device(f'cuda:{i}')    return torch.device('cpu')#@saveclass RNNModel(nn.Module):    """循环神经网络模型"""    def __init__(self, rnn_layer, vocab_size, device, **kwargs):        super(RNNModel, self).__init__(**kwargs)        self.rnn = rnn_layer        self.vocab_size = vocab_size        self.num_hiddens = self.rnn.hidden_size        # 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1        if not self.rnn.bidirectional:            self.num_directions = 1            self.linear = nn.Linear(self.num_hiddens, self.vocab_size, device=device)        else:            self.num_directions = 2            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size, device=device)    def forward(self, inputs, state):        X = F.one_hot(inputs.T.long(), self.vocab_size)        X = X.to(torch.float32)        Y, state = self.rnn(X, state)        # 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)        # 它的输出形状是(时间步数*批量大小,词表大小)。        output = self.linear(Y.reshape((-1, Y.shape[-1])))        return output, state    def begin_state(self, device, batch_size=1):        if not isinstance(self.rnn, nn.LSTM):            # nn.GRU以张量作为隐状态            return  torch.zeros((self.num_directions * self.rnn.num_layers,                                 batch_size, self.num_hiddens),                                device=device)        else:            # nn.LSTM以元组作为隐状态            return (torch.zeros((                self.num_directions * self.rnn.num_layers,                batch_size, self.num_hiddens), device=device),                    torch.zeros((                        self.num_directions * self.rnn.num_layers,                        batch_size, self.num_hiddens), device=device))class RNNModelScratch: #@save    """从零开始实现的循环神经网络模型"""    def __init__(self, vocab_size, num_hiddens, device,                 get_params, init_state, forward_fn):        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens        # 初始化了隐藏参数 W_xh, W_hh, b_h,  W_hq, b_q        self.params = get_params(vocab_size, num_hiddens, device)        self.init_state, self.forward_fn = init_state, forward_fn    def __call__(self, X, state):        # X的形状:(batch_size, num_steps)        # X one_hot之后的形状:(num_steps,batch_size,词表大小)        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)        return self.forward_fn(X, state, self.params)    def begin_state(self, batch_size, device):        return self.init_state(batch_size, self.num_hiddens, device)def predict_ch8(prefix, num_preds, net, vocab, device):  #@save    """在prefix后面生成新字符"""    state = net.begin_state(batch_size=1, device=device)    outputs = [vocab[prefix[0]]]    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))    for y in prefix[1:]:  # 预热期        _, state = net(get_input(), state)        outputs.append(vocab[y])    for _ in range(num_preds):  # 预测num_preds步        # y 包含从开始到现在的所有输出        # state是当前计算出来的隐藏参数        y, state = net(get_input(), state)        outputs.append(int(y.argmax(dim=1).reshape(1)))    return ''.join([vocab.idx_to_token[i] for i in outputs])def grad_clipping(net, theta):  #@save    """裁剪梯度"""    if isinstance(net, nn.Module):        params = [p for p in net.parameters() if p.requires_grad]    else:        params = net.params    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))    if norm > theta:        for param in params:            param.grad[:] *= theta / normdef train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):    """训练网络一个迭代周期(定义见第8章)"""    state, timer = None, Timer()    metric = Accumulator(2)  # 训练损失之和,词元数量    # X的形状:(batch_size, num_steps)    # Y的形状:(batch_size, num_steps)    for X, Y in train_iter:        if state is None or use_random_iter:            # 在第一次迭代或使用随机抽样时初始化state            state = net.begin_state(batch_size=X.shape[0], device=device)        else:            if isinstance(net, nn.Module) and not isinstance(state, tuple):                # state对于nn.GRU是个张量                state.detach_()            else:                # state对于nn.LSTM或对于我们从零开始实现的模型是个张量                for s in state:                    s.detach_()        y = Y.T.reshape(-1)        X, y = X.to(device), y.to(device)        # y_hat 包含从开始到现在的所有输出        # y_hat的形状:(batch_size * num_steps, 词表大小)        # state是当前计算出来的隐藏参数        y_hat, state = net(X, state)        # 交叉熵损失函数,传入预测值和标签值,并求平均值        l = loss(y_hat, y.long()).mean()        if isinstance(updater, torch.optim.Optimizer):            updater.zero_grad()            l.backward()            grad_clipping(net, 1)            updater.step()        else:            l.backward()            grad_clipping(net, 1)            # 因为已经调用了mean函数            updater(batch_size=1)        # 这里记录交叉熵损失的值的和,以及记录对应交叉熵损失值的样本个数        metric.add(l * y.numel(), y.numel())    # 求交叉熵损失的平均值,再求exp,即可得到困惑度    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()def sgd(params, lr, batch_size):    """小批量随机梯度下降    Defined in :numref:`sec_linear_scratch`"""    with torch.no_grad():        for param in params:            param -= lr * param.grad / batch_size            param.grad.zero_()#@savedef train_ch8(net, train_iter, vocab, lr, num_epochs, device,              use_random_iter=False):    """训练模型(定义见第8章)"""    loss = nn.CrossEntropyLoss()    # 新建一个连接客户端    # 指定 env=u'test1',默认端口为 8097,host 是 'localhost'    vis = visdom.Visdom(env=u'test1', server="http://10.88.88.136", port=8097)    animator = vis    # 初始化    if isinstance(net, nn.Module):        updater = torch.optim.SGD(net.parameters(), lr)    else:        updater = lambda batch_size: sgd(net.params, lr, batch_size)    predict = lambda prefix: predict_ch8(prefix, 30, net, vocab, device)    # 训练和预测    for epoch in range(num_epochs):        ppl, speed = train_epoch_ch8(            net, train_iter, loss, updater, device, use_random_iter)                if (epoch + 1) % 10 == 0:            # print(predict('你是?'))            # print(epoch)            # animator.add(epoch + 1, )            if epoch == 9:                # 清空图表:使用空数组来替换现有内容                vis.line(X=np.array([0]), Y=np.array([0]), win='train_ch8', update='replace')            vis.line(                X=np.array([epoch + 1]),                Y=[ppl],                win='train_ch8',                update='append',                opts={                    'title': 'train_ch8',                    'xlabel': 'epoch',                    'ylabel': 'ppl',                    'linecolor': np.array([[0, 0, 255]]),  # 蓝色线条                }            )    print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')    print(predict('你是'))    print(predict('我有一剑'))if __name__ == '__main__':    batch_size, num_steps = 32, 35    train_iter, vocab = load_data_epoch(batch_size, num_steps)    vocab_size, num_hiddens, device = len(vocab), 256, try_gpu()    num_epochs, lr = 1000, 1    model = RNNModelScratch(len(vocab), num_hiddens, device, get_lstm_params, init_lstm_state, lstm)        # num_inputs = vocab_size    # lstm_layer = nn.LSTM(num_inputs, num_hiddens)    # model = RNNModel(lstm_layer, len(vocab), device)    # model = model.to(device)        print(predict_ch8('你是', 30, model, vocab, device))    train_ch8(model, train_iter, vocab, lr, num_epochs, device)
复制代码
  我们分别使用手动构建的LSTM和框架构建的LSTM进行训练和测试,结果如下:
            
2.png
                  
3.png
                  
4.png
                  
5.png
         我们可以看到,模型未训练和训练后的对比,明显训练后的语句要通顺一点。




后记

  综合RNN和LSTM两篇文章的结论来看,其对序列数据确实有一定的效果。
  此外,当前我们用RNN/LSTM做了序列数据的后续模拟生成工作,但是由于网络深度、广度的问题,其效果也就比在词表中随机抽取字组成的序列看起来要好点。
参考文献


  • https://zh.d2l.ai/chapter_recurrent-modern/lstm.html
  • https://zh.d2l.ai/chapter_recurrent-neural-networks/rnn.html
  • https://zh.d2l.ai/chapter_recurrent-neural-networks/text-preprocessing.html


                    打赏、订阅、收藏、丢香蕉、硬币,请关注公众号(攻城狮的搬砖之路)               
6.jpeg
    PS: 请尊重原创,不喜勿喷。

PS: 要转载请注明出处,本人版权所有。

PS: 有问题请留言,看到后我会第一时间回复。


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

17 小时前

举报

收藏一下   不知道什么时候能用到
您需要登录后才可以回帖 登录 | 立即注册