RNN与序列模型 - 处理时序数据
深入学习循环神经网络原理,掌握LSTM、GRU等序列模型
前置知识:需要先掌握 神经网络基础
本文重点:理解RNN原理,掌握LSTM/GRU的使用
一、RNN基础
1.1 为什么需要RNN
传统神经网络处理固定大小输入,但很多任务是序列数据:
- 文本:单词序列
- 语音:音频序列
- 时间序列:股票价格
- 视频:图像序列
RNN的特点:
- 记忆能力:保留历史信息
- 变长输入:处理任意长度序列
- 权重共享:每个时间步使用相同参数
1.2 RNN原理
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
"""
RNN核心公式:
h_t = tanh(W_hh * h_{t-1} + W_xh * x_t + b_h)
y_t = W_hy * h_t + b_y
- h_t: 当前隐藏状态
- x_t: 当前输入
- y_t: 当前输出
"""
# ===== 手动实现RNN =====
class SimpleRNN:
    """Minimal NumPy implementation of a single-layer recurrent network.

    At every time step the cell applies::

        h_t = tanh(x_t @ W_xh + h_{t-1} @ W_hh + b_h)
        y_t = h_t @ W_hy + b_y

    Weights are drawn with ``np.random.randn`` and scaled by 0.01; both
    biases start at zero.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Allocate the parameters.

        Args:
            input_size: Number of features in each time step of the input.
            hidden_size: Dimension of the hidden state.
            output_size: Dimension of each per-step output.
        """
        self.hidden_size = hidden_size
        self.output_size = output_size
        # Small random weights (drawn in this fixed order), zero biases.
        self.W_xh = np.random.randn(input_size, hidden_size) * 0.01
        self.W_hh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.W_hy = np.random.randn(hidden_size, output_size) * 0.01
        self.b_h = np.zeros(hidden_size)
        self.b_y = np.zeros(output_size)

    def forward(self, x):
        """Run the recurrence over a whole sequence.

        Args:
            x: Sequence of shape (seq_len, input_size).

        Returns:
            A pair ``(outputs, hidden_states)``. ``outputs`` stacks the
            per-step outputs, shape (seq_len, output_size).
            ``hidden_states`` stacks the initial zero state followed by the
            state after every step, shape (seq_len + 1, hidden_size).
        """
        h = np.zeros(self.hidden_size)
        outputs = []
        hidden_states = [h]
        for x_t in x:
            # New hidden state from the current input and the previous state.
            h = np.tanh(x_t @ self.W_xh + h @ self.W_hh + self.b_h)
            outputs.append(h @ self.W_hy + self.b_y)
            hidden_states.append(h)
        if not outputs:
            # np.array([]) would collapse to shape (0,); keep the feature
            # axis so callers can rely on a 2-D result for empty sequences.
            stacked_outputs = np.zeros((0, self.output_size))
        else:
            stacked_outputs = np.array(outputs)
        return stacked_outputs, np.array(hidden_states)
# ===== PyTorch RNN =====
# Dimensions of the demo batch: 3 sequences, 5 steps each, 10 features per step.
batch_size, seq_len, input_size = 3, 5, 10

# Basic RNN layer: two stacked, unidirectional layers that take
# (batch, seq, feature) input because batch_first=True.
rnn = nn.RNN(
    input_size=input_size,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    bidirectional=False,
)

# Random input batch.
x = torch.randn((batch_size, seq_len, input_size))

# Forward pass: per-step outputs of the top layer plus the final hidden
# state of every layer.
output, h_n = rnn(x)

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")  # (batch, seq, hidden)
print(f"隐藏状态形状: {h_n.shape}")  # (num_layers, batch, hidden)
1.3 RNN的问题:梯度消失
# Demonstrate the vanishing-gradient problem
def gradient_vanish_demo(seq_len=100, hidden_size=1):
    """Show how the gradient reaching early inputs shrinks in a plain RNN.

    The loss is taken from the LAST time step only, and the gradient is
    measured with respect to the input at every time step. The gradient for
    step ``t`` has to flow back through ``seq_len - 1 - t`` recurrent steps,
    so comparing the first and last entries exposes the decay directly.
    (Summing the loss over all steps and inspecting a weight gradient, as an
    earlier version did, mixes short and long paths and hides the effect.)

    Args:
        seq_len: Length of the random input sequence (must be >= 1).
        hidden_size: Hidden-state dimension of the RNN.

    Returns:
        1-D tensor of length ``seq_len`` holding the mean absolute gradient
        of the last-step loss with respect to the input at each time step.
    """
    # Plain single-layer RNN.
    rnn = nn.RNN(input_size=1, hidden_size=hidden_size, batch_first=True)
    # Long random input; requires_grad so we can read d(loss)/d(x_t).
    x = torch.randn(1, seq_len, 1, requires_grad=True)
    h0 = torch.zeros(1, 1, hidden_size)
    # Forward pass.
    output, _ = rnn(x, h0)
    # Backward pass from the final time step only.
    loss = output[:, -1, :].sum()
    loss.backward()
    # Collapse the batch and feature axes: one gradient magnitude per step.
    grad_per_step = x.grad.abs().mean(dim=(0, 2))
    print(f"最早时间步的输入梯度: {grad_per_step[0].item():.6e}")
    print(f"最后时间步的输入梯度: {grad_per_step[-1].item():.6e}")
    print("梯度非常小 -> 梯度消失问题")
    return grad_per_step


gradient_vanish_demo()
二、LSTM
2.1 LSTM原理
"""
LSTM 缓解梯度消失问题的关键:
- 细胞状态 (Cell State):长期记忆
- 门控机制:选择性地遗忘和记忆
三个门:
- 遗忘门 (Forget Gate): f_t = σ(W_f * [h_{t-1}, x_t])
- 输入门 (Input Gate): i_t = σ(W_i * [h_{t-1}, x_t])
- 输出门 (Output Gate): o_t = σ(W_o * [h_{t-1}, x_t])
更新公式:
C_t = f_t * C_{t-1} + i_t * tanh(W_C * [h_{t-1}, x_t])
h_t = o_t * tanh(C_t)
"""
# ===== PyTorch LSTM =====
# Two stacked layers, each processed in both directions (bidirectional).
lstm = nn.LSTM(10, 20, 2, batch_first=True, bidirectional=True)

x = torch.randn((3, 5, 10))

# Initial hidden and cell states share one shape:
# (num_layers * 2 directions, batch, hidden) = (4, 3, 20).
h0 = torch.zeros((4, 3, 20))
c0 = torch.zeros_like(h0)

output, (hn, cn) = lstm(x, (h0, c0))

print(
    f"LSTM输出形状: {output.shape}",  # (batch, seq, hidden*2)
    f"隐藏状态形状: {hn.shape}",  # (num_layers*2, batch, hidden)
    f"细胞状态形状: {cn.shape}",  # (num_layers*2, batch, hidden)
    sep="\n",
)
2.2 自定义LSTM模型
class LSTMClassifier(nn.Module):
    """Bidirectional LSTM text classifier.

    Token ids are embedded (index 0 is the padding index), fed through a
    stacked bidirectional LSTM, and the final forward and backward hidden
    states of the top layer are concatenated, passed through dropout and
    projected to class logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        """Build the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Size of each token embedding.
            hidden_size: LSTM hidden size per direction.
            num_layers: Number of stacked LSTM layers.
            num_classes: Number of output logits.
            dropout: Dropout probability before the classifier; also used
                between LSTM layers when ``num_layers > 1``.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            # Inter-layer dropout is meaningless (and warns) for one layer.
            dropout=dropout if num_layers > 1 else 0,
        )
        self.dropout = nn.Dropout(dropout)
        # Two directions are concatenated, hence hidden_size * 2.
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x, lengths=None):
        """Classify a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, shape (batch, seq_len).
            lengths: Optional true (unpadded) length of every sequence, as a
                list or 1-D tensor. When given, the batch is packed so the
                final hidden states are taken at each sequence's real end
                instead of after trailing padding tokens. When omitted, the
                whole padded sequence is processed as-is.

        Returns:
            Logits of shape (batch, num_classes).
        """
        embedded = self.embedding(x)  # (batch, seq, embed_dim)
        if lengths is not None:
            # pack_padded_sequence requires the lengths on the CPU.
            lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()
            embedded = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths, batch_first=True, enforce_sorted=False
            )
        _, (hidden, _) = self.lstm(embedded)
        # Last two entries of `hidden` belong to the top layer:
        # [-2] is the forward direction, [-1] the backward direction.
        hidden_cat = torch.cat([hidden[-2, :, :], hidden[-1, :, :]], dim=1)
        return self.fc(self.dropout(hidden_cat))
# Instantiate the classifier: 10k-token vocabulary, 128-d embeddings,
# two bidirectional LSTM layers of width 256, binary output.
model = LSTMClassifier(
    num_classes=2,
    num_layers=2,
    hidden_size=256,
    embedding_dim=128,
    vocab_size=10000,
)
# Print the heading and the module summary, one per line.
print("LSTM分类器:", model, sep="\n")
三、GRU
3.1 GRU原理
"""
GRU (Gated Recurrent Unit) 是LSTM的简化版:
两个门:
- 重置门 (Reset Gate): r_t = σ(W_r * [h_{t-1}, x_t])
- 更新门 (Update Gate): z_t = σ(W_z * [h_{t-1}, x_t])
更新公式:
h_tilde = tanh(W * [r_t * h_{t-1}, x_t])
h_t = (1 - z_t) * h_{t-1} + z_t * h_tilde
GRU vs LSTM:
- GRU参数更少,训练更快
- LSTM记忆能力更强
"""
# ===== PyTorch GRU =====
# Two stacked bidirectional GRU layers on (batch, seq, feature) input.
gru = nn.GRU(10, 20, 2, batch_first=True, bidirectional=True)

x = torch.randn((3, 5, 10))

# No initial state is passed, so the GRU starts from zeros.
output, hn = gru(x)
print(f"GRU输出形状: {output.shape}")
3.2 序列到序列模型
class Encoder(nn.Module):
    """Seq2seq encoder: an embedding layer followed by a batch-first GRU."""

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        """Create the embedding table and the GRU.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Size of each token embedding (GRU input size).
            hidden_size: GRU hidden size.
            num_layers: Number of stacked GRU layers.
        """
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
        )
        self.gru = nn.GRU(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Embed the token ids in ``x`` and run them through the GRU.

        Returns:
            The GRU's ``(outputs, hidden)`` pair: the per-step outputs and
            the final hidden state.
        """
        token_vectors = self.embedding(x)
        return self.gru(token_vectors)
class Decoder(nn.Module):
    """Seq2seq decoder: embedding, batch-first GRU, and a vocabulary projection."""

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers):
        """Create the embedding table, the GRU and the output layer.

        Args:
            vocab_size: Size of the embedding table and of the output logits.
            embedding_dim: Size of each token embedding (GRU input size).
            hidden_size: GRU hidden size.
            num_layers: Number of stacked GRU layers.
        """
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
        )
        self.gru = nn.GRU(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x, hidden):
        """Decode one step.

        Args:
            x: Current input token ids, shape (batch, 1).
            hidden: Recurrent state to continue from.

        Returns:
            ``(logits, hidden)``: vocabulary logits for this step and the
            updated recurrent state.
        """
        step_features, hidden = self.gru(self.embedding(x), hidden)
        # Drop the length-1 sequence axis before projecting to the vocabulary.
        logits = self.fc(step_features.squeeze(1))
        return logits, hidden
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing."""

    def __init__(self, encoder, decoder, device):
        """Store the sub-modules.

        Args:
            encoder: Module called as ``encoder(src)`` and returning
                ``(outputs, hidden)``.
            decoder: Module called as ``decoder(tokens, hidden)`` and
                returning ``(logits, hidden)``; it must expose
                ``fc.out_features`` as the vocabulary size.
            device: Device on which the output buffer is allocated.
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Encode ``src`` and decode one token per target position.

        Args:
            src: Source token ids, shape (batch, src_len).
            trg: Target token ids, shape (batch, trg_len); column 0 is used
                as the first decoder input (the ``<sos>`` token).
            teacher_forcing_ratio: Probability, drawn once per step, of
                feeding the ground-truth token instead of the model's own
                greedy prediction as the next input.

        Returns:
            Logits of shape (batch, trg_len, vocab_size). Position 0 is
            never written and stays all-zero.
        """
        batch_size = src.size(0)
        max_len = trg.size(1)
        vocab_size = self.decoder.fc.out_features

        # Allocate the result buffer directly on the target device instead
        # of creating it on the CPU and copying it over.
        outputs = torch.zeros(batch_size, max_len, vocab_size, device=self.device)

        # Encode: only the final hidden state seeds the decoder.
        _, hidden = self.encoder(src)

        # Decode, starting from the <sos> column of the target.
        decoder_input = trg[:, 0].unsqueeze(1)
        for t in range(1, max_len):
            step_logits, hidden = self.decoder(decoder_input, hidden)
            outputs[:, t, :] = step_logits

            # Teacher forcing: one random draw per step for the whole batch.
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            if teacher_force:
                decoder_input = trg[:, t].unsqueeze(1)
            else:
                decoder_input = step_logits.argmax(1).unsqueeze(1)
        return outputs
四、时间序列预测实战
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
# ===== Generate time-series data =====
def generate_sine_wave(seq_length, num_samples, noise_std=0.1):
    """Return a noisy sine wave sampled ``seq_length`` points per period.

    The previous implementation ignored ``seq_length`` and spaced the
    samples almost exactly one full period apart, so the sampled signal
    aliased to a single slow period regardless of the arguments.

    Args:
        seq_length: Number of samples in one full period (must be > 0).
        num_samples: Total number of samples to generate.
        noise_std: Standard deviation of the additive Gaussian noise, drawn
            from NumPy's global random state. Use 0 for a clean wave.

    Returns:
        1-D float array of length ``num_samples``.

    Raises:
        ValueError: If ``seq_length`` is not positive.
    """
    if seq_length <= 0:
        raise ValueError("seq_length must be positive")
    phase = 2 * np.pi * np.arange(num_samples) / seq_length
    return np.sin(phase) + np.random.normal(0, noise_std, num_samples)


# Generate the data: 1000 samples at 100 samples per period (10 periods).
data = generate_sine_wave(100, 1000)
# Build the supervised dataset
def create_dataset(data, look_back=20):
    """Turn a series into sliding-window inputs and next-step targets.

    Sample ``i`` uses ``data[i:i + look_back]`` as input and
    ``data[i + look_back]`` as target.

    Args:
        data: Sequence or array indexed by time along its first axis.
        look_back: Number of past steps in every input window (>= 1).

    Returns:
        ``(X, y)`` where ``X`` has shape (n, look_back, ...) and ``y`` has
        shape (n, ...), with ``n = max(len(data) - look_back, 0)``. When the
        series is too short the arrays are empty but keep these shapes.

    Raises:
        ValueError: If ``look_back`` is smaller than 1.
    """
    if look_back < 1:
        raise ValueError("look_back must be at least 1")
    data = np.asarray(data)
    num_windows = max(len(data) - look_back, 0)
    # Row i holds the indices i, i+1, ..., i+look_back-1; indexing with this
    # grid gathers every window in one vectorized step (and copies the data).
    window_index = np.arange(num_windows)[:, None] + np.arange(look_back)[None, :]
    return data[window_index], data[look_back:]
# Window length used for every training example.
look_back = 20
X, y = create_dataset(data, look_back)

# Chronological 80/20 split: the first 80% of the windows train the model,
# the remainder is held out for testing.
train_size = int(len(X) * 0.8)
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

# Convert to float32 tensors; inputs get a trailing feature axis so they
# become (batch, seq, 1).
X_train_t, X_test_t = (
    torch.FloatTensor(windows).unsqueeze(-1) for windows in (X_train, X_test)
)
y_train_t, y_test_t = (
    torch.FloatTensor(targets) for targets in (y_train, y_test)
)
print(f"训练集形状: {X_train_t.shape}")
# ===== 定义模型 =====
class TimeSeriesModel(nn.Module):
    """LSTM regressor that predicts from the last time step of a window."""

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        """Create a batch-first LSTM and a linear output head.

        Args:
            input_size: Number of features per time step.
            hidden_size: LSTM hidden size.
            num_layers: Number of stacked LSTM layers.
            output_size: Number of values predicted per sequence.
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Map a (batch, seq, input_size) batch to (batch, output_size)."""
        sequence_features = self.lstm(x)[0]
        # Only the features at the final time step feed the output head.
        last_step = sequence_features[:, -1, :]
        return self.fc(last_step)
model = TimeSeriesModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# ===== Training =====
# Full-batch training: every epoch is one optimizer step on the whole
# training set.
num_epochs = 100
train_losses = []
# Training mode never changes inside the loop, so set it once.
model.train()
for epoch in range(num_epochs):
    optimizer.zero_grad()
    output = model(X_train_t)
    # Squeeze only the trailing output axis. A bare squeeze() would also
    # drop the batch axis when there is a single sample, and the loss would
    # then broadcast against a target of a different shape.
    loss = criterion(output.squeeze(-1), y_train_t)
    loss.backward()
    optimizer.step()

    loss_value = loss.item()
    train_losses.append(loss_value)
    # Report progress every 20 epochs.
    if (epoch + 1) % 20 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss_value:.6f}")
# ===== Prediction =====
# Switch to inference behaviour and skip gradient tracking so the outputs
# can be converted straight to NumPy arrays.
model.eval()
with torch.no_grad():
    train_predict, test_predict = (
        model(batch).numpy() for batch in (X_train_t, X_test_t)
    )
# ===== Visualization =====
# One figure with two side-by-side panels, written to disk and then closed.
fig, (loss_ax, pred_ax) = plt.subplots(1, 2, figsize=(14, 6))

# Left panel: training loss per epoch.
loss_ax.plot(train_losses)
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('训练损失')

# Right panel: held-out targets against the model's predictions.
pred_ax.plot(y_test, label='真实值')
pred_ax.plot(test_predict, label='预测值')
pred_ax.set_xlabel('时间步')
pred_ax.set_ylabel('值')
pred_ax.set_title('测试集预测')
pred_ax.legend()

fig.tight_layout()
fig.savefig('time_series_prediction.png', dpi=100, bbox_inches='tight')
plt.close(fig)
参考资源
- Understanding LSTM Networks - LSTM可视化解释
- The Unreasonable Effectiveness of RNNs - RNN应用
- PyTorch RNN教程 - 官方教程
- Sequence to Sequence Learning - Seq2Seq论文
- Attention Is All You Need - Transformer论文
- LSTM论文 - 原始LSTM论文
- GRU论文 - GRU论文
上一篇:CNN卷积神经网络 下一篇:NLP基础 返回:深度学习基础 最后更新: 2026年4月14日
讨论与反馈