行为克隆的本质是把模仿学习转化为监督学习问题,下面从"在做什么"和"怎么做"两个维度,结合工程实践详细展开。

行为克隆 = 监督学习版的策略学习
给定专家演示数据集 D={(s1,a1),(s2,a2),...,(sN,aN)},训练一个策略网络πθ(a∣s),使其在状态 ss 下输出的动作 aa 尽可能接近专家的动作。
数学表达:
minθE(s,a)∼D[−logπθ(a∣s)]θminE(s,a)∼D[−logπθ(a∣s)]
对于连续动作空间(如机器人控制),常用 MSE 损失:
θminE(s,a)∼D[∥πθ(s)−a∥2]
因为策略直接在"克隆"专家的行为——看到状态 ss,就输出专家在该状态下的动作 aa,不关心为什么专家这么做,也不尝试理解任务目标。
类比:就像你模仿书法大师的字帖,只管一笔一划模仿,不理解为什么这一笔要这样写。
阶段1: 数据收集
↓
[专家演示数据采集] → 状态-动作对 (s, a)
↓
[数据预处理] → 清洗、标准化、划分训练/验证集
↓
阶段2: 模型训练
↓
[定义策略网络] → π_θ(a|s)
↓
[定义损失函数] → 分类: CrossEntropy / 回归: MSE
↓
[优化训练] → SGD/Adam 最小化损失
↓
[验证与调参] → 在验证集上评估,调整超参
↓
阶段3: 部署与评估
↓
[策略部署] → 将 π_θ 部署到目标环境
↓
[线上评估] → 计算成功率、平均回报等指标
↓
[错误分析] → 识别失败案例,决定是否需改进(如 DAgger)目标:获取高质量的 (s,a) 配对数据
数据来源:
import numpy as np
import pickle
class DemonstrationCollector:
def __init__(self, env, expert_policy):
self.env = env
self.expert = expert_policy
self.demo_data = []
def collect_episode(self, max_steps=1000):
"""收集一整条专家演示轨迹"""
obs = self.env.reset()
episode_data = []
for t in range(max_steps):
# 专家策略给出动作
action = self.expert.get_action(obs)
# 记录状态-动作对
episode_data.append({
'state': obs.copy(),
'action': action.copy(),
'reward': None, # BC 不需要 reward
'done': False
})
# 执行动作(可选,用于获取下一状态)
obs, reward, done, info = self.env.step(action)
if done:
episode_data[-1]['done'] = True
break
return episode_data
def collect_dataset(self, n_episodes=100):
"""收集完整数据集"""
for i in range(n_episodes):
episode = self.collect_episode()
self.demo_data.extend(episode)
print(f"Collected episode {i+1}/{n_episodes}, length={len(episode)}")
# 保存数据集
with open('expert_demos.pkl', 'wb') as f:
pickle.dump(self.demo_data, f)
return self.demo_data工程注意事项:
目标:将原始演示数据转换为训练可用的格式
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
class BCDataset(Dataset):
def __init__(self, demo_file, transform=None):
with open(demo_file, 'rb') as f:
self.demo_data = pickle.load(f)
self.transform = transform
# 提取状态和动作
self.states = []
self.actions = []
for demo in self.demo_data:
state = demo['state']
action = demo['action']
# 状态预处理
if self.transform:
state = self.transform(state)
# 动作预处理(如连续动作归一化)
if isinstance(action, np.ndarray):
action = torch.FloatTensor(action)
self.states.append(state)
self.actions.append(action)
def __len__(self):
return len(self.states)
def __getitem__(self, idx):
return self.states[idx], self.actions[idx]
# 数据预处理流水线
state_transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 创建数据集
train_dataset = BCDataset('expert_demos_train.pkl', transform=state_transform)
val_dataset = BCDataset('expert_demos_val.pkl', transform=state_transform)
# 数据加载器
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)工程注意事项:
目标:选择合适的网络结构 πθ(a∣s)
网络设计原则:
状态类型 | 推荐网络架构 | 输出层设计 |
|---|---|---|
低维向量(如关节角度) | MLP (2-3 层) | 离散:Softmax / 连续:Linear |
图像(如相机 RGB) | CNN (ResNet-18/34) | 同上 |
高维感知(如点云) | PointNet / 3D CNN | 同上 |
多模态(图像+状态) | CNN + MLP 融合 | 同上 |
import torch.nn as nn
import torch.nn.functional as F
class BCNetwork(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=256):
super(BCNetwork, self).__init__()
# MLP 策略网络
self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.1), # 防止过拟合
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Linear(hidden_dim // 2, action_dim)
)
def forward(self, state):
# 连续动作空间:直接输出动作值
# action = self.network(state)
# return action
# 离散动作空间:输出 logits,用 Softmax 转为概率
logits = self.network(state)
return logits
# 图像输入的策略网络
class VisionBCNetwork(nn.Module):
def __init__(self, action_dim):
super(VisionBCNetwork, self).__init__()
# 预训练的 ResNet-18
self.cnn = torchvision.models.resnet18(pretrained=True)
self.cnn.fc = nn.Identity() # 移除分类层
# 策略头
self.policy_head = nn.Sequential(
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, action_dim)
)
def forward(self, image):
features = self.cnn(image)
action = self.policy_head(features)
return action工程注意事项:
目标:量化预测动作与专家动作的差距
损失函数选择:
动作空间 | 损失函数 | 公式 | PyTorch 实现 |
|---|---|---|---|
离散 | CrossEntropyLoss | −∑aexpertlogπθ(a∣s)−∑aexpertlogπθ(a∣s) | nn.CrossEntropyLoss() |
连续(单峰) | MSELoss | ∣πθ(s)−aexpert∣2∣πθ(s)−aexpert∣2 | nn.MSELoss() |
连续(多峰) | 混合密度网络 (MDN) | −log∑kπkN(μk,σk)−log∑kπkN(μk,σk) | 需自定义 |
连续(复杂分布) | 扩散损失 | Et,ϵ[∣ϵ−ϵθ(s,at,t)∣2]Et,ϵ[∣ϵ−ϵθ(s,at,t)∣2] | 需自定义 |
import torch.nn as nn
# 离散动作空间
discrete_policy = BCNetwork(state_dim, action_dim)
discrete_loss_fn = nn.CrossEntropyLoss()
# 连续动作空间
continuous_policy = BCNetwork(state_dim, action_dim)
continuous_loss_fn = nn.MSELoss()
# 训练循环中
for states, actions in train_loader:
states = states.to(device)
actions = actions.to(device)
# 前向传播
pred_actions = policy(states)
# 计算损失
if discrete: # 离散动作
loss = discrete_loss_fn(pred_actions, actions.long())
else: # 连续动作
loss = continuous_loss_fn(pred_actions, actions)
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()工程注意事项:
weight 参数调整损失权重。torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=1.0)本质:通过梯度下降最小化 策略预测动作 与 专家动作 之间的差距。
训练循环伪代码:
算法:行为克隆训练循环
输入:专家数据集 D = {(s₁,a₁), (s₂,a₂), ..., (sₙ,aₙ)}
输出:训练好的策略网络 π_θ
初始化:
策略网络 π_θ(随机初始化或使用预训练权重)
优化器(Adam/SGD)
学习率调度器
for epoch = 1 to N_EPOCHS:
# === 训练阶段 ===
将数据集 D 分成若干 batch
for each batch B = {(sᵢ, aᵢ)}ᵢ₌₁^B:
# 前向传播
pred_actions = π_θ(sᵢ)
# 计算损失
if 离散动作:
loss = CrossEntropy(pred_actions, aᵢ)
else:
loss = MSE(pred_actions, aᵢ)
# 反向传播
optimizer.zero_grad()
loss.backward()
# 梯度裁剪(防止梯度爆炸)
clip_grad_norm(π_θ.parameters(), max_norm=1.0)
# 更新参数
optimizer.step()
# === 验证阶段 ===
在验证集上计算 loss 和准确率
val_loss = evaluate(π_θ, D_val)
# === 学习率调整 ===
if val_loss 连续上升:
reduce_learning_rate()
# === 早停检查 ===
if val_loss < best_val_loss:
best_val_loss = val_loss
save_checkpoint(π_θ, epoch)
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= PATIENCE_LIMIT:
break # 早停
# === 定期保存 ===
if epoch % SAVE_INTERVAL == 0:
save_checkpoint(π_θ, epoch)
返回:最佳模型 π_θ*
原理:训练初期使用较小的学习率,避免破坏预训练权重或导致训练不稳定。
for step = 1 to WARMUP_STEPS:
lr = BASE_LR * (step / WARMUP_STEPS)
set_learning_rate(optimizer, lr)
原理:使用 FP16 进行计算,FP32 存储权重,节省显存并加速训练。
# 使用 PyTorch 的 AMP (Automatic Mixed Precision)
scaler = GradScaler()
for each batch:
with autocast(): # 自动混合精度
pred = π_θ(states)
loss = criterion(pred, actions)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
原理:当 GPU 显存不足时,累积多个小 batch 的梯度再更新参数,等效于大 batch 训练。
ACCUM_STEPS = 4 # 累积 4 个 batch
optimizer.zero_grad()
for i, (states, actions) in enumerate(dataloader):
pred = π_θ(states)
loss = criterion(pred, actions)
# 归一化损失(除以累积步数)
loss = loss / ACCUM_STEPS
loss.backward()
# 每 ACCUM_STEPS 步更新一次
if (i + 1) % ACCUM_STEPS == 0:
optimizer.step()
optimizer.zero_grad()
问题:离散动作空间中,某些动作样本少,导致模型偏向高频动作。
解决方案:给损失函数加权,低频动作权重高。
# 统计各类别样本数
class_counts = count_samples(D_train)
class_weights = 1.0 / class_counts
# 使用加权损失
criterion = CrossEntropyLoss(weight=class_weights)
for each batch:
pred = π_θ(states)
loss = criterion(pred, actions) # 自动应用权重
loss.backward()
optimizer.step()
指标 | 计算公式 | 用途 |
|---|---|---|
训练损失 | 1B∑i=1BL(πθ(si),ai)1/B∑i=1BL(πθ(si),ai) | 监控训练收敛 |
验证损失 | 同上(在验证集上) | 检测过拟合 |
训练准确率 | 1B∑1[apred=agt]1/B∑1[apred=agt] | 离散动作空间 |
学习率 | 当前 lr 值 | 监控调度器工作 |
梯度范数 | ∣∇θL∣2∣∇θL∣2 | 检测梯度爆炸/消失 |
显存使用 | GPU memory allocated | 调试显存泄漏 |
训练监控:
for epoch = 1 to N_EPOCHS:
metrics = {
'train_loss': [],
'train_acc': [],
'val_loss': [],
'val_acc': [],
'lr': [],
'grad_norm': []
}
for each batch:
# 训练步骤(略)
# 记录指标
metrics['train_loss'].append(loss.item())
metrics['train_acc'].append(accuracy)
metrics['lr'].append(get_lr(optimizer))
metrics['grad_norm'].append(compute_grad_norm(π_θ))
# 验证步骤(略)
metrics['val_loss'].append(val_loss)
metrics['val_acc'].append(val_acc)
# 日志记录(可接入 W&B/TensorBoard)
log_to_wandb(metrics)
# 打印训练报告
print(f"Epoch {epoch}: train_loss={mean(metrics['train_loss']):.4f}, "
f"val_loss={val_loss:.4f}, lr={metrics['lr'][-1]:.6f}")
行为克隆的评估分为 离线评估 和 在线评估:
评估体系
├── 离线评估(Offline)
│ ├── 在测试集上计算损失/准确率
│ ├── 不需要与环境交互
│ └── 只能评估"动作预测准确度",不能评估"任务成功率"
│
└── 在线评估(Online)
├── 将策略部署到真实/仿真环境
├── 执行完整任务,计算成功率/回报
└── 能评估"实际任务表现",但计算成本高
目标:量化策略在未见过的状态上的动作预测准确度。
核心指标:
Accuracy=1N∑i=1N1[aipred=aiexpert]Accuracy=1/N{i=1}∑N1[aipred=aiexpert]
离线评估(离散)
算法:离线评估(离散动作)
输入:测试数据集 D_test = {(sᵢ, aᵢ)}
输出:准确率、混淆矩阵
初始化:
policy.eval() # 评估模式
correct = 0
total = 0
confusion_matrix = zeros(N_ACTIONS, N_ACTIONS)
for each (s, a_gt) in D_test:
with no_grad():
logits = policy(s)
a_pred = argmax(logits)
# 统计准确率
if a_pred == a_gt:
correct += 1
total += 1
# 更新混淆矩阵
confusion_matrix[a_gt][a_pred] += 1
accuracy = correct / total
返回:accuracy, confusion_matrix
MSE=1N∑i=1N∥aipred−aiexpert∥2MSE=1/N{i=1}∑N∥aipred−aiexpert∥2
MAE=1N∑i=1N∥aipred−aiexpert∥1MAE=1/N{i=1}∑N∥aipred−aiexpert∥1
R2=1−∑(aexpert−apred)2∑(aexpert−aˉexpert)2R2=1−∑(aexpert−aˉexpert)2∑(aexpert−apred)2
越接近 1 说明拟合越好。
离线评估(连续)
算法:离线评估(连续动作)
输入:测试数据集 D_test = {(sᵢ, aᵢ)}
输出:MSE, MAE, R²
初始化:
policy.eval()
preds = []
gts = []
for each (s, a_gt) in D_test:
with no_grad():
a_pred = policy(s)
preds.append(a_pred)
gts.append(a_gt)
# 计算指标
preds = tensor(preds)
gts = tensor(gts)
mse = mean((preds - gts)²)
mae = mean(|preds - gts|)
r2 = 1 - sum((gts - preds)²) / sum((gts - mean(gts))²)
返回:mse, mae, r2
目标:评估策略在真实任务上的表现(成功率、回报等)。
核心指标:
指标 | 含义 | 计算方法 |
|---|---|---|
成功率 | 任务成功比例 | NsuccessNepisodes |
平均回报 | 任务平均得分 | 1N∑i=1N∑t=0Trt(i)1/N∑i=1N∑t=0Trt(i) |
平均步数 | 任务完成所需步数 | 1N∑i=1NTi1/N∑i=1NTi |
探索覆盖率 | 状态空间覆盖程度 | 用 t-SNE/UMAP 可视化状态分布 |
在线评估
算法:在线评估
输入:训练好的策略 π_θ,环境 env,评估回合数 N
输出:成功率、平均回报、详细日志
初始化:
success_count = 0
returns = []
lengths = []
for episode = 1 to N:
state = env.reset()
done = False
total_reward = 0
steps = 0
while not done:
# 策略推理(无梯度)
with no_grad():
action = policy(state)
# 执行动作
next_state, reward, done, info = env.step(action)
total_reward += reward
steps += 1
state = next_state
# 防止无限循环
if steps >= MAX_STEPS:
break
# 记录结果
returns.append(total_reward)
lengths.append(steps)
# 判断是否成功(任务相关)
if info.get('success', False):
success_count += 1
# 打印进度
print(f"Episode {episode}: return={total_reward}, steps={steps}, "
f"success={info.get('success', False)}")
# 汇总统计
success_rate = success_count / N
mean_return = mean(returns)
std_return = std(returns)
返回:{
'success_rate': success_rate,
'mean_return': mean_return,
'std_return': std_return,
'returns': returns,
'lengths': lengths
}