ailearn

CNN卷积神经网络 - 图像处理利器

深入学习卷积神经网络原理,掌握CNN架构设计与经典模型

访问-- -- --

前置知识:需要先掌握 神经网络基础

本文重点:理解卷积操作原理,掌握CNN架构与经典模型


一、CNN基础概念

1.1 为什么需要CNN

全连接网络处理图像的问题:

  • 参数量巨大:一张 224×224×3 的图像有 150,528 个输入值,若第一个全连接层有 1000 个神经元,仅这一层就需要约 1.5 亿个权重
  • 忽略空间结构
  • 缺乏平移不变性

CNN的优势:

  • 局部连接:每个神经元只看局部区域
  • 权重共享:同一卷积核扫描整张图
  • 平移等变:输入平移时,特征图也随之平移,因此同一特征无论出现在什么位置都能被检测到

1.2 卷积操作

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
# ===== Convolution demo =====
# Input layout is (batch, channels, height, width): one single-channel 6x6 image.
input_image = torch.randn(1, 1, 6, 6)
# Weight layout is (out_channels, in_channels, kernel_h, kernel_w). Stride 1 and
# padding 0 are the Conv2d defaults, so a 3x3 kernel shrinks 6x6 down to 4x4.
conv = nn.Conv2d(1, 1, kernel_size=3)
output = conv(input_image)
print(f"输入形状: {input_image.shape}", f"输出形状: {output.shape}", sep="\n")
# Hand-written convolution, to make the sliding-window arithmetic explicit
def manual_conv2d(x, kernel):
    """Compute a "valid" 2-D convolution of ``x`` with ``kernel`` using loops.

    The kernel is applied as-is (it is not flipped), with stride 1 and no
    padding, i.e. this is the cross-correlation that deep-learning
    "convolution" layers compute.

    Args:
        x: 2-D tensor of shape (h, w).
        kernel: 2-D tensor of shape (kh, kw); it should not be larger than
            ``x`` in either dimension.

    Returns:
        Tensor of shape (h - kh + 1, w - kw + 1) on the same device as ``x``.
        Its dtype is the promoted dtype of the two inputs, widened to at
        least the default floating dtype, so float64 inputs keep their
        precision while integer and float32 inputs still yield float32.

    Raises:
        ValueError: if ``x`` or ``kernel`` is not 2-D (shape unpacking fails).
    """
    h, w = x.shape
    kh, kw = kernel.shape
    oh, ow = h - kh + 1, w - kw + 1

    # Allocate the result next to the input and wide enough for the inputs;
    # a bare torch.zeros(oh, ow) would silently downcast float64 data and
    # always live on the CPU.
    out_dtype = torch.promote_types(
        torch.result_type(x, kernel), torch.get_default_dtype()
    )
    output = torch.zeros(oh, ow, dtype=out_dtype, device=x.device)
    for i in range(oh):
        for j in range(ow):
            # Element-wise product of the current window with the kernel, summed
            output[i, j] = (x[i:i+kh, j:j+kw] * kernel).sum()
    return output
# Worked example: a 4x4 ramp holding 1..16 against a 2x2 identity kernel, so
# every output cell is the sum of the main diagonal of its 2x2 window.
x = torch.arange(1., 17.).reshape(4, 4)
kernel = torch.eye(2)
result = manual_conv2d(x, kernel)
print(f"\n手动卷积结果:\n{result}")
# ===== Convolution hyper-parameters =====
# Key arguments of nn.Conv2d:
# - kernel_size: size of the convolution kernel
# - stride: step between consecutive kernel positions
# - padding: padding added to each side of the input
# - dilation: spacing between kernel elements (dilated / atrous convolution)
# - groups: number of channel groups (grouped convolution)
#
# Output size along each spatial dimension. The floor matters whenever the
# stride does not divide the numerator evenly (see the stride-2 case below):
#   output = floor((input + 2*padding - dilation*(kernel-1) - 1) / stride) + 1

# Effect of different settings on one 32x32 single-channel input
x = torch.randn(1, 1, 32, 32)
# Standard convolution: floor((32 + 2 - 2 - 1) / 1) + 1 = 32, size preserved
conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
print(f"\n标准卷积: {x.shape} -> {conv1(x).shape}")
# Stride 2: floor((32 + 2 - 2 - 1) / 2) + 1 = floor(15.5) + 1 = 16
conv2 = nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1)
print(f"步长为2: {x.shape} -> {conv2(x).shape}")
# Dilation 2: floor((32 + 4 - 4 - 1) / 1) + 1 = 32, size preserved by padding=2
conv3 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=2, dilation=2)
print(f"空洞卷积(d=2): {x.shape} -> {conv3(x).shape}")
# Visualise one convolution: input, kernel and filtered output side by side
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Random 8x8 "image"; the 4-D tensor is kept so it can be fed to the conv layer
img_batch = torch.randn(1, 1, 8, 8)
img = img_batch.squeeze().numpy()

# 3x3 kernel with -1 everywhere and 8 in the centre (sums to zero)
kernel_edge = torch.full((3, 3), -1.)
kernel_edge[1, 1] = 8.

# Conv layer whose weight is overwritten with the fixed kernel and whose bias
# is zeroed; padding=1 keeps the output at 8x8
conv_edge = nn.Conv2d(1, 1, 3, padding=1)
with torch.no_grad():
    conv_edge.weight.copy_(kernel_edge.view(1, 1, 3, 3))
    conv_edge.bias.zero_()
    output = conv_edge(img_batch)

# One panel per stage, in left-to-right order
panels = (
    (img, '输入图像 (8x8)'),
    (kernel_edge.numpy(), '边缘检测卷积核'),
    (output.squeeze().numpy(), '卷积输出'),
)
for ax, (panel_data, panel_title) in zip(axes, panels):
    ax.imshow(panel_data, cmap='gray')
    ax.set_title(panel_title)

plt.tight_layout()
plt.savefig('convolution_demo.png', dpi=100, bbox_inches='tight')
plt.close()

1.3 池化层

# ===== Pooling =====
x = torch.randn(1, 1, 4, 4)
# Three pooling flavours: 2x2 max, 2x2 average, and adaptive average pooling
# (which is told the output size, here 1x1, instead of a window size)
max_pool = nn.MaxPool2d(2, 2)
avg_pool = nn.AvgPool2d(2, 2)
adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
print(f"最大池化: {x.shape} -> {max_pool(x).shape}")
print(f"平均池化: {x.shape} -> {avg_pool(x).shape}")
print(f"自适应池化: {x.shape} -> {adaptive_pool(x).shape}")
# Show the actual numbers on a 4x4 ramp holding 0..15
x = torch.arange(16, dtype=torch.float32).reshape(1, 1, 4, 4)
print(f"\n原始数据:\n{x.squeeze()}")
print(f"\n最大池化结果:\n{max_pool(x).squeeze()}")
print(f"\n平均池化结果:\n{avg_pool(x).squeeze()}")

二、经典CNN架构

2.1 LeNet-5

class LeNet5(nn.Module):
    """LeNet-5 style CNN for single-channel 28x28 images.

    Two conv + average-pool stages followed by three fully connected layers,
    with ReLU activations throughout.

    Input size matters: ``conv1`` pads by 2, so a 28x28 input stays 28x28 and
    reaches ``fc1`` as 16x5x5 = 400 features. A 32x32 input would arrive as
    16x6x6 and fail in ``fc1``.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)

        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

        # Parameter-free layers, shared by both conv stages
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a (batch, 1, 28, 28) tensor to (batch, num_classes) logits."""
        # Stage 1: 1x28x28 -> conv (pad 2) 6x28x28 -> pool 6x14x14
        x = self.pool(self.relu(self.conv1(x)))
        # Stage 2: 6x14x14 -> conv (no pad) 16x10x10 -> pool 16x5x5
        x = self.pool(self.relu(self.conv2(x)))
        # Flatten everything except the batch dimension: 16x5x5 -> 400
        x = torch.flatten(x, 1)
        # Classifier head; the last layer returns raw logits
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# Instantiate the network, show its layer structure, then count its parameters
model = LeNet5()
print("LeNet-5 架构:", model, sep="\n")
# Total number of scalar parameters across all parameter tensors
total_params = sum(map(torch.numel, model.parameters()))
print(f"\n总参数量: {total_params:,}")

2.2 AlexNet

class AlexNet(nn.Module):
    """AlexNet, the ImageNet 2012 winning architecture.

    Five convolutions (three of them followed by max pooling), an adaptive
    average pool to 6x6, and a three-layer fully connected classifier with
    dropout.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        # Convolutional feature extractor, listed in execution order
        conv_stack = [
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),

            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),

            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        # Forces a 6x6 spatial map so the classifier input size is fixed
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))

        head = [
            nn.Dropout(0.5),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return (batch, num_classes) logits for a batch of 3-channel images."""
        pooled = self.avgpool(self.features(x))
        # Keep the batch dimension, flatten channels and spatial positions
        return self.classifier(pooled.flatten(1))

2.3 VGGNet

class VGGBlock(nn.Module):
    """One VGG stage: ``num_convs`` 3x3 conv + ReLU pairs, then a 2x2 max pool.

    Args:
        in_channels: channels entering the first convolution.
        out_channels: channels produced by every convolution in the stage.
        num_convs: how many conv + ReLU pairs to stack before pooling.
    """

    def __init__(self, in_channels, out_channels, num_convs):
        super().__init__()

        stages = []
        channels = in_channels  # only the first conv sees in_channels
        for _ in range(num_convs):
            stages += [
                nn.Conv2d(channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
            ]
            channels = out_channels

        # Pooling with kernel 2 / stride 2 closes the stage
        stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.block = nn.Sequential(*stages)

    def forward(self, x):
        """Apply the conv stack followed by the max pool."""
        return self.block(x)
class VGG16(nn.Module):
    """VGG-16: a deep network built by stacking small 3x3 convolutions.

    Five ``VGGBlock`` stages (2, 2, 3, 3, 3 convolutions) feed an adaptive
    7x7 average pool and a three-layer fully connected classifier.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        # (in_channels, out_channels, number of convs) for each stage
        stage_cfg = [
            (3, 64, 2),
            (64, 128, 2),
            (128, 256, 3),
            (256, 512, 3),
            (512, 512, 3),
        ]
        self.features = nn.Sequential(
            *(VGGBlock(c_in, c_out, n_convs) for c_in, c_out, n_convs in stage_cfg)
        )

        # Fixes the classifier input at 512x7x7
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        """Return (batch, num_classes) logits for a batch of 3-channel images."""
        pooled = self.avgpool(self.features(x))
        # Keep the batch dimension, flatten channels and spatial positions
        return self.classifier(pooled.flatten(1))
# Build a 10-class VGG-16 and report how many scalar parameters it holds
model = VGG16(num_classes=10)
total_params = 0
for param in model.parameters():
    total_params += param.numel()
print(f"VGG-16 参数量: {total_params:,}")

2.4 ResNet (残差网络)

class BasicBlock(nn.Module):
    """Basic ResNet block: two 3x3 conv + BN layers plus a shortcut.

    Args:
        in_channels: channels of the incoming tensor.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution.
        downsample: optional module applied to the input so the shortcut
            matches the main path; when ``None`` the input is added as-is.
    """
    # Output channels = out_channels * expansion
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.downsample = downsample
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        # Main path: conv-BN-ReLU, then conv-BN (no activation before the add)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Shortcut: identity unless a projection module was supplied
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut  # residual connection
        return self.relu(out)
class Bottleneck(nn.Module):
    """ResNet bottleneck block (used by the deeper variants).

    A 1x1 conv, a 3x3 conv (which carries the stride), and a final 1x1 conv
    that widens to ``out_channels * expansion`` channels, plus a shortcut.

    Args:
        in_channels: channels of the incoming tensor.
        out_channels: width of the two inner convolutions.
        stride: stride of the 3x3 convolution.
        downsample: optional module applied to the input so the shortcut
            matches the main path; when ``None`` the input is added as-is.
    """
    # Output channels = out_channels * expansion
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        wide_channels = out_channels * self.expansion

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, wide_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(wide_channels)

        self.downsample = downsample
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = self.conv1(x)
        out = self.relu(self.bn1(out))
        out = self.conv2(out)
        out = self.relu(self.bn2(out))
        out = self.bn3(self.conv3(out))  # no activation before the add

        # Shortcut: identity unless a projection module was supplied
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)
class ResNet(nn.Module):
    """ResNet: a residual network assembled from a given block type.

    Args:
        block: block class; it must expose an ``expansion`` attribute and be
            constructible as ``block(in_channels, out_channels, stride,
            downsample)``.
        layers: number of blocks in each of the four stages (indices 0-3).
        num_classes: number of output logits.
    """

    def __init__(self, block, layers, num_classes=1000):
        super().__init__()

        # Running channel count; _make_layer reads and updates it
        self.in_channels = 64

        # Stem: 7x7 stride-2 conv, BN, ReLU, 3x3 stride-2 max pool
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages; every stage after the first starts with stride 2
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` blocks and update ``self.in_channels``."""
        stage_width = out_channels * block.expansion

        # A 1x1 conv + BN projection is needed on the shortcut whenever the
        # first block changes the resolution or the channel count
        downsample = None
        if stride != 1 or self.in_channels != stage_width:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(stage_width),
            )

        # Only the first block gets the stride and the projection
        stage = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = stage_width
        stage.extend(
            block(self.in_channels, out_channels) for _ in range(blocks - 1)
        )

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return (batch, num_classes) logits for a batch of 3-channel images."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        # Global average pool, then flatten everything except the batch dim
        x = self.avgpool(x).flatten(1)
        return self.fc(x)
def resnet18(num_classes=1000):
    """Build a ResNet from ``BasicBlock`` with 2 blocks in each of the 4 stages."""
    stage_depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, stage_depths, num_classes)
def resnet50(num_classes=1000):
    """Build a ResNet from ``Bottleneck`` with 3, 4, 6 and 3 blocks per stage."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes)
# Build a 10-class ResNet-18 and report how many scalar parameters it holds
model = resnet18(num_classes=10)
param_sizes = [p.numel() for p in model.parameters()]
total_params = sum(param_sizes)
print(f"ResNet-18 参数量: {total_params:,}")

三、图像分类实战

3.1 CIFAR-10 分类

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
# ===== Data preparation =====
# Per-channel mean / std passed to Normalize (presumably the CIFAR-10
# training-set statistics; verify against the dataset if it matters)
cifar_mean = (0.4914, 0.4822, 0.4465)
cifar_std = (0.2470, 0.2435, 0.2616)

# Training pipeline adds augmentation (padded random crop + horizontal flip)
# in front of the tensor conversion and normalisation
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(cifar_mean, cifar_std),
])
# Test pipeline: conversion and normalisation only, no augmentation
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar_mean, cifar_std),
])

# Datasets are downloaded into ./data when not already present
dataset_args = dict(root='./data', download=True)
train_set = torchvision.datasets.CIFAR10(train=True, transform=transform_train, **dataset_args)
test_set = torchvision.datasets.CIFAR10(train=False, transform=transform_test, **dataset_args)

# Only the training data is shuffled
loader_args = dict(batch_size=128, num_workers=2)
train_loader = DataLoader(train_set, shuffle=True, **loader_args)
test_loader = DataLoader(test_set, shuffle=False, **loader_args)

classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
print(f"训练集大小: {len(train_set)}")
print(f"测试集大小: {len(test_set)}")
print(f"类别: {classes}")
# ===== A small network for CIFAR-10 =====
class SmallResNet(nn.Module):
    """Small CNN for 3-channel images such as the 32x32 CIFAR-10 inputs.

    NOTE: despite the name, the stages built by ``_make_layer`` are plain
    Conv-BN-ReLU stacks. There are no residual (skip) connections here.

    Layout: a 3x3 stem convolution (32 channels), three stages of two
    convolutions each (32, 64, 128 channels; the last two start with
    stride 2), global average pooling and one linear classifier.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        self.layer1 = self._make_layer(32, 32, 2)
        self.layer2 = self._make_layer(32, 64, 2, stride=2)
        self.layer3 = self._make_layer(64, 128, 2, stride=2)

        # Global average pool makes the classifier independent of image size
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def _make_layer(self, in_ch, out_ch, blocks, stride=1):
        """Build ``blocks`` Conv-BN-ReLU units; only the first uses ``stride``."""
        layers = []
        layers.append(nn.Conv2d(in_ch, out_ch, 3, stride, 1, bias=False))
        layers.append(nn.BatchNorm2d(out_ch))
        layers.append(nn.ReLU(inplace=True))

        for _ in range(1, blocks):
            layers.append(nn.Conv2d(out_ch, out_ch, 3, 1, 1, bias=False))
            layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.ReLU(inplace=True))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return (batch, num_classes) logits for a batch of 3-channel images."""
        x = self.conv1(x)
        x = self.bn1(x)
        # Reached through `nn`, which this section imports; the `F` alias is
        # not imported alongside this code.
        x = nn.functional.relu(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x
# ===== Training configuration =====
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SmallResNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
# The scheduler is stepped once per epoch, so the cosine period must equal the
# number of epochs actually trained. With T_max=100 and 50 epochs the learning
# rate would stop halfway down the curve (at 0.05) and never anneal.
# Keep num_epochs in sync with the value used by the training loop.
num_epochs = 50
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
# ===== Training / evaluation helpers =====
def train(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader``.

    Args:
        model: module to optimise; it is put into training mode.
        loader: sized iterable yielding ``(inputs, targets)`` batches.
        criterion: callable mapping ``(outputs, targets)`` to a scalar loss.
        optimizer: optimiser stepped once per batch.
        device: device every batch is moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)``: the mean of the per-batch losses and the
        percentage of samples whose highest-scoring class matches the target.
    """
    model.train()
    loss_sum, hits, seen = 0.0, 0, 0

    for batch, labels in loader:
        batch = batch.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update
        optimizer.zero_grad()
        logits = model(batch)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        # max(1) returns (values, indices); the indices are the predicted classes
        predictions = logits.max(1)[1]
        seen += labels.size(0)
        hits += (predictions == labels).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy = 100. * hits / seen
    return mean_loss, accuracy
def test(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without computing gradients.

    Args:
        model: module to evaluate; it is put into evaluation mode.
        loader: sized iterable yielding ``(inputs, targets)`` batches.
        criterion: callable mapping ``(outputs, targets)`` to a scalar loss.
        device: device every batch is moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)``: the mean of the per-batch losses and the
        percentage of samples whose highest-scoring class matches the target.
    """
    model.eval()
    loss_sum, hits, seen = 0.0, 0, 0

    with torch.no_grad():
        for batch, labels in loader:
            batch = batch.to(device)
            labels = labels.to(device)

            logits = model(batch)
            loss_sum += criterion(logits, labels).item()

            # max(1) returns (values, indices); the indices are the predicted classes
            predictions = logits.max(1)[1]
            seen += labels.size(0)
            hits += (predictions == labels).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy = 100. * hits / seen
    return mean_loss, accuracy
# ===== Training loop =====
num_epochs = 50
# Per-epoch history of losses and accuracies
train_losses, test_losses = [], []
train_accs, test_accs = [], []
print("\n=== 开始训练 ===")
for epoch in range(num_epochs):
    # One optimisation pass, one evaluation pass, then advance the LR schedule
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    test_loss, test_acc = test(model, test_loader, criterion, device)
    scheduler.step()

    histories = (train_losses, test_losses, train_accs, test_accs)
    epoch_stats = (train_loss, test_loss, train_acc, test_acc)
    for history, value in zip(histories, epoch_stats):
        history.append(value)

    # Report only on every 10th epoch
    if (epoch + 1) % 10 != 0:
        continue
    print(f"Epoch [{epoch+1}/{num_epochs}]")
    print(f"  Train Loss: {train_loss:.4f}, Acc: {train_acc:.2f}%")
    print(f"  Test Loss: {test_loss:.4f}, Acc: {test_acc:.2f}%")
print(f"\n最终测试准确率: {test_accs[-1]:.2f}%")

四、迁移学习

import torchvision.models as models
# ===== Using a pretrained model =====
# Load ResNet-18 with its ImageNet weights. The `weights=` enum API
# (torchvision >= 0.13) replaces the deprecated `pretrained=True` flag;
# IMAGENET1K_V1 is the checkpoint that flag used to load.
model_pretrained = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
# Freeze every existing parameter so the backbone is not updated
for param in model_pretrained.parameters():
    param.requires_grad = False
# Replace the classification head; the new layer's parameters are created
# after the freeze and therefore remain trainable
num_features = model_pretrained.fc.in_features
model_pretrained.fc = nn.Linear(num_features, 10)  # 10 classes
model_pretrained = model_pretrained.to(device)
# Optimise the new head only
# NOTE: requires_grad=False does not stop BatchNorm layers from updating their
# running statistics while the model is in train() mode.
optimizer = optim.SGD(model_pretrained.fc.parameters(), lr=0.01, momentum=0.9)
print("迁移学习模型:")
# These two counts are numbers of parameter tensors (a weight and a bias count
# separately), not numbers of layers
print(f"冻结层数: {sum(1 for p in model_pretrained.parameters() if not p.requires_grad)}")
print(f"可训练层数: {sum(1 for p in model_pretrained.parameters() if p.requires_grad)}")

参考资源


上一篇:神经网络基础 | 下一篇:RNN与序列模型 | 返回:深度学习基础 | 最后更新:2026年4月14日

访问 --

讨论与反馈