rsl_rl——人形运控部署框架汇总:从经典RL框架rsl_rl到宇树开源的unitree_rl_gym(含unitree_sdk2_python)

rsl_rl——人形运控部署框架汇总:从经典RL框架rsl_rl到宇树开源的unitree_rl_gym(含unitree_sdk2_python)

前言

现在人形运控基本都避不开RL了,而对于人形运控本身的部署则是一个完整的工程,而作为经典RL框架rsl_rl,则在本博客里的多篇文章反复被提及,比如

  1. 比如Humanplus一文中提到
    对于humanplus的整个代码框架,总计包含以下五个部分
    Humanoid Shadowing Transformer (HST),此所谓low-level,属于机器人小脑
    这个部分的代码是基于仿真的强化学习实现,使用了legged_gym和rsl_rl
    ..

    ————
    顺带,该文『详见此文《斯坦福人形HumanPlus的代码解读与复现关键:从HST(含rsl_rl)到HIT、HardWare》』,曾分析过rsl_rl中对PPO的实现
    既然本文专门解读rsl_rl,故可以把那部分中对rsl_rl的介绍 也综合到本文之中了
  2. 比如NaVILA一文中提到
    第二部分 NaVILA/legged-loco中isaaclab_exts/模块的解析:侧重H1人形机器人配置
    整体代码库主要分为以下几个部分:
    isaaclab_exts - Isaac Lab的扩展,包含机器人配置和控制逻辑
    rsl_rl - 强化学习框架实现
    ..
  3. 再比如HOMIE一文中提到
    1.2 HOMIE的硬件系统设计与摇操实验
    1.2.2.2 遥操作系统:可在现实世界和仿真中遥操机器人(含G1的部署源码Walk these ways)
    G1的部署代码源自[52-Walk these ways: Tuning robot control for generalization with multiplicity of behavior],对应的GitHub为:leggedrobotics/rsl_rl
    ..
  4. 再比如,KungfuBot一文中提到
    第二部分 KungfuBot GitHub仓库的整体分析
    其所参考的库包括
    ASAP:使用 ASAP 库来构建他们的强化学习代码库
    RSL_RL:使用 rsl_rl 库来实现 PPO
  5. 一朋友反馈,通过beyondmimic 试了一小段查尔斯整体还是比较稳的,至于数据用的宇树的数据集,部署框架他则用宇树的unitree_rl_gym改的
    当然,beyondmimic本身集成了rsl_rl——homie便用的这个经典RL框架做的部署,也可以试下

比如Exbody 2一文中提到
“ 对于部署代码

\rightarrow

  我有找到这个HOMIE——包括PBHC的作者也表示可以参考openhomie和unitree rl gym的真机代码,有提供G1的部署代码(我也和HOMIE作者确认了下,说他们GitHub上包含G1的部署代码)——当然 HOMIE也用的别人的Walk these ways,对应的GitHub为leggedrobotics/rsl_rl

\rightarrow

  此外,也可以直接参考或用unitree_rl_gym的代码
此unitree_rl_gym仓库建立在以下开源项目的支持和贡献之上。特别感谢:
legged_gym:训练和运行代码的基础
rsl_rl:强化学习算法实现——即上面提到的Walk these ways
mujoco:提供强大的模拟功能。
unitree_sdk2_python:用于物理部署的硬件通信接口


\rightarrow

  再此外,PBHC里的一群友还介绍了个基于unitree_sdk2_python的部署代码:LeggedLabDeploy 

正因为rsl_rl的屡屡出现,提高了其重要性和影响力,故本文特地专门汇总梳理下人形运控部署框架,包括rsl_rl、unitree_rl_gym、unitree_sdk2_python

值得一体的是,因为我之前在解读chatgpt原理时,花了大力气阐述过RL中重要的概念,包括PPO,关于chatgpt/RL的那几篇文章详见此系列《开启大模型时代的ChatGPT系列:包含原理、RLHF等》,当然,下文我还会再次给出更具体的一篇篇文章的链接的』,对理解本文要介绍的rsl_rl框架 还是很有帮助的,建议大伙读一读

第一部分 rsl_rl/:经典RL框架的封装(含我对PPO实现的解读)

RSL-RL是一个强化学习算法框架,其包含三大主要组件:Runners、Algorithms 和Networks

  • 管理环境步进和智能体学习的Runner
    __init__.py
    distillation_runner.py
    on_policy_runner.py
  • 供算法使用的Network结构
    包含
    __init__.py
    memory.py
    mlp.py
    normalization.py

定义学习智能体的Algorithm——包含 ppo.py、distillation.py
即RSL-RL 包含两种算法:

\rightarrow

  PPO
PPO 是一种无模型、基于策略的强化学习方法,它能够从零开始学习复杂任务,无需先验知识

\rightarrow

  一种类似于 DAgger的 BC 算法
BC算法是一种有监督学习方法,用于将专家策略的行为迁移到学生策略中。该方法通过反复执行学生策略以收集数据,使用专家动作对这些数据进行重新标注,并在此基础上训练学生策略
在使用PPO进行强化学习训练后,如果训练环境与硬件部署环境存在差异,该算法尤其有用。在这种情况下,仅依赖仿真中可用信息的RL智能体的(教师)行为可以蒸馏为不再依赖这些信息的(学生策略)

此外,还有对环境的封装,以及模型组件

  • env
    __init__.py
    vec_env.py
  • modules
    __init__.py
    actor_critic.py
    actor_critic_recurrent.py
    rnd.py
    student_teacher.py
    student_teacher_recurrent.py
    symmetry.py

具体而言,其代码结构为

rsl_rl/ config/ - 配置文件 rsl_rl/ - 核心代码 algorithms/ - 强化学习算法实现,如PPO env/ - 环境封装 modules/ - 模型组件 actor_critic.py - 基础Actor-Critic网络 actor_critic_depth_cnn.py - 带深度视觉的Actor-Critic网络 actor_critic_history.py - 带历史信息的Actor-Critic网络 actor_critic_recurrent.py - 循环神经网络版Actor-Critic depth_backbone.py - 深度视觉处理网络 runners/ - 训练运行器 storage/ - 数据存储 utils/ - 工具函数

这个模块定义了强化学习的核心组件,包括actor-critic网络架构(有普通版、CNN版、历史记忆版和RNN版)、PPO算法实现、训练运行器等

从代码中可以看出,它支持不同类型的输入数据(如关节状态、深度图像等)

1.1 rsl_rl/algorithms/ppo.py:近端策略优化PPO的实现

这段代码定义了一个名为 `PPO` 的类,它实现了近端策略优化(Proximal Policy Optimization)算法,其详细介绍详见此文的《强化学习极简入门:通俗理解MDP、DP MC TD和Q学习、策略梯度、PPO》第4.4节

1.1.1 初始化__init__

构造函数接收一个

  • `actor_critic` 网络(策略网络和价值网络的组合)
  • 学习超参数(如学习周期 `num_learning_epochs`、小批量数量 `num_mini_batches`、裁剪参数 `clip_param`、折扣因子 `gamma`、GAE lambda `lam` 等)

以及优化器设置(学习率 `learning_rate`、梯度裁剪范数 `max_grad_norm`)

import torch # 导入 PyTorch 库 import torch.nn as nn # 导入 PyTorch 神经网络模块 import torch.optim as optim # 导入 PyTorch 优化模块 from rsl_rl.modules import ActorCriticTransformer # 从 rsl_rl.modules 导入 ActorCriticTransformer 类 from rsl_rl.storage import RolloutStorage # 从 rsl_rl.storage 导入 RolloutStorage 类 class PPO: # 定义 PPO 类 actor_critic: ActorCriticTransformer # 定义 actor_critic 变量类型为 ActorCriticTransformer def __init__(self, # 定义 PPO 类的初始化函数 actor_critic, # 定义 actor_critic 参数 num_learning_epochs=1, # 定义 num_learning_epochs 参数,默认值为1 num_mini_batches=1, # 定义 num_mini_batches 参数,默认值为1 clip_param=0.2, # 定义 clip_param 参数,默认值为0.2 gamma=0.998, # 定义 gamma 参数,默认值为0.998 lam=0.95, # 定义 lam 参数,默认值为0.95 value_loss_coef=1.0, # 定义 value_loss_coef 参数,默认值为1.0 entropy_coef=0.0, # 定义 entropy_coef 参数,默认值为0.0 learning_rate=1e-3, # 定义 learning_rate 参数,默认值为1e-3 max_grad_norm=1.0, # 定义 max_grad_norm 参数,默认值为1.0 use_clipped_value_loss=True, # 定义 use_clipped_value_loss 参数,默认值为 True schedule="fixed", # 定义 schedule 参数,默认值为 "fixed" desired_kl=0.01, # 定义 desired_kl 参数,默认值为 0.01 device='cpu', # 定义 device 参数,默认值为 'cpu' ):

它初始化 PPO 的核心组件:

  • `actor_critic`: 传入的神经网络模型,并将其移动到指定的设备(如 CPU 或 GPU)
  • `storage`: 用于存储经验轨迹(transitions)的 `RolloutStorage` 对象,稍后初始化
  • `optimizer`: 使用 Adam 优化器来更新 `actor_critic` 网络的参数
  • `transition`: 一个临时的 `RolloutStorage.Transition` 对象,用于在每个环境步骤中收集数据

及存储 PPO 算法的关键超参数
设置学习率调度策略(`schedule`)和目标 KL 散度(`desired_kl`),用于自适应学习率调整

 self.device = device # 初始化 self.device 为传入的 device 参数 self.desired_kl = desired_kl # 初始化 self.desired_kl 为传入的 desired_kl 参数 self.schedule = schedule # 初始化 self.schedule 为传入的 schedule 参数 self.learning_rate = learning_rate # 初始化 self.learning_rate 为传入的 learning_rate 参数 # PPO components # 最上面PPO类里一系列参数的self初始化 # PPO parameters # 最上面PPO类里一系列参数的self初始化

1.1.2 存储初始化init_storage

  1. 这个方法根据环境的数量、每个环境收集的转换(transitions)数量、观察空间形状和动作空间形状来创建 `RolloutStorage` 实例

`RolloutStorage` 负责存储智能体与环境交互产生的数据序列

 def init_storage(self, num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape): # 初始化存储 self.storage = RolloutStorage(num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape, self.device)

1.1.3 模式切换test_mode/train_mode

这些方法用于切换 `actor_critic` 网络到评估(测试)模式或训练模式。这对于包含 Dropout 或 Batch Normalization 层的网络很重要

详细代码如下

 def test_mode(self): # 定义测试模式函数 self.actor_critic.test() # 设置 actor_critic 为测试模式 def train_mode(self): # 定义训练模式函数 self.actor_critic.train() # 设置 actor_critic 为训练模式

1.1.4 动作选择act

这是智能体与环境交互的核心。给定当前观察 `obs`(用于策略网络)和 `critic_obs`(用于价值网络,可能与 `obs` 相同或包含额外信息),它执行以下操作:

返回计算出的动作,供环境执行

 return self.transition.actions # 返回动作

将当前的观察、评论家观察以及计算出的所有信息存储在临时的 `transition` 对象中

 # need to record obs and critic_obs before env.step() # 在环境步之前需要记录 obs 和 critic_obs self.transition.observations = obs # 记录观察 self.transition.critic_observations = critic_obs # 记录评论员观察

存储动作分布的均值 (`action_mean`) 和标准差 (`action_sigma`)

 self.transition.action_mean = self.actor_critic.action_mean.detach() # 获取动作均值 self.transition.action_sigma = self.actor_critic.action_std.detach() # 获取动作标准差

使用 `actor_critic` 网络
计算动作 (`actions`)

 # 使用 Actor-Critic 网络计算动作,并分离计算图 self.transition.actions = self.actor_critic.act(obs).detach() 

状态价值 (`values`)

 # 使用 Actor-Critic 网络评估状态价值,并分离计算图 self.transition.values = self.actor_critic.evaluate(critic_obs).detach() 

和动作的对数概率 (`actions_log_prob`)

 # 获取动作的对数概率,并分离计算图 self.transition.actions_log_prob = self.actor_critic.get_actions_log_prob(self.transition.actions).detach() 

如果网络是循环的(RNN/LSTM),获取并存储隐藏状态

 def act(self, obs, critic_obs): # 定义 act 函数 if self.actor_critic.is_recurrent: # 如果 actor_critic 是循环的 self.transition.hidden_states = self.actor_critic.get_hidden_states() # 获取隐藏状态

1.1.5 处理环境步骤process_env_step

在环境执行动作后调用此方法

清空 `transition` 对象,为下一步做准备

 self.transition.clear() # 清除过渡数据

如果网络是循环的,根据 `dones` 信号重置其隐藏状态

 self.actor_critic.reset(dones) # 重置 actor_critic

且将这些信息存储到 `transition` 对象中

 # Bootstrapping on time outs # 对超时进行引导 if 'time_outs' in infos: # 如果信息中有 'time_outs' self.transition.rewards += self.gamma * torch.squeeze(self.transition.values * infos['time_outs'].unsqueeze(1).to(self.device), 1) # 更新奖励

重要: 它实现了"超时引导 (Bootstrapping on time outs)"。如果一个 episode 因为达到时间限制而不是因为失败状态而结束(通过 `infos["time_outs"]` 判断),它会将最后一步的估计价值(乘以 `gamma`)加到奖励中
这可以防止智能体因为时间限制而受到不公平的惩罚,并提供更准确的回报估计

————
将完整的 `transition` 添加到 `storage` 中

 # Record the transition # 记录过渡 self.storage.add_transitions(self.transition) # 添加过渡到存储中

接收奖励 (`rewards`)、完成标志 (`dones`) 和额外信息 (`infos`)

 def process_env_step(self, rewards, dones, infos): # 定义处理环境步函数 self.transition.rewards = rewards.clone() # 克隆奖励 self.transition.dones = dones # 记录完成状态

1.1.6 计算回报compute_returns

  1. 在收集了足够多的 transitions 后调用

然后,调用 `storage.compute_returns` 方法

 # 调用存储器的 compute_returns 方法计算回报和 GAE 优势 self.storage.compute_returns(last_values, self.gamma, self.lam) 

这个方法通常使用广义优势估计 (Generalized Advantage Estimation, GAE)来计算每个时间步的回报 (`returns`) 和优势 (`advantages`)


GAE 结合了不同时间步长的价值估计,以减少估计的方差

首先,使用 `actor_critic` 网络评估最后一个状态的价值 (`last_values`)

 # 计算回报和优势的方法 def compute_returns(self, last_critic_obs): # 评估最后一个状态的价值 last_values = self.actor_critic.evaluate(last_critic_obs).detach() 
至于什么是GAE,请参见此文《从零实现带RLHF的类ChatGPT:逐行解析微软DeepSpeed Chat的源码》的第3.6节

1次PPO训练由train_rlhf()方法进行管理,其内部主要实现了『注,以下的内容如果有不太理解的,可以结合《ChatGPT技术原理解析:从RL之PPO算法、RLHF到GPT4、instructGPT》一文中的3.2节增进理解』:

至于回报returns就是奖励reward的累计,对于全长为的轨迹来说,其到达某个时间步 时的回报为

优势advantages和回报returns的计算
其中包括本框架在内多数框架的advantages实现并非纯粹使用TD-error,而是在TD-error的基础上结合了MC方法,也即GAE(广义优势估计);
——————
对于全长为的轨迹来说,其某个时间步的优势为(时,advantage完全使用MC方法;时,advantage完全使用TD-error方法):

KL散度惩罚奖励old_rewards的计算,为了防止对phase2学习到的环境奖励过度自信,所以加入了KL散度惩罚项:

1.1.7 update:策略和价值网络参数更新

这是执行策略和价值网络参数更新的核心循环

 # 更新策略和价值网络参数的方法 def update(self): # 初始化平均价值损失 mean_value_loss = 0 # 初始化平均代理损失 mean_surrogate_loss = 0

第二阶段,它遍历生成器产生的每个mini-batch,对每个batch进行一系列的操作
注意,以下是整个FOR大循环开始

 # 遍历小批量生成器产生的数据 for ( obs_batch, # 观察值小批量 critic_obs_batch, # 评论家观察值小批量 actions_batch, # 动作小批量 target_values_batch, # 目标价值小批量 (用于价值损失计算,通常是回报) advantages_batch, # 优势小批量 returns_batch, # 回报小批量 (用于价值损失计算) old_actions_log_prob_batch, # 旧策略下的动作对数概率小批量 old_mu_batch, # 旧策略下的动作均值小批量 old_sigma_batch, # 旧策略下的动作标准差小批量 hid_states_batch, # 隐藏状态小批量 (仅用于循环网络) masks_batch, # 掩码小批量 (仅用于循环网络) ) in generator:

第一阶段,它首先根据网络是否是循环的,选择合适的小批量生成器(`mini_batch_generator` 或 `reccurent_mini_batch_generator`) 从 `storage` 中采样数据

 # 检查 Actor-Critic 网络是否是循环网络 if self.actor_critic.is_recurrent: # 如果是循环网络,使用循环小批量生成器 generator = self.storage.reccurent_mini_batch_generator(self.num_mini_batches, self.num_learning_epochs) # 如果不是循环网络 else: # 使用标准小批量生成器 generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs) 
    接下来,分析下整个FOR循环

    对于每个小批量 (mini-batch)

最后,计算总损失:将代理损失、价值损失(乘以系数 `value_loss_coef`)和熵奖励(乘以系数 `entropy_coef`,鼓励探索)结合起来

 # 计算总损失 = 代理损失 + 价值损失 - 熵奖励 loss = surrogate_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy_batch.mean() 

且进行梯度更新: 计算总损失的梯度,执行梯度裁剪 (`clip_grad_norm_`) 防止梯度爆炸,然后让优化器执行一步更新

 # 梯度更新步骤 # 清空优化器的梯度 self.optimizer.zero_grad() # 反向传播计算梯度 loss.backward() # 对梯度进行裁剪,防止梯度爆炸 nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm) # 执行一步优化器更新 self.optimizer.step() # 累加当前小批量的价值损失 (转换为 Python float) mean_value_loss += value_loss.item() # 累加当前小批量的代理损失 (转换为 Python float) mean_surrogate_loss += surrogate_loss.item() 

价值函数损失 (Value Function Loss):计算价值网络的损失
如果设置了使用裁剪价值损失,那么会计算裁剪的价值损失和未裁剪的价值损失,并取两者的最大值作为最终的价值损失

 # 价值函数损失计算 # 如果使用裁剪的价值损失 if self.use_clipped_value_loss: # 计算裁剪后的价值预测 value_clipped = target_values_batch + (value_batch - target_values_batch).clamp( # 将价值预测与目标价值的差值裁剪到 [-clip, clip] -self.clip_param, self.clip_param ) # 计算未裁剪的价值损失 (均方误差) value_losses = (value_batch - returns_batch).pow(2) # 计算裁剪后的价值损失 (均方误差) value_losses_clipped = (value_clipped - returns_batch).pow(2) # 取未裁剪和裁剪后损失中的较大者,并计算平均值 value_loss = torch.max(value_losses, value_losses_clipped).mean() 

否则,直接计算未裁剪的价值损失

 # 如果不使用裁剪的价值损失 else: # 直接计算价值损失 (均方误差) value_loss = (returns_batch - value_batch).pow(2).mean() 

————
其实就是之前这篇文章《从零实现带RLHF的类ChatGPT:逐行解析微软DeepSpeed Chat的源码》中3.6中AC架构下的PPO训练:在加了β惩罚且截断后的RM之下,通过经验数据不断迭代策略且估计value讲过
1个ppo_batch中,critic的损失计算公式为:
裁剪新价值估计

V_{new}

,使其不至于太偏离采集经验时的旧价值估计,使得经验回放仍能有效:

V_{clip} = clip(V_{new}, V_{old}-\phi, V_{old}+\phi)

critic将拟合回报R:

vf\_loss = \frac{1}{2} \cdot E_{\tau \sim \pi_{old}^{RL}} E_{s_t \sim {\tau}} [\max((V_{new}(s_t)-R_t)^2, (V_{clip}(s_t)-R_t)^2)]

可能有同学疑问上面的代码和我说的这个公式并没有一一对齐呀,为了方便大家一目了然,我们把代码逐行再分析下

对于这行代码
value_clipped = target_values_batch + (value_batch - target_values_batch).clamp(-self.clip_param,self.clip_param)

转换成公式便是
V_clipped = V_old + clip(V_new - V_old, -ε, ε)

它和我上面贴的公式表达的其实是一样的
因为我上面贴的公式要表达的是:

V_{old}-\phi < V_{new} < V_{old}+\phi

,那该不等式两边都减去个

V_{old}

,不就意味着

-\phi < V_{new} - V_{old} < \phi



而接下来这三行代码
value_losses = (value_batch - returns_batch).pow(2)
value_losses_clipped = (value_clipped - returns_batch).pow(2)
value_loss = torch.max(value_losses, value_losses_clipped).mean()

则表达的就是如下公式

vf\_loss = \frac{1}{2} \cdot E_{\tau \sim \pi_{old}^{RL}} E_{s_t \sim {\tau}} [\max((V_{new}(s_t)-R_t)^2, (V_{clip}(s_t)-R_t)^2)]

是不一目了然了..

接下来,计算 PPO 的核心损失项:actor损失 (Surrogate Loss)
它首先计算新旧策略的比率——即重要性采样比率 (`ratio`) ,和优势 (`advantages_batch`),然后计算未裁剪和裁剪的代理损失,并取两者的最大值作为最终的actor损失

 # 代理损失计算 (PPO 核心目标) # 计算重要性采样比率 (当前概率 / 旧概率) ratio = torch.exp(actions_log_prob_batch - torch.squeeze(old_actions_log_prob_batch)) # 计算未裁剪的代理损失项 surrogate = -torch.squeeze(advantages_batch) * ratio # 计算裁剪后的代理损失项 surrogate_clipped = -torch.squeeze(advantages_batch) * torch.clamp( # 且将比率裁剪到 [1-clip, 1+clip] 范围内 ratio, 1.0 - self.clip_param, 1.0 + self.clip_param ) # 取未裁剪和裁剪后损失中的较大者,并计算平均值 surrogate_loss = torch.max(surrogate, surrogate_clipped).mean() 
其实上面的代码就是对近端策略优化裁剪PPO-clip的直接实现,其对应的公式如下『详细了解请查看本博客内此文《强化学习极简入门:通俗理解MDP、DP MC TC和Q学习、策略梯度、PPO》的第4.4.2节PPO算法的另一个变种:近端策略优化裁剪PPO-clip)

KL 散度与自适应学习率:如果启用了自适应学习率 (`schedule == "adaptive"`)
首先,计算当前策略和旧策略(生成数据时的策略)之间的 KL 散度

 # KL 散度计算 (用于自适应学习率) # 如果设置了期望 KL 且调度策略是自适应的 if self.desired_kl is not None and self.schedule == "adaptive": # 在无梯度计算模式下进行 with torch.inference_mode(): # 计算当前策略和旧策略之间的 KL 散度 kl = torch.sum( # 对数标准差比项 (+1e-5 防止除零) torch.log(sigma_batch / old_sigma_batch + 1.0e-5) # 旧标准差平方 + 均值差平方项 + (torch.square(old_sigma_batch) + torch.square(old_mu_batch - mu_batch)) # 除以 2 倍当前标准差平方 / (2.0 * torch.square(sigma_batch)) # 减去 0.5 - 0.5, # 沿着最后一个维度求和 axis=-1, )

上面这个KL公式稍微有点小复杂,但为方便大家更好的理解 更为大家看着舒服、省心,我还是把上面这段代码对应的公式 写一下,且把代码的每一行 与公式当中的各个项,逐一对应说明下


假设当前策略的均值和标准差分别为 μ 和 σ,旧策略的均值和标准差分别为

\mu_{\text {old }}

\sigma_{\text {old }}

,则 KL 散度的公式为

\mathrm{KL}=\sum_{i=1}^{d}\left[\log \left(\frac{\sigma_{i}}{\sigma_{\mathrm{old}, i}}+10^{-5}\right)+\frac{\sigma_{\mathrm{old}, i}^{2}+\left(\mu_{\mathrm{old}, i}-\mu_{i}\right)^{2}}{2 \sigma_{i}^{2}}-\frac{1}{2}\right]


首先,其中这行代码

 # 对数标准差比项 (+1e-5 防止除零) torch.log(sigma_batch / old_sigma_batch + 1.0e-5) 

对应为

\log \left(\frac{\sigma_{i}}{\sigma_{\text {old }, i}}+10^{-5}\right)



其次,接下来的这行代码

 # 旧标准差平方 + 均值差平方项 + (torch.square(old_sigma_batch) + torch.square(old_mu_batch - mu_batch)) 

对应于

\sigma_{\mathrm{old}, i}^{2}+\left(\mu_{\mathrm{old}, i}-\mu_{i}\right)^{2}



然后其中的「除以 2 倍当前标准差平方」的代码:

 # 除以 2 倍当前标准差平方 / (2.0 * torch.square(sigma_batch)) 

对应于

\frac{\sigma_{\text {old }, i}^{2}+\left(\mu_{\text {old }, i}-\mu_{i}\right)^{2}}{2 \sigma_{i}^{2}}



接下来,减去0.5的代码 对应为

\frac{\sigma_{\text {old }, i}^{2}+\left(\mu_{\mathrm{old}, i}-\mu_{i}\right)^{2}}{2 \sigma_{i}^{2}}-\frac{1}{2}



最后,沿着最后一个维度求和的代码 对应为

\sum_{i=1}^{d}\left[\log \left(\frac{\sigma_{i}}{\sigma_{\text {old }, i}}+10^{-5}\right)+\frac{\sigma_{\text {old }, i}^{2}+\left(\mu_{\text {old }, i}-\mu_{i}\right)^{2}}{2 \sigma_{i}^{2}}-\frac{1}{2}\right]

哦了,上面这个公式解释好了
那接下来,根据 KL 散度与 `desired_kl` 的比较,动态调整优化器的学习率

 # 计算 KL 散度的平均值 kl_mean = torch.mean(kl) # 如果 KL 散度远大于期望值 if kl_mean > self.desired_kl * 2.0: # 降低学习率 (最小为 1e-5) self.learning_rate = max(1e-5, self.learning_rate / 1.5) # 如果 KL 散度远小于期望值 (且大于 0) elif kl_mean < self.desired_kl / 2.0 and kl_mean > 0.0: # 提高学习率 (最大为 1e-2) self.learning_rate = min(1e-2, self.learning_rate * 1.5) 

上面这个过程,也是有说法滴,根据此文的《强化学习极简入门:通俗理解MDP、DP MC TD和Q学习、策略梯度、PPO》第4.4节,可知:上述的5行代码是典型的自适应KL惩罚的过程

J_{\mathrm{PPO}}^{\theta^{\prime}}(\theta)=\mathbb{E}_{\left(s_{t}, a_{t}\right) \sim \pi_{\theta^{\prime}}}\left[\frac{p_{\theta}\left(a_{t} \mid s_{t}\right)}{p_{\theta^{\prime}}\left(a_{t} \mid s_{t}\right)} A^{\theta^{\prime}}\left(s_{t}, a_{t}\right)\right]-\beta \mathrm{KL}\left(\theta, \theta^{\prime}\right)


上述公式中的

\beta

是怎么取值的呢,事实上,

\beta

是可以动态调整的,称之为自适应KL惩罚(adaptive KL penalty),具体而言

\rightarrow

  先设一个可以接受的 KL 散度的最大值

KL_{max}


假设优化完

\\J_{\mathrm{PPO}}^{\theta^{\prime}}(\theta)=J^{\theta^{\prime}}(\theta)-\beta \mathrm{KL}\left(\theta, \theta^{\prime}\right)

以后,KL 散度值太大导致

\mathrm{KL}(\theta,\theta')>\mathrm{KL}_{\max}

,意味着 

\theta

\theta'

差距过大(即学习率/步长过大),也就代表后面惩罚的项

\beta \mathrm{KL}(\theta ,\theta ')

惩罚效果太弱而没有发挥作用,故增大惩罚把

\beta

增大,所以需要减小学习率

\rightarrow

  再设一个 KL 散度的最小值

KL_{min}


如果优化完

\\J_{\mathrm{PPO}}^{\theta^{\prime}}(\theta)=J^{\theta^{\prime}}(\theta)-\beta \mathrm{KL}\left(\theta, \theta^{\prime}\right)

以后,KL散度值比最小值还要小导致

\mathrm{KL}(\theta,\theta')< {KL}_{\min}

,意味着 

\theta

与 

\theta'

 差距过小,也就代表后面这一项

\beta \mathrm{KL}(\theta ,\theta ')

的惩罚效果太强了,我们怕它只优化后一项,使

\theta

与 

\theta'

 一样,这不是我们想要的,所以减小惩罚即减小

\beta

,所以需要增大学习率
————
至于详细了解请查看本博客内此文《强化学习极简入门:通俗理解MDP、DP MC TC和Q学习、策略梯度、PPO》的4.4.1 什么是近端策略优化PPO与PPO-penalty

重新计算当前策略下的小批量数据的动作对数概率、价值和熵

 # 使用当前策略重新评估动作 (主要为了获取内部状态如均值、标准差) self.actor_critic.act(obs_batch, masks=masks_batch, hidden_states=hid_states_batch[0]) # 获取当前策略下动作的对数概率 actions_log_prob_batch = self.actor_critic.get_actions_log_prob(actions_batch) # 使用当前策略评估状态价值 value_batch = self.actor_critic.evaluate( # 传入评论家观察、掩码和隐藏状态 critic_obs_batch, masks=masks_batch, hidden_states=hid_states_batch[1] ) # 获取当前策略的动作均值 mu_batch = self.actor_critic.action_mean # 获取当前策略的动作标准差 sigma_batch = self.actor_critic.action_std # 获取当前策略的熵 entropy_batch = self.actor_critic.entropy 

      至此,便结束了上述的整个FOR循环 

第三阶段,在所有mini-batch更新完成后,累积并计算平均的价值损失和代理损失,用于监控训练过程

 # 计算总的更新次数 num_updates = self.num_learning_epochs * self.num_mini_batches # 计算整个 update 过程中的平均价值损失 mean_value_loss /= num_updates # 计算整个 update 过程中的平均代理损失 mean_surrogate_loss /= num_updates

清空 `storage`,为下一轮数据收集做准备

 # 清空经验存储器,为下一轮数据收集做准备 self.storage.clear() 

返回平均损失值

 # 返回平均价值损失和平均代理损失 return mean_value_loss, mean_surrogate_loss

1.2 rsl_rl/env

1.3 rsl_rl/modules

1.3.0 modules/actor_critic_transformer.py

actor_critic_transformer.py这段代码定义了一个基于BERT风格的Transformer模型,用于强化学习中的Actor-Critic方法

它由三个主要的类组成:Transformer_Block, Transformer、ActorCriticTransformer 

  • ActorCriticTransformer 类
    ActorcriticTransformer 类实现了Actor-Critic方法,其中Actor和Critic都使用了上述的Transformer模型
    Actor负责生成动作,而Critic负责评估当前策略的价值。这个类还包括了动作噪声的处理,以及一些用于强化学习的特定方法,如 act、evaluate 和update-distribution 等

Transformer 类
Transformer 类构建了一个完整的 Transformer模型,它接收输入数据并通过一系列的Transformer_Block 进行处理

首先,输入数据通过一个线性层和位置嵌入 (Position Embedding) 进行处理,以增加位置信息

class Transformer(nn.Module): def __init__(self, input_dim, output_dim, context_len, latent_dim=128, num_head=4, num_layer=4, dropout_rate=0.1) -> None: super().__init__() self.input_dim = input_dim self.output_dim = output_dim self.context_len = context_len self.latent_dim = latent_dim self.num_head = num_head self.num_layer = num_layer self.input_layer = nn.Sequential( nn.Linear(input_dim, latent_dim), nn.Dropout(dropout_rate), ) self.weight_pos_embed = nn.Embedding(context_len, latent_dim)

然后,数据通过多个 Transformer_Block 进行处理

 self.attention_blocks = nn.Sequential( *[Transformer_Block(latent_dim, num_head, dropout_rate) for _ in range(num_layer)], )

最后,模型通过另一个线性层输出最終结果

 self.output_layer = nn.Sequential( nn.LayerNorm(latent_dim), nn.Linear(latent_dim, output_dim), ) def forward(self, x): x = self.input_layer(x) x = x + self.weight_pos_embed(torch.arange(x.shape[1], device=x.device)) x = self.attention_blocks(x) # take the last token x = x[:, -1, :] x = self.output_layer(x) return x

Transformer_Block 类
Transformer_Block 类是构建Transformer模型的基础块(下图来自此文中对ViT的架构的介绍:Embedding层 + Transformer Encoder + MLP Head)

它包含了多头注意力机制 (Muitihead Attention) 和前馈神经网络 (FeedForward Neural Network)

# a BERT-style transformer block class Transformer_Block(nn.Module): def __init__(self, latent_dim, num_head, dropout_rate) -> None: super().__init__() self.num_head = num_head self.latent_dim = latent_dim self.ln_1 = nn.LayerNorm(latent_dim) self.attn = nn.MultiheadAttention(latent_dim, num_head, dropout=dropout_rate, batch_first=True) self.ln_2 = nn.LayerNorm(latent_dim) self.mlp = nn.Sequential( nn.Linear(latent_dim, 4 * latent_dim), nn.GELU(), nn.Linear(4 * latent_dim, latent_dim), nn.Dropout(dropout_rate), ) def forward(self, x): x = self.ln_1(x) x = x + self.attn(x, x, x, need_weights=False)[0] x = self.ln_2(x) x = x + self.mlp(x) return x

这个类首先通过层归一代 (Layer Normalization),然后应用多头注意力机制,再次进行层归一化,并最后通过一个前馈神经网络

1.3.1 modules/actor_critic_depth_cnn.py

1.3.2 modules/actor_critic_history.py

1.3.3 modules/actor_critic_recurrent.py

1.3.4  modules/actor_critic.py

这段代码定义了一个名为 ActorCritic的类,它继承自nn.Module,是一个用于强化学习的演员-评论家模型的实现

这个模型包含两个主要部分:一个用于决策的策略网络(演员)和一个用于评估动作价值的价值网络(评论家)

  1. 此外,构造函数还初始化了一个用于动作输出的噪声参数 self.std,并设置了一个分布self.distribution,该分布稍后将用于生成带有噪声的动作

接下来,构造函数初始化演员和评论家网络

 mlp_input_dim_a = num_actor_obs mlp_input_dim_c = num_critic_obs

这两个网络都是使用多层感知机MLP构建的,其中每一层都是通过 nn.Linear 创建的,并且在每个线性层之后应用了激活函数

比如这是对policy的构建

 # Policy actor_layers = [] actor_layers.append(nn.Linear(mlp_input_dim_a, actor_hidden_dims[0])) actor_layers.append(activation) for l in range(len(actor_hidden_dims)): if l == len(actor_hidden_dims) - 1: actor_layers.append(nn.Linear(actor_hidden_dims[l], num_actions)) else: actor_layers.append(nn.Linear(actor_hidden_dims[l], actor_hidden_dims[l + 1])) actor_layers.append(activation) self.actor = nn.Sequential(*actor_layers)

再比如这是对value的构建

 # Value function critic_layers = [] critic_layers.append(nn.Linear(mlp_input_dim_c, critic_hidden_dims[0])) critic_layers.append(activation) for l in range(len(critic_hidden_dims)): if l == len(critic_hidden_dims) - 1: critic_layers.append(nn.Linear(critic_hidden_dims[l], 1)) else: critic_layers.append(nn.Linear(critic_hidden_dims[l], critic_hidden_dims[l + 1])) critic_layers.append(activation) self.critic = nn.Sequential(*critic_layers) print(f"Actor MLP: {self.actor}") print(f"Critic MLP: {self.critic}")

演员网络的输出维度等于动作空间的维度,而评论家网络的输出维度固定为1,表示对当前状态的价值评估

构造函数首先检查是否有末预期的参数传入,并打印警告信息。然后,它调用get_activation 函数来获取指定的激活函数

 if kwargs: print("ActorCritic.__init__ got unexpected arguments, which will be ignored: " + str([key for key in kwargs.keys()])) super(ActorCritic, self).__init__() activation = get_activation(activation)

构造函数_-init--- 接收多个参数,包括观察空问的维度(分别为演员和评论家)、动作空问的维度、隐藏层的维度、激活函数类型以及初始噪声标淮差

class ActorCritic(nn.Module): is_recurrent = False def __init__(self, num_actor_obs, num_critic_obs, num_actions, actor_hidden_dims=[256, 256, 256], critic_hidden_dims=[256, 256, 256], activation='elu', init_noise_std=1.0, **kwargs):

类中还定义了几个方法,包括reset(重置状态,当前为空实现)、forward(抽象方法,未实现)、update_distribution(根据当前观察更新动作分布)、act (根据当前观察采取动作)、get_actions_1og-prob(计算动作的对数概率)、 act-inference(推断模式下的动作选择,不包含噪声)和evaluate(评估给定观察的价值)

最后,get_activation 函数根据传入的激活函数名称返回对应的PyTorch激活函数对象,如果传入的名称无效,则打印错误信息井返回None

1.3.5 modules/depth_backbone.py

1.3.6 modules/normalizer.py

1.4 rsl_rl/runners

// 待更

Read more

GraphQL在Python中的完整实现:从基础到企业级实战

GraphQL在Python中的完整实现:从基础到企业级实战

目录 摘要 1 引言:为什么GraphQL是API设计的未来 1.1 GraphQL的核心价值定位 1.2 GraphQL技术演进路线图 2 GraphQL核心技术原理深度解析 2.1 Schema定义语言与类型系统 2.1.1 Schema定义原则 2.1.2 类型系统架构 2.2 Resolver解析机制深度解析 2.2.1 Resolver执行模型 2.2.2 Resolver执行流程 2.3 Strawberry vs Graphene框架深度对比 2.3.1 架构设计哲学对比 2.3.2 框架选择决策树 3 实战部分:

By Ne0inhk
C++学习之旅【C++伸展树介绍以及红黑树的实现】

C++学习之旅【C++伸展树介绍以及红黑树的实现】

🔥承渊政道:个人主页 ❄️个人专栏: 《C语言基础语法知识》《数据结构与算法》 《C++知识内容》《Linux系统知识》 ✨逆境不吐心中苦,顺境不忘来时路!🎬 博主简介: 引言:前篇文章,小编已经介绍了关于C++AVL树的实现!相信大家应该有所收获!接下来我将带领大家继续深入学习C++的相关内容!本篇文章着重介绍关于C++伸展树介绍以及红黑树的实现!伸展树与红黑树是两类极具代表性的BBST,且在工程实践中各有不可替代的价值:伸展树摒弃了"严格平衡”的执念,通过“伸展”操作将最近访问的节点移至根节点,利用“局部性原理”优化频繁访问的场景,实现均摊O(logn)的时间复杂度,适合缓存、热点数据查询等场景;红黑树则通过给节点着色并遵守严格的颜色规则,确保树的最长路径不超过最短路径的两倍,以 “弱平衡” 换稳定的最坏O(logn)性能,是C++ STL 中 std::map、std:

By Ne0inhk
C++之unordered_set和unordered_map

C++之unordered_set和unordered_map

一、unordered系列关联式容器  在C++98中,STL提供了底层为红黑树结构的一系列关联式容器,在查询时效率可达到log_2 N,即最差情况下需要比较红黑树的高度次,当树中的节点非常多时,查询效率也不理想。最好 的查询是,进行很少的比较次数就能够将元素找到,因此在C++11中,STL又提供了4个 unordered系列的关联式容器,这四个容器与红黑树结构的关联式容器使用方式基本类似,只是 其底层结构不同 1.1 unordered_map  1.2 unordered_map的文档介绍 1. unordered_map是存储<key, value>键值对的关联式容器,其允许通过keys快速的索引到与 其对应的value。 2. 在unordered_map中,键值通常用于惟一地标识元素,而映射值是一个对象,其内容与此 键关联。键和映射值的类型可能不同。 3. 在内部, unordered_

By Ne0inhk

YOLOv8 C++部署实战:高性能推理引擎实现

YOLOv8 C++部署实战:高性能推理引擎实现 在智能安防摄像头实时检测行人、工业质检流水线上自动识别缺陷产品,或是自动驾驶车辆感知周围环境的瞬间——这些对响应速度要求极高的场景中,一个高效的视觉推理系统往往决定了整个项目的成败。而在这背后,YOLOv8 正成为越来越多工程师的首选目标检测模型。 但问题也随之而来:Python 训练出的模型虽然开发便捷,却难以满足生产环境中“低延迟、高吞吐”的硬性指标。解释器开销、GIL 锁限制、内存占用高等问题,在边缘设备或服务器集群上尤为突出。于是,将 YOLOv8 模型从 PyTorch 导出,并用 C++ 构建原生推理引擎,便成了通往真正落地的关键一步。 这不仅是一次语言层面的迁移,更是一场性能与可控性的全面升级。 为什么是 YOLOv8? YOLO 系列自诞生以来就以“单次前向传播完成检测”著称,尤其适合需要实时处理的应用。而 YOLOv8 作为 Ultralytics 推出的新一代版本,在保持高精度的同时进一步优化了架构设计和训练流程。 它支持多种任务类型—

By Ne0inhk