ailearn

RNN与序列模型 - 处理时序数据

深入学习循环神经网络原理,掌握LSTM、GRU等序列模型

访问-- -- --

前置知识:需要先掌握 神经网络基础

本文重点:理解RNN原理,掌握LSTM/GRU的使用


一、RNN基础

1.1 为什么需要RNN

传统神经网络处理固定大小输入,但很多任务是序列数据:

  • 文本:单词序列
  • 语音:音频序列
  • 时间序列:股票价格
  • 视频:图像序列 RNN的特点:
  • 记忆能力:保留历史信息
  • 变长输入:处理任意长度序列
  • 权重共享:每个时间步使用相同参数

1.2 RNN原理

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
"""
RNN核心公式:
h_t = tanh(W_hh * h_{t-1} + W_xh * x_t + b_h)
y_t = W_hy * h_t + b_y
- h_t: 当前隐藏状态
- x_t: 当前输入
- y_t: 当前输出
"""
# ===== 手动实现RNN =====
class SimpleRNN:
    """手动实现的简单RNN"""
    
    def __init__(self, input_size, hidden_size, output_size):
        self.hidden_size = hidden_size
        
        # 初始化权重
        self.W_xh = np.random.randn(input_size, hidden_size) * 0.01
        self.W_hh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.W_hy = np.random.randn(hidden_size, output_size) * 0.01
        
        self.b_h = np.zeros(hidden_size)
        self.b_y = np.zeros(output_size)
    
    def forward(self, x):
        """
        x: (seq_len, input_size)
        """
        h = np.zeros(self.hidden_size)
        outputs = []
        hidden_states = [h]
        
        for t in range(len(x)):
            # 计算隐藏状态
            h = np.tanh(x[t] @ self.W_xh + h @ self.W_hh + self.b_h)
            # 计算输出
            y = h @ self.W_hy + self.b_y
            outputs.append(y)
            hidden_states.append(h)
        
        return np.array(outputs), np.array(hidden_states)
# ===== PyTorch RNN =====
# 基础RNN层
rnn = nn.RNN(
    input_size=10,    # 输入特征维度
    hidden_size=20,   # 隐藏状态维度
    num_layers=2,     # RNN层数
    batch_first=True, # 输入格式 (batch, seq, feature)
    bidirectional=False
)
# 输入
batch_size, seq_len, input_size = 3, 5, 10
x = torch.randn(batch_size, seq_len, input_size)
# 前向传播
output, h_n = rnn(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")  # (batch, seq, hidden)
print(f"隐藏状态形状: {h_n.shape}")  # (num_layers, batch, hidden)

1.3 RNN的问题:梯度消失

# 演示梯度消失问题
def gradient_vanish_demo():
    """演示长序列中的梯度消失"""
    seq_len = 100
    hidden_size = 1
    
    # 简单RNN
    rnn = nn.RNN(input_size=1, hidden_size=hidden_size, batch_first=True)
    
    # 长序列输入
    x = torch.randn(1, seq_len, 1)
    h0 = torch.zeros(1, 1, hidden_size)
    
    # 前向传播
    output, hn = rnn(x, h0)
    
    # 反向传播
    loss = output.sum()
    loss.backward()
    
    # 查看输入层梯度
    input_grad = rnn.weight_ih_l0.grad
    print(f"输入权重梯度: {input_grad.abs().mean():.6f}")
    print("梯度非常小 -> 梯度消失问题")
gradient_vanish_demo()

二、LSTM

2.1 LSTM原理

"""
LSTM 解决梯度消失问题的关键:
- 细胞状态 (Cell State):长期记忆
- 门控机制:选择性地遗忘和记忆
三个门:
- 遗忘门 (Forget Gate): f_t = σ(W_f * [h_{t-1}, x_t])
- 输入门 (Input Gate): i_t = σ(W_i * [h_{t-1}, x_t])
- 输出门 (Output Gate): o_t = σ(W_o * [h_{t-1}, x_t])
更新公式:
C_t = f_t * C_{t-1} + i_t * tanh(W_C * [h_{t-1}, x_t])
h_t = o_t * tanh(C_t)
"""
# ===== PyTorch LSTM =====
lstm = nn.LSTM(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    bidirectional=True  # 双向LSTM
)
x = torch.randn(3, 5, 10)
h0 = torch.zeros(4, 3, 20)  # (num_layers * 2, batch, hidden)
c0 = torch.zeros(4, 3, 20)
output, (hn, cn) = lstm(x, (h0, c0))
print(f"LSTM输出形状: {output.shape}")  # (batch, seq, hidden*2)
print(f"隐藏状态形状: {hn.shape}")       # (num_layers*2, batch, hidden)
print(f"细胞状态形状: {cn.shape}")       # (num_layers*2, batch, hidden)

2.2 自定义LSTM模型

class LSTMClassifier(nn.Module):
    """LSTM文本分类模型"""
    
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super(LSTMClassifier, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # 双向所以*2
    
    def forward(self, x):
        # x: (batch, seq_len)
        embedded = self.embedding(x)  # (batch, seq, embed_dim)
        
        # LSTM
        lstm_out, (hidden, cell) = self.lstm(embedded)
        
        # 取最后一层的前向和后向隐藏状态
        hidden_forward = hidden[-2, :, :]  # 前向
        hidden_backward = hidden[-1, :, :]  # 后向
        hidden_cat = torch.cat([hidden_forward, hidden_backward], dim=1)
        
        # 分类
        out = self.dropout(hidden_cat)
        out = self.fc(out)
        
        return out
# 创建模型
model = LSTMClassifier(
    vocab_size=10000,
    embedding_dim=128,
    hidden_size=256,
    num_layers=2,
    num_classes=2
)
print("LSTM分类器:")
print(model)

三、GRU

3.1 GRU原理

"""
GRU (Gated Recurrent Unit) 是LSTM的简化版:
两个门:
- 重置门 (Reset Gate): r_t = σ(W_r * [h_{t-1}, x_t])
- 更新门 (Update Gate): z_t = σ(W_z * [h_{t-1}, x_t])
更新公式:
h_tilde = tanh(W * [r_t * h_{t-1}, x_t])
h_t = (1 - z_t) * h_{t-1} + z_t * h_tilde
GRU vs LSTM:
- GRU参数更少,训练更快
- LSTM记忆能力更强
"""
# ===== PyTorch GRU =====
gru = nn.GRU(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    bidirectional=True
)
x = torch.randn(3, 5, 10)
output, hn = gru(x)
print(f"GRU输出形状: {output.shape}")

3.2 序列到序列模型

class Encoder(nn.Module):
    """编码器"""
    
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        super(Encoder, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_size, num_layers, batch_first=True)
    
    def forward(self, x):
        embedded = self.embedding(x)
        outputs, hidden = self.gru(embedded)
        return outputs, hidden
class Decoder(nn.Module):
    """解码器"""
    
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        super(Decoder, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)
    
    def forward(self, x, hidden):
        # x: (batch, 1) - 当前输入词
        embedded = self.embedding(x)
        output, hidden = self.gru(embedded, hidden)
        output = self.fc(output.squeeze(1))
        return output, hidden
class Seq2Seq(nn.Module):
    """序列到序列模型"""
    
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        max_len = trg.size(1)
        vocab_size = self.decoder.fc.out_features
        
        # 存储输出
        outputs = torch.zeros(batch_size, max_len, vocab_size).to(self.device)
        
        # 编码
        _, hidden = self.encoder(src)
        
        # 解码
        input = trg[:, 0].unsqueeze(1)  # <sos>
        
        for t in range(1, max_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t, :] = output
            
            # Teacher forcing
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[:, t].unsqueeze(1) if teacher_force else top1.unsqueeze(1)
        
        return outputs

四、时间序列预测实战

import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
# ===== 生成时间序列数据 =====
def generate_sine_wave(seq_length, num_samples):
    """生成正弦波数据"""
    x = np.linspace(0, num_samples * 2 * np.pi, num_samples)
    y = np.sin(x) + np.random.normal(0, 0.1, num_samples)
    return y
# 生成数据
data = generate_sine_wave(100, 1000)
# 创建数据集
def create_dataset(data, look_back=20):
    """创建时间序列数据集"""
    X, y = [], []
    for i in range(len(data) - look_back):
        X.append(data[i:i+look_back])
        y.append(data[i+look_back])
    return np.array(X), np.array(y)
look_back = 20
X, y = create_dataset(data, look_back)
# 划分数据
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
# 转换为张量
X_train_t = torch.FloatTensor(X_train).unsqueeze(-1)  # (batch, seq, 1)
y_train_t = torch.FloatTensor(y_train)
X_test_t = torch.FloatTensor(X_test).unsqueeze(-1)
y_test_t = torch.FloatTensor(y_test)
print(f"训练集形状: {X_train_t.shape}")
# ===== 定义模型 =====
class TimeSeriesModel(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super(TimeSeriesModel, self).__init__()
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])  # 取最后一个时间步
        return out
model = TimeSeriesModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# ===== 训练 =====
num_epochs = 100
train_losses = []
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    
    output = model(X_train_t)
    loss = criterion(output.squeeze(), y_train_t)
    loss.backward()
    optimizer.step()
    
    train_losses.append(loss.item())
    
    if (epoch + 1) % 20 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.6f}")
# ===== 预测 =====
model.eval()
with torch.no_grad():
    train_predict = model(X_train_t).numpy()
    test_predict = model(X_test_t).numpy()
# ===== 可视化 =====
plt.figure(figsize=(14, 6))
plt.subplot(121)
plt.plot(train_losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('训练损失')
plt.subplot(122)
plt.plot(y_test, label='真实值')
plt.plot(test_predict, label='预测值')
plt.xlabel('时间步')
plt.ylabel('值')
plt.title('测试集预测')
plt.legend()
plt.tight_layout()
plt.savefig('time_series_prediction.png', dpi=100, bbox_inches='tight')
plt.close()

参考资源


上一篇CNN卷积神经网络 下一篇NLP基础 返回深度学习基础 最后更新: 2026年4月14日

访问 --

讨论与反馈