源码级别解析 · 源码解析 · AI基础架构
2026-04-25 | 每日技术深度解读
Vaswani et al. "Attention Is All You Need" (2017)
BERT、GPT、T5等模型的基石
import copy
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
class Transformer(nn.Module):
    """Encoder-decoder Transformer assembled from ``torch.nn`` building blocks.

    Token ids are embedded, combined with positional encodings, run through
    an ``nn.TransformerEncoder`` / ``nn.TransformerDecoder`` pair (both built
    with ``batch_first=True``) and projected to target-vocabulary logits.

    Args:
        src_vocab_size: Number of entries in the source embedding table.
        tgt_vocab_size: Number of entries in the target embedding table; also
            the size of the output projection.
        d_model: Embedding / hidden width.
        nhead: Number of attention heads per layer.
        num_encoder_layers: Number of stacked encoder layers.
        num_decoder_layers: Number of stacked decoder layers.
        dim_feedforward: Hidden width of each layer's feed-forward network.
        dropout: Dropout probability used in the layers and in the positional
            encoding.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, nhead,
                 num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super().__init__()
        # Token embeddings and positional encoding
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        # dropout must be passed by keyword: the second positional parameter
        # of PositionalEncoding is max_len, not dropout.
        self.positional_encoding = PositionalEncoding(d_model, dropout=dropout)
        # Encoder and decoder stacks
        encoder_layer = nn.TransformerEncoderLayer(
            d_model, nhead, dim_feedforward, dropout, batch_first=True)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model, nhead, dim_feedforward, dropout, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_decoder_layers)
        # Output projection to vocabulary logits
        self.generator = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        """Compute target-vocabulary logits for a batch.

        Args:
            src: Source token ids fed to ``src_embedding``.
            tgt: Target token ids fed to ``tgt_embedding``.
            src_mask: Optional attention mask passed to the encoder.
            tgt_mask: Optional attention mask passed to the decoder
                self-attention.
            memory_mask: Optional mask for the decoder's attention over the
                encoder output. When omitted, ``src_mask`` is reused, which
                is the historical behaviour of this class.
                NOTE(review): a square source mask only fits as a memory mask
                when source and target lengths match - pass ``memory_mask``
                explicitly otherwise.

        Returns:
            The output of ``generator`` applied to the decoder output.
        """
        # Token embedding + positional encoding
        src_emb = self.positional_encoding(self.src_embedding(src))
        tgt_emb = self.positional_encoding(self.tgt_embedding(tgt))
        # Encoder, then decoder attending over the encoder memory
        memory = self.encoder(src_emb, src_mask)
        cross_mask = src_mask if memory_mask is None else memory_mask
        output = self.decoder(tgt_emb, memory, tgt_mask, cross_mask)
        return self.generator(output)
完整的Transformer模型架构实现
def scaled_dot_product_attention(q, k, v, mask=None):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        q: Query tensor ``[..., seq_len, d_k]``.
        k: Key tensor ``[..., seq_len, d_k]``.
        v: Value tensor ``[..., seq_len, d_v]``.
        mask: Optional tensor broadcastable to the score matrix. Positions
            where ``mask == 0`` are excluded from attention.

    Returns:
        A tuple ``(output, attention_weights)`` where ``output`` is the
        weighted sum of ``v`` and ``attention_weights`` is the softmax over
        the last dimension of the scaled scores.
    """
    d_k = q.size(-1)
    # Attention scores: (Q @ K^T) / sqrt(d_k)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # Fill with the most negative finite value of the score dtype. A
        # literal -1e9 cannot be represented in float16 and makes
        # masked_fill raise when the scores are half precision.
        scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
    # Normalise over the key dimension
    attention_weights = F.softmax(scores, dim=-1)
    # Weighted sum of the values
    output = torch.matmul(attention_weights, v)
    return output, attention_weights
注意力机制的核心数学实现
注意力机制的完整流程图
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    ``d_model`` is split evenly across ``nhead`` heads; queries, keys and
    values are projected, attended per head, concatenated and projected back.

    Note: ``self.dropout`` is created in the constructor but ``forward`` does
    not apply it.
    """

    def __init__(self, d_model, nhead, dropout=0.1):
        super().__init__()
        assert d_model % nhead == 0, "d_model must be divisible by nhead"
        self.d_model = d_model
        self.nhead = nhead
        # Per-head width for keys/queries and for values
        self.d_k = d_model // nhead
        self.d_v = d_model // nhead
        # Input projections (query, key, value) and output projection
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key`` / ``value``.

        Args:
            query, key, value: Tensors whose first dimension is the batch and
                whose last dimension is ``d_model``.
            mask: Optional mask forwarded unchanged to
                ``scaled_dot_product_attention``.

        Returns:
            ``(output, attn_weights)``: the projected attention output with
            last dimension ``d_model`` and the per-head attention weights.
        """
        n_batch = query.size(0)

        def split_heads(projected, head_dim):
            # [batch, len, d_model] -> [batch, nhead, len, head_dim]
            return projected.view(n_batch, -1, self.nhead, head_dim).transpose(1, 2)

        q_heads = split_heads(self.w_q(query), self.d_k)
        k_heads = split_heads(self.w_k(key), self.d_k)
        v_heads = split_heads(self.w_v(value), self.d_v)

        context, attn_weights = scaled_dot_product_attention(
            q_heads, k_heads, v_heads, mask)

        # Concatenate heads back to [batch, len, d_model]
        merged = context.transpose(1, 2).contiguous().view(
            n_batch, -1, self.d_model)
        return self.w_o(merged), attn_weights
完整的多头注意力模块实现
class PositionalEncoding(nn.Module):
    """Fixed sine/cosine positional encoding followed by dropout.

    Even feature columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = exp(-ln(10000) * 2i / d_model)``. The
    table is stored as the non-trainable buffer ``pe`` of shape
    ``[1, max_len, d_model]``.

    Args:
        d_model: Feature width; both even and odd values are supported.
        max_len: Number of positions precomputed in the table.
        dropout: Dropout probability applied after adding the encoding.
    """

    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Angle table: one row per position, one column per frequency
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even
        # columns, so drop the surplus frequency for the cosine half.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        # Leading batch dimension so the table broadcasts over the batch
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add positional encodings to ``x`` and apply dropout.

        Args:
            x: Tensor of shape ``[batch_size, seq_len, d_model]``.

        Returns:
            Tensor of the same shape as ``x``.
        """
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)
class EncoderLayer(nn.Module):
    """Single encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to its input
    (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super().__init__()
        # Multi-head self-attention
        self.self_attn = MultiHeadAttention(d_model, nhead, dropout)
        # Position-wise feed-forward network
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        # One layer norm per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # One residual dropout per sub-layer
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        """Run the layer on ``src``.

        Args:
            src: Input tensor whose last dimension is ``d_model``.
            src_mask: Optional mask forwarded to the self-attention module.

        Returns:
            Tensor with the same shape as ``src``.
        """
        # Self-attention sub-layer (attention weights are discarded)
        attn_out, _ = self.self_attn(src, src, src, src_mask)
        src = self.norm1(src + self.dropout1(attn_out))
        # Feed-forward sub-layer
        hidden = F.relu(self.linear1(src))
        ff_out = self.linear2(self.dropout(hidden))
        return self.norm2(src + self.dropout2(ff_out))
Encoder层的标准实现
class DecoderLayer(nn.Module):
    """Single decoder layer with three sub-layers.

    Order: self-attention over the target, attention over the encoder
    memory, then a feed-forward network. Each sub-layer output goes through
    dropout, a residual addition and a layer norm.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super().__init__()
        # Self-attention over the target sequence
        self.self_attn = MultiHeadAttention(d_model, nhead, dropout)
        # Attention from the target over the encoder memory
        self.multihead_attn = MultiHeadAttention(d_model, nhead, dropout)
        # Position-wise feed-forward network
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        # One layer norm per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        # One residual dropout per sub-layer
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """Run the layer on ``tgt`` using encoder output ``memory``.

        Args:
            tgt: Target-side input tensor.
            memory: Encoder output attended to by the second sub-layer.
            tgt_mask: Optional mask for the self-attention sub-layer.
            memory_mask: Optional mask for the encoder-decoder attention.

        Returns:
            Tensor with the same shape as ``tgt``.
        """
        # Self-attention sub-layer
        self_out, _ = self.self_attn(tgt, tgt, tgt, tgt_mask)
        tgt = self.norm1(tgt + self.dropout1(self_out))
        # Encoder-decoder attention sub-layer
        cross_out, _ = self.multihead_attn(tgt, memory, memory, memory_mask)
        tgt = self.norm2(tgt + self.dropout2(cross_out))
        # Feed-forward sub-layer
        hidden = F.relu(self.linear1(tgt))
        ff_out = self.linear2(self.dropout(hidden))
        return self.norm3(tgt + self.dropout3(ff_out))
Decoder层的完整实现
def generate_square_subsequent_mask(sz):
    """Build an additive causal mask.

    Args:
        sz: Sequence length.

    Returns:
        Float tensor of shape ``[sz, sz]`` holding ``0.0`` on and below the
        main diagonal and ``-inf`` strictly above it.
    """
    blocked = torch.full((sz, sz), float('-inf'))
    # Keep -inf only strictly above the diagonal; triu zeroes the rest.
    return torch.triu(blocked, diagonal=1)
def create_padding_mask(seq, pad_idx=0):
    """Mark padding positions in a batch of token ids.

    Args:
        seq: Token ids of shape ``[batch_size, seq_len]``.
        pad_idx: Id of the padding token.

    Returns:
        Boolean tensor of shape ``[batch_size, 1, 1, seq_len]`` that is
        ``True`` where ``seq`` equals ``pad_idx``.

    NOTE(review): ``scaled_dot_product_attention`` in this file suppresses
    positions where ``mask == 0``, i.e. the opposite convention - confirm
    whether callers invert this mask before passing it on.
    """
    is_pad = seq == pad_idx
    # Insert two singleton dimensions after the batch dimension.
    return is_pad[:, None, None]
两种掩码的生成方法
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        # Expand to the hidden width, then project back to d_model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

    def forward(self, x):
        """Apply the network to ``x``.

        Args:
            x: Input tensor ``[batch_size, seq_len, d_model]``.

        Returns:
            Output tensor ``[batch_size, seq_len, d_model]``.
        """
        hidden = self.linear1(x)
        activated = F.relu(hidden)
        regularised = self.dropout(activated)
        return self.linear2(regularised)
标准前馈网络实现
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with learnable scale/shift.

    Computes ``gamma * (x - mean) / (std + eps) + beta`` where ``std`` is the
    value returned by ``Tensor.std`` with its default settings and ``eps`` is
    added to the standard deviation itself.
    """

    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.eps = eps  # guards against division by zero
        self.gamma = nn.Parameter(torch.ones(d_model))  # scale
        self.beta = nn.Parameter(torch.zeros(d_model))  # shift

    def forward(self, x):
        """Normalise ``x`` along its last dimension.

        Args:
            x: Input tensor ``[batch_size, seq_len, d_model]``.

        Returns:
            Normalised tensor with the same shape as ``x``.
        """
        centre = x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True)
        normalised = (x - centre) / (spread + self.eps)
        # Scale and shift
        return self.gamma * normalised + self.beta
LayerNorm的数学实现
class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` independent deep copies of an encoder layer."""

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        clones = []
        for _ in range(num_layers):
            # Deep copy so that every layer owns its own parameters
            clones.append(copy.deepcopy(encoder_layer))
        self.layers = nn.ModuleList(clones)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src, src_mask=None):
        """Pass ``src`` through every layer in order.

        Args:
            src: Input sequence ``[batch_size, seq_len, d_model]``.
            src_mask: Optional source mask handed to each layer.

        Returns:
            Encoder output ``[batch_size, seq_len, d_model]``, passed through
            ``norm`` when one was supplied.
        """
        hidden = src
        for block in self.layers:
            hidden = block(hidden, src_mask)
        # Optional final normalisation
        return hidden if self.norm is None else self.norm(hidden)
完整Encoder的堆叠实现
class TransformerDecoder(nn.Module):
    """Stack of ``num_layers`` independent deep copies of a decoder layer."""

    def __init__(self, decoder_layer, num_layers, norm=None):
        super().__init__()
        clones = []
        for _ in range(num_layers):
            # Deep copy so that every layer owns its own parameters
            clones.append(copy.deepcopy(decoder_layer))
        self.layers = nn.ModuleList(clones)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        """Pass ``tgt`` through every layer in order.

        Args:
            tgt: Target sequence ``[batch_size, seq_len, d_model]``.
            memory: Encoder output ``[batch_size, src_len, d_model]``.
            tgt_mask: Optional target mask handed to each layer.
            memory_mask: Optional encoder-memory mask handed to each layer.

        Returns:
            Decoder output ``[batch_size, seq_len, d_model]``, passed through
            ``norm`` when one was supplied.
        """
        hidden = tgt
        for block in self.layers:
            hidden = block(hidden, memory, tgt_mask, memory_mask)
        # Optional final normalisation
        return hidden if self.norm is None else self.norm(hidden)
完整Decoder的堆叠实现
class EmbeddingWithPositional(nn.Module):
    """Token embedding scaled by ``sqrt(d_model)`` plus positional encoding.

    Args:
        vocab_size: Number of entries in the embedding table.
        d_model: Embedding width.
        max_len: Forwarded to ``PositionalEncoding``.
        dropout: Forwarded to ``PositionalEncoding``.
    """

    def __init__(self, vocab_size, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        # Token embedding table
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Positional encoding
        self.positional_encoding = PositionalEncoding(d_model, max_len, dropout)
        # Initialise the weights
        self._init_weights()

    def _init_weights(self):
        """Initialise the token embedding table.

        Only the embedding is initialised here. ``positional_encoding.pe`` is
        a registered buffer, not a parameter; re-initialising it in place
        would replace the encoding table with noise that is never trained,
        so it is deliberately left untouched.
        """
        nn.init.normal_(self.embedding.weight, mean=0,
                        std=self.embedding.embedding_dim ** -0.5)

    def forward(self, x):
        """Embed token ids and add positional information.

        Args:
            x: Token ids ``[batch_size, seq_len]``.

        Returns:
            Embedded representation ``[batch_size, seq_len, d_model]``.
        """
        # Scale embeddings by sqrt(d_model) before adding positions
        embedded = self.embedding(x) * math.sqrt(self.embedding.embedding_dim)
        return self.positional_encoding(embedded)
词嵌入与位置编码的组合实现
| 模型规模 | d_model | nhead | num_layers | dim_feedforward | 参数量 |
|---|---|---|---|---|---|
| 小型 | 512 | 8 | 6 | 2048 | 86M |
| 中型 | 768 | 12 | 12 | 3072 | 345M |
| 大型 | 1024 | 16 | 24 | 4096 | 1.1B |
| 超大型 | 2048 | 32 | 48 | 8192 | 10.5B |
def calculate_transformer_complexity(config):
    """Estimate compute cost and parameter count of a Transformer.

    Args:
        config: Mapping with the integer keys ``d_model``, ``nhead``,
            ``num_layers``, ``dim_feedforward``, ``seq_len`` and
            ``vocab_size``.

    Returns:
        Dict with ``attention_complexity``, ``ff_complexity``,
        ``total_complexity`` and ``total_params``. Bias terms are not
        included in the parameter estimate.

    Raises:
        KeyError: If a required key is missing from ``config``.
    """
    d_model = config['d_model']
    # Read to keep 'nhead' a required key; the head count does not change
    # the totals because the heads partition d_model.
    nhead = config['nhead']
    num_layers = config['num_layers']
    dim_feedforward = config['dim_feedforward']
    seq_len = config['seq_len']
    # Attention cost
    attention_complexity = num_layers * seq_len**2 * d_model
    # Feed-forward cost
    ff_complexity = num_layers * seq_len * d_model * dim_feedforward
    # Total cost
    total_complexity = attention_complexity + ff_complexity
    # Input embedding and output projection
    embedding_params = config['vocab_size'] * d_model
    output_params = config['vocab_size'] * d_model
    # Per-layer weights
    layer_params = (
        d_model * d_model * 3 +            # Q, K and V projections
        d_model * d_model +                # attention output projection
        d_model * dim_feedforward * 2 +    # feed-forward network
        d_model * 6                        # layer normalisation
    )
    total_params = embedding_params + output_params + num_layers * layer_params
    return {
        'attention_complexity': attention_complexity,
        'ff_complexity': ff_complexity,
        'total_complexity': total_complexity,
        'total_params': total_params
    }
Transformer复杂度分析工具
import torch.cuda.amp as amp
class MixedPrecisionTrainer:
    """Runs automatic-mixed-precision training steps for a model."""

    def __init__(self, model, scaler=None):
        # Fall back to a fresh GradScaler when none (or a falsy one) is given
        if not scaler:
            scaler = amp.GradScaler()
        self.model = model
        self.scaler = scaler

    def train_step(self, batch, criterion, optimizer):
        """Run one optimisation step under autocast with gradient scaling.

        Args:
            batch: A ``(src, tgt)`` pair of tensors.
            criterion: Loss callable taking ``(predictions, targets)``.
            optimizer: Optimizer that updates the model parameters.

        Returns:
            The loss of this step as a Python number (``loss.item()``).
        """
        # Move the batch to the GPU
        src, tgt = batch
        src = src.cuda()
        tgt = tgt.cuda()

        # Forward pass and loss under autocast
        with amp.autocast():
            logits = self.model(src, tgt)
            flat_logits = logits.view(-1, logits.size(-1))
            loss = criterion(flat_logits, tgt.view(-1))

        # Backward pass on the scaled loss
        scaled_loss = self.scaler.scale(loss)
        scaled_loss.backward()

        # Unscale before clipping so the threshold applies to true gradients
        self.scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

        # Parameter update and scale-factor update
        self.scaler.step(optimizer)
        self.scaler.update()

        # Clear gradients for the next step
        optimizer.zero_grad()
        return loss.item()
混合精度训练实现
class KVCache:
    """Preallocated key/value cache for incremental decoding.

    Keys and values are stored as ``[batch_size, nhead, max_len, d_k]`` with
    ``d_k = d_model // nhead``; ``current_len`` counts the positions filled
    so far.
    """

    def __init__(self, batch_size, max_len, d_model, nhead, device):
        """Allocate zero-filled caches.

        Args:
            batch_size: Batch size.
            max_len: Maximum number of cached positions.
            d_model: Model width.
            nhead: Number of attention heads.
            device: Device on which the cache tensors are allocated.
        """
        self.batch_size = batch_size
        self.max_len = max_len
        self.d_model = d_model
        self.nhead = nhead
        self.d_k = d_model // nhead
        # Cache layout: [batch_size, nhead, max_len, d_k]
        self.k_cache = torch.zeros(batch_size, nhead, max_len, self.d_k, device=device)
        self.v_cache = torch.zeros(batch_size, nhead, max_len, self.d_k, device=device)
        self.current_len = 0

    def update(self, k, v):
        """Append new keys and values after the positions already cached.

        Args:
            k: New keys ``[batch_size, seq_len, nhead, d_k]``.
            v: New values ``[batch_size, seq_len, nhead, d_k]``.

        Raises:
            ValueError: If the new entries do not fit in the remaining space.
        """
        new_len = k.size(1)
        end = self.current_len + new_len
        if end > self.max_len:
            raise ValueError(
                f"KV cache overflow: {end} positions requested, "
                f"capacity is {self.max_len}")
        # Inputs are [batch, seq, head, d_k] while the cache is
        # [batch, head, seq, d_k]; swap the sequence and head axes first.
        self.k_cache[:, :, self.current_len:end, :] = k.transpose(1, 2)
        self.v_cache[:, :, self.current_len:self.current_len + v.size(1), :] = v.transpose(1, 2)
        # Advance the fill pointer
        self.current_len = end

    def get_kv(self):
        """Return the filled part of the cache.

        Returns:
            ``(k, v)``, each ``[batch_size, nhead, current_len, d_k]``.
        """
        return self.k_cache[:, :, :self.current_len, :], self.v_cache[:, :, :self.current_len, :]
KV缓存的高效实现
class BertEncoder(nn.Module):
    """BERT-style encoder: token + position + segment embeddings, a stack of
    ``nn.TransformerEncoderLayer`` and a linear pooler on the first token.

    Args:
        vocab_size: Number of entries in the token embedding table.
        d_model: Hidden width.
        nhead: Number of attention heads per layer.
        num_layers: Number of encoder layers.
        dim_feedforward: Hidden width of each layer's feed-forward network.
    """

    def __init__(self, vocab_size, d_model, nhead, num_layers, dim_feedforward):
        super().__init__()
        # Token embedding + positional encoding + segment embedding
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.token_type_embedding = nn.Embedding(2, d_model)  # 0: sentence A, 1: sentence B
        # Stacked Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model, nhead, dim_feedforward, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        # Pooling projection
        self.pooler = nn.Linear(d_model, d_model)

    def forward(self, input_ids, token_type_ids=None):
        """Encode a batch of token ids.

        Args:
            input_ids: Token ids; the first position of every sequence is the
                one that gets pooled.
            token_type_ids: Optional segment ids (0 or 1) shaped like
                ``input_ids``; defaults to all zeros.

        Returns:
            ``(sequence_output, pooled_output)``: the encoder output for every
            position and the pooler applied to the first position.
        """
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)
        token_embeddings = self.embedding(input_ids)
        type_embeddings = self.token_type_embedding(token_type_ids)
        # positional_encoding() already returns its input plus the position
        # table, so it is applied once to the token+segment sum. Adding its
        # result back onto the token embeddings would count them twice.
        embeddings = self.positional_encoding(token_embeddings + type_embeddings)
        # Run the encoder stack
        sequence_output = self.encoder(embeddings)
        # Pool the first ([CLS]) position
        pooled_output = self.pooler(sequence_output[:, 0, :])
        return sequence_output, pooled_output
BERT核心架构实现
class GPTDecoder(nn.Module):
    """GPT-style language model built on ``nn.TransformerDecoder``.

    Args:
        vocab_size: Size of the embedding table and of the output logits.
        d_model: Hidden width.
        nhead: Number of attention heads per layer.
        num_layers: Number of decoder layers.
        dim_feedforward: Hidden width of each layer's feed-forward network.
    """

    def __init__(self, vocab_size, d_model, nhead, num_layers, dim_feedforward):
        super().__init__()
        # Token embedding + positional encoding
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        # Stacked Transformer decoder
        decoder_layer = nn.TransformerDecoderLayer(
            d_model, nhead, dim_feedforward, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        # Language-model head
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids, past_key_values=None):
        """Compute next-token logits.

        Args:
            input_ids: Token ids ``[batch_size, seq_len]``.
            past_key_values: Optional tensor used directly as the decoder
                ``memory``. NOTE(review): despite the name it is not consumed
                as a per-layer key/value cache - confirm the intended format
                with callers.

        Returns:
            ``(logits, past_key_values)``; ``past_key_values`` is returned
            unchanged.
        """
        # Embedding + positional encoding
        embeddings = self.positional_encoding(self.embedding(input_ids))
        # Causal mask for the target sequence
        seq_len = input_ids.size(1)
        tgt_mask = generate_square_subsequent_mask(seq_len).to(input_ids.device)
        if past_key_values is not None:
            # Attend over the externally supplied memory as-is
            memory = past_key_values
            memory_mask = None
        else:
            # The memory is the input sequence itself, so the cross-attention
            # must be causal too; without a mask every position could read
            # the embeddings of future tokens.
            memory = embeddings
            memory_mask = tgt_mask
        # Run the decoder stack
        output = self.decoder(embeddings, memory, tgt_mask=tgt_mask,
                              memory_mask=memory_mask)
        # Project to vocabulary logits
        logits = self.lm_head(output)
        return logits, past_key_values
GPT核心架构实现
class VisionTransformer(nn.Module):
    """Vision Transformer classifier over non-overlapping image patches.

    Args:
        image_size: Side length of the (square) input image in pixels.
        patch_size: Side length of each square patch.
        d_model: Embedding width.
        nhead: Number of attention heads per layer.
        num_classes: Number of output classes.
        num_layers: Number of encoder layers.
    """

    def __init__(self, image_size, patch_size, d_model, nhead, num_classes, num_layers):
        super().__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.num_patches = (image_size // patch_size) ** 2
        self.d_model = d_model
        # Patch embedding: a strided convolution over 3-channel images
        self.patch_embedding = nn.Conv2d(3, d_model, kernel_size=patch_size, stride=patch_size)
        # Learnable positional encoding (patches + CLS) and CLS token
        self.positional_encoding = nn.Parameter(torch.zeros(1, self.num_patches + 1, d_model))
        self.cls_token = nn.Parameter(torch.zeros(1, 1, d_model))
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        # Classification head
        self.head = nn.Linear(d_model, num_classes)
        # Initialise the learnable tokens
        self._init_weights()

    def _init_weights(self):
        """Initialise the positional table and CLS token.

        Both start as zeros in the constructor; they are filled with a
        truncated normal of standard deviation 0.02. All other sub-modules
        keep their PyTorch default initialisation.
        """
        nn.init.trunc_normal_(self.positional_encoding, std=0.02)
        nn.init.trunc_normal_(self.cls_token, std=0.02)

    def forward(self, x):
        """Classify a batch of images.

        Args:
            x: Images ``[B, 3, H, W]``; ``H`` and ``W`` must yield exactly
                ``num_patches`` patches so the positional table lines up.

        Returns:
            Class logits ``[B, num_classes]``.
        """
        batch = x.shape[0]
        # Patchify: [B, 3, H, W] -> [B, num_patches, d_model]
        x = self.patch_embedding(x)   # [B, d_model, H/p, W/p]
        x = x.flatten(2)              # [B, d_model, num_patches]
        x = x.transpose(1, 2)         # [B, num_patches, d_model]
        # Prepend the CLS token
        cls_tokens = self.cls_token.expand(batch, -1, -1)
        x = torch.cat([cls_tokens, x], dim=1)
        # Add positional encoding
        x = x + self.positional_encoding
        # Run the encoder
        x = self.encoder(x)
        # Classify from the CLS position
        cls_output = x[:, 0, :]
        logits = self.head(cls_output)
        return logits
Vision Transformer架构实现
def clip_loss(image_features, text_features, temperature=0.07):
    """Symmetric contrastive (CLIP-style) loss.

    Row ``i`` of ``image_features`` and row ``i`` of ``text_features`` form
    the positive pair; every other row in the batch is a negative.

    Args:
        image_features: Image features ``[batch_size, d_model]``.
        text_features: Text features ``[batch_size, d_model]``.
        temperature: Softmax temperature; similarities are divided by it, so
            smaller values sharpen the distribution.

    Returns:
        Scalar tensor: the mean of the image-to-text and text-to-image
        cross-entropy losses.

    NOTE(review): the features are used as given; normalise them beforehand
    if cosine similarity is intended.
    """
    # Similarity matrix scaled by 1 / temperature. Multiplying by
    # exp(temperature) would leave the logits almost unscaled for the
    # default 0.07.
    logits_per_image = (image_features @ text_features.t()) / temperature
    logits_per_text = logits_per_image.t()
    # Matching pairs lie on the diagonal
    batch_size = image_features.shape[0]
    labels = torch.arange(batch_size, device=image_features.device)
    # Image-to-text loss
    loss_i = F.cross_entropy(logits_per_image, labels)
    # Text-to-image loss
    loss_t = F.cross_entropy(logits_per_text, labels)
    # Average of both directions
    return (loss_i + loss_t) / 2.0
CLIP对比损失实现
class LoraLinear(nn.Module):
    """Bias-free linear layer with a frozen weight and a LoRA low-rank update.

    The output is ``x W^T + (x A^T B^T) * lora_alpha / rank``. ``lora_B``
    starts at zero, so the layer initially equals the frozen linear map.
    """

    def __init__(self, in_features, out_features, rank=8, lora_alpha=16):
        super().__init__()
        # Frozen base weight
        frozen = torch.zeros(out_features, in_features)
        self.weight = nn.Parameter(frozen, requires_grad=False)
        # Trainable low-rank factors
        self.lora_A = nn.Parameter(torch.zeros(rank, in_features))
        self.lora_B = nn.Parameter(torch.zeros(out_features, rank))
        # Numerator of the LoRA scaling factor
        self.lora_alpha = lora_alpha
        # Initialisation: Kaiming-uniform for weight and A, zeros for B
        negative_slope = math.sqrt(5)
        nn.init.kaiming_uniform_(self.weight, a=negative_slope)
        nn.init.zeros_(self.lora_B)
        nn.init.kaiming_uniform_(self.lora_A, a=negative_slope)

    def forward(self, x):
        """Apply the frozen projection plus the scaled low-rank update."""
        rank = self.lora_A.size(0)
        frozen_out = F.linear(x, self.weight)
        down_projected = F.linear(x, self.lora_A)
        lora_output = F.linear(down_projected, self.lora_B)
        # Scale the adapter contribution by lora_alpha / rank
        return frozen_out + (lora_output * self.lora_alpha / rank)
LoRA线性层实现
| 模型 | 参数量 | 训练数据 | 任务性能 | 推理速度 |
|---|---|---|---|---|
| BERT-base | 110M | 16B tokens | SOTA文本理解 | 1K tokens/s |
| GPT-3 | 175B | 300B tokens | 强大文本生成 | 50 tokens/s |
| T5-large | 770M | 1T tokens | 多任务SOTA | 200 tokens/s |
| ViT-Large | 658M | ImageNet-21K | SOTA图像分类 | 100 images/s |
感谢阅读!
访问 https://atcfu.com/ai-articles/transformer-architecture/ 回顾本文