首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >行为克隆(BC:模仿学习)步骤及原理解析

行为克隆(BC:模仿学习)步骤及原理解析

作者头像
索旭东
发布2026-06-01 18:07:46
发布2026-06-01 18:07:46
720
举报
文章被收录于专栏:具身小站具身小站

行为克隆的本质是把模仿学习转化为监督学习问题,下面从"在做什么"和"怎么做"两个维度,结合工程实践详细展开。

一、行为克隆在做什么?

1.1 核心思想

行为克隆 = 监督学习版的策略学习

给定专家演示数据集 D={(s1,a1),(s2,a2),...,(sN,aN)},训练一个策略网络πθ(a∣s),使其在状态 ss 下输出的动作 aa 尽可能接近专家的动作。

数学表达

min⁡θE(s,a)∼D[−log⁡πθ(a∣s)]θminE(s,a)∼D[−logπθ(a∣s)]

对于连续动作空间(如机器人控制),常用 MSE 损失:

θminE(s,a)∼D[∥πθ(s)−a∥2]

1.2 为什么叫"克隆"?

因为策略直接在"克隆"专家的行为——看到状态 ss,就输出专家在该状态下的动作 aa不关心为什么专家这么做,也不尝试理解任务目标

类比:就像你模仿书法大师的字帖,只管一笔一划模仿,不理解为什么这一笔要这样写。


二、行为克隆怎么做?

2.1 完整流程图

代码语言:javascript
复制
阶段1: 数据收集
  ↓
[专家演示数据采集] → 状态-动作对 (s, a)
  ↓
[数据预处理] → 清洗、标准化、划分训练/验证集
  ↓
阶段2: 模型训练
  ↓
[定义策略网络] → π_θ(a|s)
  ↓
[定义损失函数] → 分类: CrossEntropy / 回归: MSE
  ↓
[优化训练] → SGD/Adam 最小化损失
  ↓
[验证与调参] → 在验证集上评估,调整超参
  ↓
阶段3: 部署与评估
  ↓
[策略部署] → 将 π_θ 部署到目标环境
  ↓
[线上评估] → 计算成功率、平均回报等指标
  ↓
[错误分析] → 识别失败案例,决定是否需改进(如 DAgger)

2.2 详细步骤拆解

步骤 1:专家演示数据收集

目标:获取高质量的 (s,a) 配对数据

数据来源

  • 人类演示:通过遥操作、VR 设备、鼠标键盘等采集
  • 脚本策略:用传统控制算法(如 PID、RRT)生成数据
  • 过往策略:用之前训练的策略生成数据(课程学习)
代码语言:javascript
复制
import numpy as np
import pickle

class DemonstrationCollector:
    def __init__(self, env, expert_policy):
        self.env = env
        self.expert = expert_policy
        self.demo_data = []

    def collect_episode(self, max_steps=1000):
        """收集一整条专家演示轨迹"""
        obs = self.env.reset()
        episode_data = []

        for t in range(max_steps):
            # 专家策略给出动作
            action = self.expert.get_action(obs)

            # 记录状态-动作对
            episode_data.append({
                'state': obs.copy(),
                'action': action.copy(),
                'reward': None,  # BC 不需要 reward
                'done': False
            })

            # 执行动作(可选,用于获取下一状态)
            obs, reward, done, info = self.env.step(action)

            if done:
                episode_data[-1]['done'] = True
                break

        return episode_data

    def collect_dataset(self, n_episodes=100):
        """收集完整数据集"""
        for i in range(n_episodes):
            episode = self.collect_episode()
            self.demo_data.extend(episode)
            print(f"Collected episode {i+1}/{n_episodes}, length={len(episode)}")

        # 保存数据集
        with open('expert_demos.pkl', 'wb') as f:
            pickle.dump(self.demo_data, f)

        return self.demo_data

工程注意事项

  1. 数据多样性:确保覆盖状态空间的不同区域,避免只采集单一轨迹
  2. 数据质量:专家策略要足够好,错误演示会直接误导模型
  3. 数据量:通常至少需要 数百到数千条轨迹(取决于任务复杂度)
  4. 数据格式:统一状态表示(如图像归一化到 [0,1]),动作格式(离散/连续)

步骤 2:数据预处理

目标:将原始演示数据转换为训练可用的格式

代码语言:javascript
复制
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T

class BCDataset(Dataset):
    def __init__(self, demo_file, transform=None):
        with open(demo_file, 'rb') as f:
            self.demo_data = pickle.load(f)

        self.transform = transform

        # 提取状态和动作
        self.states = []
        self.actions = []

        for demo in self.demo_data:
            state = demo['state']
            action = demo['action']

            # 状态预处理
            if self.transform:
                state = self.transform(state)

            # 动作预处理(如连续动作归一化)
            if isinstance(action, np.ndarray):
                action = torch.FloatTensor(action)

            self.states.append(state)
            self.actions.append(action)

    def __len__(self):
        return len(self.states)

    def __getitem__(self, idx):
        return self.states[idx], self.actions[idx]

# 数据预处理流水线
state_transform = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 创建数据集
train_dataset = BCDataset('expert_demos_train.pkl', transform=state_transform)
val_dataset = BCDataset('expert_demos_val.pkl', transform=state_transform)

# 数据加载器
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

工程注意事项

  1. 训练/验证划分:通常 80%/20% 或 90%/10%
  2. 数据增强(如适用):对图像状态做随机裁剪、翻转等
  3. 动作归一化:连续动作空间建议归一化到 [-1, 1] 或 [0, 1]
  4. 类别平衡(离散动作):检查各类别分布,必要时过采样/欠采样

步骤 3:定义策略网络架构

目标:选择合适的网络结构 πθ(a∣s)

网络设计原则

状态类型

推荐网络架构

输出层设计

低维向量(如关节角度)

MLP (2-3 层)

离散:Softmax / 连续:Linear

图像(如相机 RGB)

CNN (ResNet-18/34)

同上

高维感知(如点云)

PointNet / 3D CNN

同上

多模态(图像+状态)

CNN + MLP 融合

同上

代码语言:javascript
复制
import torch.nn as nn
import torch.nn.functional as F

class BCNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(BCNetwork, self).__init__()

        # MLP 策略网络
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),  # 防止过拟合

            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),

            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),

            nn.Linear(hidden_dim // 2, action_dim)
        )

    def forward(self, state):
        # 连续动作空间:直接输出动作值
        # action = self.network(state)
        # return action

        # 离散动作空间:输出 logits,用 Softmax 转为概率
        logits = self.network(state)
        return logits

# 图像输入的策略网络
class VisionBCNetwork(nn.Module):
    def __init__(self, action_dim):
        super(VisionBCNetwork, self).__init__()

        # 预训练的 ResNet-18
        self.cnn = torchvision.models.resnet18(pretrained=True)
        self.cnn.fc = nn.Identity()  # 移除分类层

        # 策略头
        self.policy_head = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

    def forward(self, image):
        features = self.cnn(image)
        action = self.policy_head(features)
        return action

工程注意事项

  1. 网络容量:任务简单用小网络,复杂任务用大网络(但注意过拟合)
  2. 预训练权重:图像任务强烈建议用 ImageNet 预训练
  3. 输出激活:
  • 离散动作:Softmax(多类)或 Sigmoid(二类)
  • 连续动作:Tanh(限制范围)或直接 Linear(无范围限制)

步骤 4:定义损失函数

目标:量化预测动作与专家动作的差距

损失函数选择

动作空间

损失函数

公式

PyTorch 实现

离散

CrossEntropyLoss

−∑aexpertlog⁡πθ(a∣s)−∑aexpertlogπθ(a∣s)

nn.CrossEntropyLoss()

连续(单峰)

MSELoss

∣πθ(s)−aexpert∣2∣πθ(s)−aexpert∣2

nn.MSELoss()

连续(多峰)

混合密度网络 (MDN)

−log⁡∑kπkN(μk,σk)−log∑kπkN(μk,σk)

需自定义

连续(复杂分布)

扩散损失

Et,ϵ[∣ϵ−ϵθ(s,at,t)∣2]Et,ϵ[∣ϵ−ϵθ(s,at,t)∣2]

需自定义

代码语言:javascript
复制
import torch.nn as nn

# 离散动作空间
discrete_policy = BCNetwork(state_dim, action_dim)
discrete_loss_fn = nn.CrossEntropyLoss()

# 连续动作空间
continuous_policy = BCNetwork(state_dim, action_dim)
continuous_loss_fn = nn.MSELoss()

# 训练循环中
for states, actions in train_loader:
    states = states.to(device)
    actions = actions.to(device)

    # 前向传播
    pred_actions = policy(states)

    # 计算损失
    if discrete:  # 离散动作
        loss = discrete_loss_fn(pred_actions, actions.long())
    else:  # 连续动作
        loss = continuous_loss_fn(pred_actions, actions)

    # 反向传播
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

工程注意事项

  1. 多模态动作:如果专家在同一状态下会给出不同动作(如左右对称),用 MSE 会学出平均动作(很差)。此时应用 MDN 或扩散策略。
  2. 类别不平衡:离散动作空间若存在类别不平衡,用 weight 参数调整损失权重。
  3. 梯度裁剪:防止梯度爆炸,torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=1.0)

步骤5:模型训练

5.1 训练循环核心原理

本质:通过梯度下降最小化 策略预测动作专家动作 之间的差距。

训练循环伪代码

代码语言:javascript
复制
算法:行为克隆训练循环
输入:专家数据集 D = {(s₁,a₁), (s₂,a₂), ..., (sₙ,aₙ)}
输出:训练好的策略网络 π_θ

初始化:
  策略网络 π_θ(随机初始化或使用预训练权重)
  优化器(Adam/SGD)
  学习率调度器

for epoch = 1 to N_EPOCHS:
    # === 训练阶段 ===
    将数据集 D 分成若干 batch
    for each batch B = {(sᵢ, aᵢ)}ᵢ₌₁^B:
        # 前向传播
        pred_actions = π_θ(sᵢ)

        # 计算损失
        if 离散动作:
            loss = CrossEntropy(pred_actions, aᵢ)
        else:
            loss = MSE(pred_actions, aᵢ)

        # 反向传播
        optimizer.zero_grad()
        loss.backward()

        # 梯度裁剪(防止梯度爆炸)
        clip_grad_norm(π_θ.parameters(), max_norm=1.0)

        # 更新参数
        optimizer.step()

    # === 验证阶段 ===
    在验证集上计算 loss 和准确率
    val_loss = evaluate(π_θ, D_val)

    # === 学习率调整 ===
    if val_loss 连续上升:
        reduce_learning_rate()

    # === 早停检查 ===
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        save_checkpoint(π_θ, epoch)
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= PATIENCE_LIMIT:
            break  # 早停

    # === 定期保存 ===
    if epoch % SAVE_INTERVAL == 0:
        save_checkpoint(π_θ, epoch)

返回:最佳模型 π_θ*
代码语言:javascript
复制


5.2 关键训练技巧原理

技巧1:学习率预热

原理:训练初期使用较小的学习率,避免破坏预训练权重或导致训练不稳定。

代码语言:javascript
复制
for step = 1 to WARMUP_STEPS:
    lr = BASE_LR * (step / WARMUP_STEPS)
    set_learning_rate(optimizer, lr)
代码语言:javascript
复制

技巧2:混合精度训练

原理:使用 FP16 进行计算,FP32 存储权重,节省显存并加速训练。

代码语言:javascript
复制
# 使用 PyTorch 的 AMP (Automatic Mixed Precision)
scaler = GradScaler()

for each batch:
    with autocast():  # 自动混合精度
        pred = π_θ(states)
        loss = criterion(pred, actions)

    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
代码语言:javascript
复制

技巧3:梯度累积

原理:当 GPU 显存不足时,累积多个小 batch 的梯度再更新参数,等效于大 batch 训练。

代码语言:javascript
复制
ACCUM_STEPS = 4  # 累积 4 个 batch
optimizer.zero_grad()

for i, (states, actions) in enumerate(dataloader):
    pred = π_θ(states)
    loss = criterion(pred, actions)

    # 归一化损失(除以累积步数)
    loss = loss / ACCUM_STEPS
    loss.backward()

    # 每 ACCUM_STEPS 步更新一次
    if (i + 1) % ACCUM_STEPS == 0:
        optimizer.step()
        optimizer.zero_grad()
代码语言:javascript
复制

技巧4:类别平衡

问题:离散动作空间中,某些动作样本少,导致模型偏向高频动作。

解决方案:给损失函数加权,低频动作权重高。

代码语言:javascript
复制
# 统计各类别样本数
class_counts = count_samples(D_train)
class_weights = 1.0 / class_counts

# 使用加权损失
criterion = CrossEntropyLoss(weight=class_weights)

for each batch:
    pred = π_θ(states)
    loss = criterion(pred, actions)  # 自动应用权重
    loss.backward()
    optimizer.step()
代码语言:javascript
复制


5.3 训练监控指标

指标

计算公式

用途

训练损失

1B∑i=1BL(πθ(si),ai)1/B∑i=1BL(πθ(si),ai)

监控训练收敛

验证损失

同上(在验证集上)

检测过拟合

训练准确率

1B∑1[apred=agt]1/B∑1[apred=agt]

离散动作空间

学习率

当前 lr 值

监控调度器工作

梯度范数

∣∇θL∣2∣∇θL∣2

检测梯度爆炸/消失

显存使用

GPU memory allocated

调试显存泄漏

训练监控

代码语言:javascript
复制
for epoch = 1 to N_EPOCHS:
    metrics = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
        'lr': [],
        'grad_norm': []
    }

    for each batch:
        # 训练步骤(略)

        # 记录指标
        metrics['train_loss'].append(loss.item())
        metrics['train_acc'].append(accuracy)
        metrics['lr'].append(get_lr(optimizer))
        metrics['grad_norm'].append(compute_grad_norm(π_θ))

    # 验证步骤(略)
    metrics['val_loss'].append(val_loss)
    metrics['val_acc'].append(val_acc)

    # 日志记录(可接入 W&B/TensorBoard)
    log_to_wandb(metrics)

    # 打印训练报告
    print(f"Epoch {epoch}: train_loss={mean(metrics['train_loss']):.4f}, "
          f"val_loss={val_loss:.4f}, lr={metrics['lr'][-1]:.6f}")
代码语言:javascript
复制


步骤6:模型评估

6.1 评估的两个层面

行为克隆的评估分为 离线评估在线评估

代码语言:javascript
复制
评估体系
├── 离线评估(Offline)
│   ├── 在测试集上计算损失/准确率
│   ├── 不需要与环境交互
│   └── 只能评估"动作预测准确度",不能评估"任务成功率"
│
└── 在线评估(Online)
    ├── 将策略部署到真实/仿真环境
    ├── 执行完整任务,计算成功率/回报
    └── 能评估"实际任务表现",但计算成本高
代码语言:javascript
复制


6.2 离线评估原理

目标:量化策略在未见过的状态上的动作预测准确度。

核心指标

(1)离散动作空间
  • 准确率:

Accuracy=1N∑i=1N1[aipred=aiexpert]Accuracy=1/N{i=1}∑N1[aipred=aiexpert]

  • 混淆矩阵 :显示哪些动作容易被误判,指导改进方向。
  • Top-K 准确率:预测的前 K 个动作中包含真实动作的比例(允许一定容错)。

离线评估(离散)

代码语言:javascript
复制
算法:离线评估(离散动作)
输入:测试数据集 D_test = {(sᵢ, aᵢ)}
输出:准确率、混淆矩阵

初始化:
  policy.eval()  # 评估模式
  correct = 0
  total = 0
  confusion_matrix = zeros(N_ACTIONS, N_ACTIONS)

for each (s, a_gt) in D_test:
    with no_grad():
        logits = policy(s)
        a_pred = argmax(logits)

    # 统计准确率
    if a_pred == a_gt:
        correct += 1
    total += 1

    # 更新混淆矩阵
    confusion_matrix[a_gt][a_pred] += 1

accuracy = correct / total
返回:accuracy, confusion_matrix
代码语言:javascript
复制

(2)连续动作空间
  • 均方误差 (MSE):

MSE=1N∑i=1N∥aipred−aiexpert∥2MSE=1/N{i=1}∑N∥aipred−aiexpert∥2

  • 平均绝对误差 (MAE):

MAE=1N∑i=1N∥aipred−aiexpert∥1MAE=1/N{i=1}∑N∥aipred−aiexpert∥1

  • 决定系数 (R²):

R2=1−∑(aexpert−apred)2∑(aexpert−aˉexpert)2R2=1−∑(aexpert−aˉexpert)2∑(aexpert−apred)2

越接近 1 说明拟合越好。

离线评估(连续)

代码语言:javascript
复制
算法:离线评估(连续动作)
输入:测试数据集 D_test = {(sᵢ, aᵢ)}
输出:MSE, MAE, R²

初始化:
  policy.eval()
  preds = []
  gts = []

for each (s, a_gt) in D_test:
    with no_grad():
        a_pred = policy(s)

    preds.append(a_pred)
    gts.append(a_gt)

# 计算指标
preds = tensor(preds)
gts = tensor(gts)

mse = mean((preds - gts)²)
mae = mean(|preds - gts|)
r2 = 1 - sum((gts - preds)²) / sum((gts - mean(gts))²)

返回:mse, mae, r2
代码语言:javascript
复制


6.3 在线评估原理

目标:评估策略在真实任务上的表现(成功率、回报等)。

核心指标

指标

含义

计算方法

成功率

任务成功比例

NsuccessNepisodes

平均回报

任务平均得分

1N∑i=1N∑t=0Trt(i)1/N∑i=1N∑t=0Trt(i)

平均步数

任务完成所需步数

1N∑i=1NTi1/N∑i=1NTi

探索覆盖率

状态空间覆盖程度

用 t-SNE/UMAP 可视化状态分布

在线评估

代码语言:javascript
复制
算法:在线评估
输入:训练好的策略 π_θ,环境 env,评估回合数 N
输出:成功率、平均回报、详细日志

初始化:
  success_count = 0
  returns = []
  lengths = []

for episode = 1 to N:
    state = env.reset()
    done = False
    total_reward = 0
    steps = 0

    while not done:
        # 策略推理(无梯度)
        with no_grad():
            action = policy(state)

        # 执行动作
        next_state, reward, done, info = env.step(action)

        total_reward += reward
        steps += 1
        state = next_state

        # 防止无限循环
        if steps >= MAX_STEPS:
            break

    # 记录结果
    returns.append(total_reward)
    lengths.append(steps)

    # 判断是否成功(任务相关)
    if info.get('success', False):
        success_count += 1

    # 打印进度
    print(f"Episode {episode}: return={total_reward}, steps={steps}, "
          f"success={info.get('success', False)}")

# 汇总统计
success_rate = success_count / N
mean_return = mean(returns)
std_return = std(returns)

返回:{
    'success_rate': success_rate,
    'mean_return': mean_return,
    'std_return': std_return,
    'returns': returns,
    'lengths': lengths
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 具身小站 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、行为克隆在做什么?
    • 1.1 核心思想
    • 1.2 为什么叫"克隆"?
  • 二、行为克隆怎么做?
    • 2.1 完整流程图
    • 2.2 详细步骤拆解
      • 步骤 1:专家演示数据收集
      • 步骤 2:数据预处理
      • 步骤 3:定义策略网络架构
      • 步骤 4:定义损失函数
  • 步骤5:模型训练
    • 5.1 训练循环核心原理
    • 5.2 关键训练技巧原理
      • 技巧1:学习率预热
      • 技巧2:混合精度训练
      • 技巧3:梯度累积
      • 技巧4:类别平衡
    • 5.3 训练监控指标
  • 步骤6:模型评估
    • 6.1 评估的两个层面
    • 6.2 离线评估原理
      • (1)离散动作空间
      • (2)连续动作空间
    • 6.3 在线评估原理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档