PyTorch
PyTorch is an open-source deep learning framework developed by Facebook. It provides flexible dynamic computation graphs and strong GPU acceleration, and is one of the most popular deep learning frameworks in both research and industry.
Introduction
PyTorch Features
"""
PyTorch 核心特性:
- 动态计算图: 灵活的图结构,便于调试
- Tensor计算: 类似NumPy的张量操作,支持GPU加速
- 自动求导: torch.autograd自动微分
- 丰富API: 神经网络层、优化器、损失函数
- 模型部署: 支持移动端和Web端部署
- 社区活跃: 大量预训练模型和工具
- Python风格: 直观的Python接口
适用场景:
- 计算机视觉: 图像分类、目标检测、图像分割
- 自然语言处理: 文本分类、机器翻译、问答系统
- 强化学习: 游戏、机器人控制
- 生成模型: GAN、VAE、扩散模型
- 时序预测: 时间序列、语音识别
"""
Installing PyTorch
# Create a virtual environment
python -m venv venv
# Activate on Windows
venv\Scripts\activate
# Activate on Linux/Mac
source venv/bin/activate
# CPU-only build
pip install torch torchvision torchaudio
# GPU build (CUDA 11.8)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# GPU build (CUDA 12.1)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Check the installed version
python -c "import torch; print(torch.__version__)"
# Verify that the GPU is available
python -c "import torch; print(torch.cuda.is_available())"
# Install common companion packages
pip install numpy matplotlib pandas scikit-learn
pip install pillow # image processing
pip install tqdm # progress bars
Quick Start
Tensor Basics
import torch
# Create a tensor
x = torch.tensor([1, 2, 3, 4])
print(x)
# Inspect the device
print(f'device: {x.device}')
# Inspect the shape
print(f'shape: {x.shape}')
# Inspect the data type
print(f'dtype: {x.dtype}')
# Create a tensor on the GPU
if torch.cuda.is_available():
    x_gpu = torch.tensor([1, 2, 3], device='cuda')
    # or
    x_gpu = torch.tensor([1, 2, 3]).cuda()
# Converting between tensors and NumPy arrays
import numpy as np
a = np.array([1, 2, 3])
t = torch.from_numpy(a)  # NumPy -> tensor
n = t.numpy()            # tensor -> NumPy
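Note that torch.from_numpy() and Tensor.numpy() share memory with the underlying array rather than copying it, so in-place changes on one side are visible on the other. A small sketch illustrating the difference (torch.tensor() copies instead):
import numpy as np
import torch
a = np.ones(3)
t = torch.from_numpy(a)   # shares memory with a
c = torch.tensor(a)       # copies the data
a[0] = 100.0
print(t)  # tensor([100., 1., 1.], dtype=torch.float64)
print(c)  # tensor([1., 1., 1.], dtype=torch.float64), unaffected by the change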
Tensor Operations
Creating Tensors
import torch
# From a Python list
x = torch.tensor([1, 2, 3, 4])
# With an explicit dtype
x = torch.tensor([1, 2, 3], dtype=torch.float32)
# All zeros
x = torch.zeros(3, 4)
x = torch.zeros_like(x)  # same shape (and dtype) as x
# All ones
x = torch.ones(3, 4)
x = torch.ones_like(x)
# Identity matrix
x = torch.eye(3)
# Random tensors
x = torch.randn(3, 4)  # standard normal distribution
x = torch.rand(3, 4)   # uniform distribution over [0, 1)
x = torch.randint(0, 10, (3, 4))  # random integers
# Ranges
x = torch.arange(0, 10, 1)    # [0, 1, 2, ..., 9]
x = torch.linspace(0, 10, 5)  # [0.0, 2.5, 5.0, 7.5, 10.0]
# Uninitialized tensor
x = torch.empty(3, 4)
Tensor Indexing and Slicing
import torch
x = torch.arange(12).reshape(3, 4)
# Indexing
print(x[0, 0])   # first row, first column
print(x[0])      # first row
print(x[:, 0])   # first column
# Slicing
print(x[0:2, :])  # first two rows
print(x[:, 1:3])  # columns 2 and 3
# Advanced indexing
indices = torch.tensor([0, 2])
print(x[:, indices])  # columns 1 and 3
# Boolean indexing
mask = x > 5
print(x[mask])
# Conditional indexing
print(x[x > 5])
Tensor Math
import torch
x = torch.tensor([1, 2, 3, 4], dtype=torch.float32)
y = torch.tensor([5, 6, 7, 8], dtype=torch.float32)
# Arithmetic
print(x + y)  # addition
print(x - y)  # subtraction
print(x * y)  # element-wise multiplication
print(x / y)  # division
print(x @ y)  # dot product for 1-D tensors (matrix multiplication for 2-D)
print(torch.matmul(x, y))  # same as @
# Operations with scalars
print(x + 10)
print(x * 2)
# Broadcasting
x = torch.ones(3, 1)
y = torch.ones(1, 3)
print(x + y)  # (3,1) + (1,3) -> (3,3)
# Math functions
print(torch.sqrt(x))
print(torch.exp(x))
print(torch.log(x))
print(torch.abs(x))
# Reductions
x = torch.randn(3, 4)
print(x.sum())       # sum of all elements
print(x.sum(dim=0))  # sum along dim 0 (column sums)
print(x.sum(dim=1))  # sum along dim 1 (row sums)
print(x.mean())      # mean
print(x.std())       # standard deviation
print(x.max())       # maximum
print(x.argmax())    # index of the maximum
# Shape manipulation
x = torch.arange(12)
x = x.reshape(3, 4)    # reshape
x = x.view(3, 4)       # reshape (shares memory; requires a contiguous tensor)
x = x.unsqueeze(0)     # add a dimension, [1, 3, 4]
x = x.squeeze()        # remove size-1 dimensions, [3, 4]
x = x.transpose(0, 1)  # transpose, [4, 3]
x3 = torch.randn(2, 3, 4)
x3 = x3.permute(1, 0, 2)  # reorder the dimensions of a 3-D tensor, [3, 2, 4]
# Concatenation and stacking
x1 = torch.randn(2, 3)
x2 = torch.randn(2, 3)
x = torch.cat([x1, x2], dim=0)    # concatenate along rows
x = torch.cat([x1, x2], dim=1)    # concatenate along columns
x = torch.stack([x1, x2], dim=0)  # stack along a new dimension
# Flattening
x = torch.randn(2, 3)
y = x.flatten()   # flatten
y = x.view(-1)    # flatten (shape inferred automatically)
Autograd
import torch
# Create a tensor that requires gradients
x = torch.tensor([2.0, 3.0], requires_grad=True)
# Define the computation
y = x ** 2 + 2 * x + 1
# Compute gradients
y_sum = y.sum()
y_sum.backward()
# Inspect the gradients
print(x.grad)  # dy/dx = 2*x + 2 = [6, 8]
# Clear the gradients
x.grad.zero_()
# A more involved computation
x = torch.ones(2, 2, requires_grad=True)
y = x + 2
z = y * y * 3
out = z.mean()
out.backward()
print(x.grad)
# Disabling gradient tracking
with torch.no_grad():
    # computations inside this block are not tracked
    y = x * 2
# or use detach()
y = x.detach()  # returns a tensor detached from the graph (no gradient tracking)
Neural Network Basics
nn.Module
import torch
import torch.nn as nn
# Define a neural network
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNet, self).__init__()
        # Define the layers
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Forward pass
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out
# Create the model
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)
# Print the model structure
print(model)
# Inspect the parameters
for name, param in model.named_parameters():
    print(f'{name}: {param.shape}')
# Forward pass
x = torch.randn(64, 784)  # batch_size=64
output = model(x)
print(output.shape)  # [64, 10]
Loss Functions
import torch
import torch.nn as nn
# Classification losses
# Cross-entropy loss (applies softmax internally, so pass raw logits)
criterion = nn.CrossEntropyLoss()
# logits: [batch_size, num_classes]
# target: [batch_size] (class indices)
output = torch.randn(3, 5)        # 3 samples, 5 classes
target = torch.tensor([1, 2, 0])  # ground-truth classes
loss = criterion(output, target)
# Binary cross-entropy loss
criterion = nn.BCEWithLogitsLoss()  # applies sigmoid internally
# logits: [batch_size, 1] or [batch_size]
# target: same shape as the logits, float values 0.0 or 1.0
# Regression losses
criterion = nn.MSELoss()  # mean squared error
criterion = nn.L1Loss()   # mean absolute error
# Usage
pred = model(x)
loss = criterion(pred, target)
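A common pitfall with nn.BCEWithLogitsLoss is that the targets must be floats of the same shape as the logits. A small sketch (the numbers are illustrative):
import torch
import torch.nn as nn
criterion = nn.BCEWithLogitsLoss()
logits = torch.randn(4)                        # raw scores, no sigmoid applied
targets = torch.tensor([1.0, 0.0, 1.0, 0.0])   # float targets, same shape as logits
loss = criterion(logits, targets)
print(loss.item())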
Backpropagation
import torch.optim as optim
# Define the optimizer
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# One training step
# 1. Forward pass
output = model(inputs)
# 2. Compute the loss
loss = criterion(output, target)
# 3. Clear old gradients
optimizer.zero_grad()
# 4. Backward pass
loss.backward()
# 5. Update the parameters
optimizer.step()
# Full training loop
for epoch in range(num_epochs):
    for batch_x, batch_y in dataloader:
        # Forward pass
        output = model(batch_x)
        loss = criterion(output, batch_y)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
Optimizers
import torch.optim as optim
# SGD
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# Adam
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))
# RMSprop
optimizer = optim.RMSprop(model.parameters(), lr=0.001, alpha=0.99)
# Adagrad
optimizer = optim.Adagrad(model.parameters(), lr=0.01)
# Learning-rate scheduling
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # multiply the LR by 0.1 every 10 epochs
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5)  # reduce the LR when the validation loss stops improving
# In the training loop
for epoch in range(num_epochs):
    # Train
    train(model, dataloader)
    # Validate
    val_loss = validate(model, val_loader)
    # Update the learning rate
    scheduler.step(val_loss)  # ReduceLROnPlateau needs the metric; most other schedulers use scheduler.step() with no argument
Building Models
Linear Layers
import torch.nn as nn
# Fully connected layer
fc = nn.Linear(in_features=100, out_features=50)
# Example
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)
Convolutional Layers
import torch.nn as nn
# 2D convolution
conv = nn.Conv2d(
    in_channels=3,    # input channels (RGB = 3)
    out_channels=64,  # output channels (number of filters)
    kernel_size=3,    # kernel size
    stride=1,         # stride
    padding=1         # padding
)
# Example
model = nn.Sequential(
    # Conv1
    nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),  # 28x28 -> 14x14
    # Conv2
    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),  # 14x14 -> 7x7
    # Flatten
    nn.Flatten(),
    # FC
    nn.Linear(64 * 7 * 7, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)
# Transposed convolution (upsampling)
deconv = nn.ConvTranspose2d(
    in_channels=64,
    out_channels=32,
    kernel_size=2,
    stride=2
)
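The spatial output size of a convolution is floor((in + 2*padding - kernel_size) / stride) + 1; with kernel_size=3, stride=1, padding=1 the size is preserved, and each 2x2 max pool halves it. A quick sanity check of these shape rules (the 28x28 input size matches the comments above and is otherwise illustrative):
import torch
import torch.nn as nn
conv = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
pool = nn.MaxPool2d(kernel_size=2, stride=2)
deconv = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
x = torch.randn(1, 3, 28, 28)
print(conv(x).shape)                # [1, 64, 28, 28], spatial size preserved
print(pool(conv(x)).shape)          # [1, 64, 14, 14], halved by pooling
print(deconv(pool(conv(x))).shape)  # [1, 32, 28, 28], doubled back by the transposed conv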
Recurrent Layers
import torch.nn as nn
# RNN
rnn = nn.RNN(
    input_size=100,      # input feature dimension
    hidden_size=128,     # hidden state size
    num_layers=2,        # number of stacked RNN layers
    batch_first=True,    # input shape is (batch, seq, feature)
    bidirectional=False  # whether to use a bidirectional RNN
)
# LSTM
lstm = nn.LSTM(
    input_size=100,
    hidden_size=128,
    num_layers=2,
    batch_first=True,
    bidirectional=False,
    dropout=0.5
)
# GRU
gru = nn.GRU(
    input_size=100,
    hidden_size=128,
    num_layers=2,
    batch_first=True,
    bidirectional=False
)
# Example
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 2)

    def forward(self, x):
        # x: [batch_size, seq_len]
        embed = self.embedding(x)  # [batch_size, seq_len, embed_size]
        lstm_out, (h_n, c_n) = self.lstm(embed)
        out = self.fc(lstm_out[:, -1, :])  # take the last time step
        return out
Activation Functions
import torch.nn as nn
# ReLU
relu = nn.ReLU()
# LeakyReLU
leaky_relu = nn.LeakyReLU(negative_slope=0.01)
# Sigmoid
sigmoid = nn.Sigmoid()
# Tanh
tanh = nn.Tanh()
# Softmax
softmax = nn.Softmax(dim=1)
# GELU (commonly used in Transformers)
gelu = nn.GELU()
# Inside a model
model = nn.Sequential(
    nn.Linear(100, 50),
    nn.ReLU(),
    nn.Linear(50, 10)
)
Pooling Layers
import torch.nn as nn
# Max pooling
maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
# Average pooling
avgpool = nn.AvgPool2d(kernel_size=2, stride=2)
# Adaptive average pooling (fixed output size)
adapool = nn.AdaptiveAvgPool2d((1, 1))  # any input size -> 1x1
# Global average pooling
class GlobalAvgPool(nn.Module):
    def forward(self, x):
        return x.mean(dim=[2, 3])  # [batch, channel, h, w] -> [batch, channel]
Batch Normalization
import torch.nn as nn
# 1D batch norm (for fully connected layers)
bn1d = nn.BatchNorm1d(num_features=100)
# 2D batch norm (for CNNs)
bn2d = nn.BatchNorm2d(num_features=64)
# Layer norm
ln = nn.LayerNorm(normalized_shape=100)
# Inside a CNN
model = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=3, padding=1),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(2)
)
Dropout
import torch.nn as nn
# Dropout
dropout = nn.Dropout(p=0.5)  # drop probability
# Dropout2d (for CNNs; drops entire channels)
dropout2d = nn.Dropout2d(p=0.5)
# Inside a model
model = nn.Sequential(
    nn.Linear(100, 50),
    nn.ReLU(),
    nn.Dropout(0.5),  # active during training, disabled during evaluation
    nn.Linear(50, 10)
)
# Remember to switch modes
model.train()  # training mode
model.eval()   # evaluation mode
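The train/eval switch is what actually turns dropout on and off. A small sketch showing the difference (in training mode the surviving activations are rescaled by 1/(1-p)):
import torch
import torch.nn as nn
drop = nn.Dropout(p=0.5)
x = torch.ones(8)
drop.train()
print(drop(x))  # roughly half the entries are zeroed, the rest become 2.0
drop.eval()
print(drop(x))  # identity: all ones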
Data Loading
Dataset
from torch.utils.data import Dataset
# Custom Dataset
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = self.data[idx]
        y = self.labels[idx]
        return x, y
# Usage
dataset = CustomDataset(data, labels)
sample = dataset[0]  # get the first sample
x, y = sample
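When the data already lives in tensors, torch.utils.data.TensorDataset saves writing a custom class. A minimal sketch with made-up shapes:
import torch
from torch.utils.data import TensorDataset
features = torch.randn(100, 20)       # 100 samples, 20 features each
labels = torch.randint(0, 2, (100,))  # binary labels
dataset = TensorDataset(features, labels)
x, y = dataset[0]                     # behaves like any Dataset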
DataLoader
from torch.utils.data import DataLoader
# Create a DataLoader
dataloader = DataLoader(
    dataset=dataset,
    batch_size=32,    # batch size
    shuffle=True,     # shuffle the data each epoch
    num_workers=4,    # number of worker processes for loading
    pin_memory=True,  # pin memory (speeds up host-to-GPU transfer)
    drop_last=True    # drop the last incomplete batch
)
# Iterate over the data
for batch_x, batch_y in dataloader:
    # batch_x: [batch_size, ...]
    # batch_y: [batch_size, ...]
    output = model(batch_x)
    loss = criterion(output, batch_y)
Data Transforms
from torchvision import transforms
# Common transforms
transform = transforms.Compose([
    transforms.Resize(256),      # resize
    transforms.CenterCrop(224),  # center crop
    transforms.ToTensor(),       # convert to a tensor
    transforms.Normalize(        # normalize
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])
# Data augmentation
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),  # random crop and resize
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.RandomRotation(15),      # random rotation
    transforms.ColorJitter(             # color jitter
        brightness=0.2,
        contrast=0.2,
        saturation=0.2
    ),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])
# Usage
from torchvision.datasets import ImageFolder
dataset = ImageFolder('data/train', transform=transform)
Training Workflow
Full Training Loop
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
# Model
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)
# Loss function
criterion = nn.CrossEntropyLoss()
# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Learning-rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
# Training loop
num_epochs = 50
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
for epoch in range(num_epochs):
    # Training phase
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0
    pbar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{num_epochs}')
    for batch_x, batch_y in pbar:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        # Forward pass
        output = model(batch_x)
        loss = criterion(output, batch_y)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Statistics
        train_loss += loss.item()
        _, predicted = output.max(1)
        train_total += batch_y.size(0)
        train_correct += predicted.eq(batch_y).sum().item()
        # Update the progress bar
        pbar.set_postfix({
            'loss': f'{loss.item():.4f}',
            'acc': f'{100.*train_correct/train_total:.2f}%'
        })
    # Average loss and accuracy
    train_loss = train_loss / len(dataloader)
    train_acc = 100. * train_correct / train_total
    print(f'Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
    # Validation phase
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            output = model(batch_x)
            loss = criterion(output, batch_y)
            val_loss += loss.item()
            _, predicted = output.max(1)
            val_total += batch_y.size(0)
            val_correct += predicted.eq(batch_y).sum().item()
    val_loss = val_loss / len(val_loader)
    val_acc = 100. * val_correct / val_total
    print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
    # Update the learning rate
    scheduler.step()
Saving and Loading Models
# Save the whole model (loading it later requires the class definition to be importable)
torch.save(model, 'model.pth')
# Save only the parameters (recommended)
torch.save(model.state_dict(), 'model_state_dict.pth')
# Save a checkpoint (including the optimizer state, etc.)
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss,
}, 'checkpoint.pth')
# Load the whole model
model = torch.load('model.pth')
# Load the parameters
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)
model.load_state_dict(torch.load('model_state_dict.pth'))
model.eval()
# Load a checkpoint
checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
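Checkpoints saved on a GPU machine can be loaded on a CPU-only machine by passing map_location to torch.load; a short sketch of that pattern:
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Remap GPU-saved tensors onto whatever device is available
checkpoint = torch.load('checkpoint.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])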
CNN
Convolutional Neural Networks
import torch
import torch.nn as nn
class CNN(nn.Module):
    def __init__(self, num_classes=10):
        super(CNN, self).__init__()
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 32x32 -> 16x16
            # Block 2
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 16x16 -> 8x8
            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 8x8 -> 4x4
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 4 * 4, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x
# Create the model
model = CNN(num_classes=10)
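A quick way to confirm that the flattened feature size (256 * 4 * 4) matches the classifier is to push a dummy batch through the model; a small sketch assuming 32x32 RGB inputs:
import torch
model = CNN(num_classes=10)
dummy = torch.randn(1, 3, 32, 32)  # one 32x32 RGB image
out = model(dummy)
print(out.shape)  # torch.Size([1, 10])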
Image Classification Example
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
# Transforms
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
# Load the data
trainset = datasets.CIFAR10(root='./data', train=True,
                            download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=32,
                                          shuffle=True, num_workers=2)
testset = datasets.CIFAR10(root='./data', train=False,
                           download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=32,
                                         shuffle=False, num_workers=2)
# Training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CNN(num_classes=10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
for epoch in range(10):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader, 0):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if i % 100 == 99:
            print(f'[{epoch+1}, {i+1}] loss: {running_loss/100:.3f}')
            running_loss = 0.0
print('Finished Training')
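After training, the test set created above can be used to measure accuracy; a short evaluation sketch following the same pattern:
# Evaluate on the CIFAR-10 test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
print(f'Test accuracy: {100. * correct / total:.2f}%')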
RNN
LSTM Model
import torch.nn as nn
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_classes):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # bidirectional LSTM doubles the feature size

    def forward(self, x):
        # x: [batch_size, seq_len]
        embed = self.embedding(x)  # [batch_size, seq_len, embed_size]
        lstm_out, (h_n, c_n) = self.lstm(embed)
        # Use the output at the last time step
        out = self.fc(lstm_out[:, -1, :])
        return out
# Usage
model = LSTMModel(
    vocab_size=10000,
    embed_size=128,
    hidden_size=256,
    num_layers=2,
    num_classes=2
)
GRU Model
import torch.nn as nn
class GRUModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_classes):
        super(GRUModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.gru = nn.GRU(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        embed = self.embedding(x)
        gru_out, h_n = self.gru(embed)
        out = self.fc(gru_out[:, -1, :])
        return out
Transformer
Self-Attention
import torch
import torch.nn as nn
import torch.nn.functional as F
class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads
        assert (self.head_dim * heads == embed_size), "Embed size needs to be divisible by heads"
        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]
        # Split into multiple heads
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)
        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)
        # QK^T / sqrt(d_k)
        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))
        # Note: this variant scales by sqrt(embed_size); the original Transformer scales by sqrt(head_dim)
        attention = torch.softmax(energy / (self.embed_size ** (1/2)), dim=3)
        # Attention * V
        out = torch.einsum("nhql,nlhd->nqhd", [attention, values])
        out = out.reshape(N, query_len, self.heads * self.head_dim)
        out = self.fc_out(out)
        return out
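A quick shape check of the block above (batch size and sequence length are arbitrary):
attn = SelfAttention(embed_size=256, heads=8)
x = torch.randn(2, 10, 256)     # [batch, seq_len, embed_size]
out = attn(x, x, x, mask=None)  # self-attention: values, keys and queries are the same tensor
print(out.shape)  # torch.Size([2, 10, 256])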
Transformer Block
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)
        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x))
        return out
Full Transformer
# Note: the Encoder and Decoder classes used below are not shown in this section.
class Transformer(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        trg_vocab_size,
        embed_size=512,
        num_layers=6,
        forward_expansion=4,
        heads=8,
        dropout=0,
        device="cuda",
        max_length=100
    ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(
            src_vocab_size,
            embed_size,
            num_layers,
            heads,
            device,
            forward_expansion,
            dropout,
            max_length
        )
        self.decoder = Decoder(
            trg_vocab_size,
            embed_size,
            num_layers,
            heads,
            forward_expansion,
            dropout,
            device,
            max_length
        )
        self.device = device
        self.fc_out = nn.Linear(embed_size, trg_vocab_size)

    def forward(self, src, trg, src_mask, trg_mask):
        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        out = self.fc_out(out)
        return out
GPU Acceleration
CUDA Basics
import torch
# Check whether CUDA is available
print(torch.cuda.is_available())
# Number of GPUs
print(torch.cuda.device_count())
# Current GPU index
print(torch.cuda.current_device())
# GPU name
print(torch.cuda.get_device_name(0))
# Move the model to the GPU
model = model.cuda()
# or
device = torch.device('cuda:0')
model = model.to(device)
# Move a tensor to the GPU
x = x.cuda()
# or
x = x.to(device)
# Release cached GPU memory
torch.cuda.empty_cache()
Mixed-Precision Training
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()
for inputs, labels in dataloader:
    inputs, labels = inputs.cuda(), labels.cuda()
    optimizer.zero_grad()
    with autocast():  # automatic mixed precision
        outputs = model(inputs)
        loss = criterion(outputs, labels)
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
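If gradient clipping (covered under Best Practices below) is combined with mixed precision, the gradients should be unscaled before clipping so the threshold applies to their true magnitudes. A sketch of that pattern; these lines would replace the last three lines of the loop above, and the max_norm value is illustrative:
scaler.scale(loss).backward()
scaler.unscale_(optimizer)  # bring the gradients back to their real scale
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)      # skips the step if inf/NaN gradients are detected
scaler.update()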
Transfer Learning
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
# Load a pretrained model
model = models.resnet18(pretrained=True)  # newer torchvision versions use the weights= argument instead
# Freeze the parameters
for param in model.parameters():
    param.requires_grad = False
# Replace the final fully connected layer
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_classes)
# Train only the new fully connected layer
optimizer = optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9)
# Or fine-tune the whole model
for param in model.parameters():
    param.requires_grad = True
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)
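One way to confirm which parameters will actually be updated after freezing is to count them; a small sketch:
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f'trainable parameters: {trainable} / {total}')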
Practical Examples
Image Classification
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
# Data preparation
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
train_dataset = datasets.ImageFolder('data/train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
# Transfer learning
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# Training
model.train()
for epoch in range(10):
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
Text Classification
import torch
import torch.nn as nn
class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_classes):
        super(TextClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        embed = self.embedding(x)
        lstm_out, (h_n, _) = self.lstm(embed)
        out = self.fc(lstm_out[:, -1, :])
        return out
# Usage
model = TextClassifier(vocab_size=10000, embed_size=128, hidden_size=256, num_classes=2)
Best Practices
Model Training Tips
# 1. Gradient clipping (prevents exploding gradients)
#    Call it after loss.backward() and before optimizer.step(); see the sketch after this block
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 2. Learning-rate scheduling (cosine annealing)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
# 3. Early stopping
best_val_loss = float('inf')
patience = 5
counter = 0
for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, dataloader)
    val_loss = validate(model, val_loader)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print('Early stopping')
            break
# 4. Model ensembling
models = [model1, model2, model3]
for model in models:
    model.eval()
predictions = []
for model in models:
    pred = model(x)
    predictions.append(pred)
ensemble_pred = torch.mean(torch.stack(predictions), dim=0)
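As referenced in tip 1, gradient clipping only has an effect when it sits between the backward pass and the optimizer step; a minimal sketch of one training step with clipping in place:
output = model(batch_x)
loss = criterion(output, batch_y)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # after backward, before step
optimizer.step()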
Code Organization
# 1. Separate the model definition
# models.py
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        # ...

    def forward(self, x):
        # ...
# 2. Separate the dataset
# dataset.py
class MyDataset(Dataset):
    def __init__(self):
        # ...

    def __getitem__(self, idx):
        # ...
# 3. Separate the training logic
# train.py
def train_one_epoch(model, dataloader, optimizer, criterion):
    model.train()
    for batch_x, batch_y in dataloader:
        # ...
    return avg_loss
# 4. Configuration file
# config.py
class Config:
    batch_size = 32
    learning_rate = 0.001
    num_epochs = 100
Summary
PyTorch is a mainstream deep learning framework:
Core Concepts
- Tensor: the basic data structure, similar to a NumPy array but with GPU support
- Autograd: the automatic differentiation system
- nn.Module: the base class for neural network modules
- Optimizers: parameter-update algorithms
Key Capabilities
- Tensor operations: creation, indexing, math, shape manipulation
- Neural networks: layers, activation functions, loss functions
- Data loading: Dataset, DataLoader, transforms
- Training workflow: forward pass, backpropagation, parameter updates
- Model building: CNN, RNN, Transformer
- GPU acceleration: CUDA, mixed-precision training
- Transfer learning: fine-tuning pretrained models
Best Practices
- Use DataLoader for efficient data loading
- Apply learning-rate scheduling sensibly
- Use a validation set to guard against overfitting
- Save model checkpoints regularly
- Use mixed-precision training to speed up training
- Organize code into clear modules
PyTorch is flexible and intuitive, works well for both research and production, and is one of the go-to frameworks for deep learning.