【MADRL】多智能体近端策略优化(MAPPO)算法

【MADRL】多智能体近端策略优化(MAPPO)算法
        本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:

       强化学习(8)---《【MADRL】多智能体近端策略优化(MAPPO)算法》

【MADRL】多智能体近端策略优化(MAPPO)算法

目录

0.前言

1.背景与动机

2.算法结构

3.具体公式

4.算法流程

5.公式总结

6.优势与应用场景

7.结论

 [Python] MAPPO实现(可移植)


0.前言

       多智能体近端策略优化算法 MAPPO(Multi-Agent Proximal Policy Optimization)是PPO(Proximal Policy Optimization)在多智能体环境中的一种扩展,它通过在多智能体系统中引入PPO的策略优化机制,实现了在协作和竞争环境中更加高效的策略学习。MAPPO是一种基于策略梯度的多智能体强化学习算法,特别适用于混合协作和竞争的多智能体场景。

论文:The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games

代码:MADRL多智能体近端策略优化(MAPPO)算法
 


1.背景与动机

        PPO 是近年来最流行的强化学习算法之一,它通过引入裁剪的策略更新,解决了传统策略梯度方法(如TRPO)中策略更新步长过大导致训练不稳定的问题。在多智能体环境中,多个智能体同时学习策略,每个智能体的行为会影响其他智能体的决策,因此需要一个鲁棒且稳定的策略优化方法。MAPPO通过中心化的Critic和去中心化的Actor来实现多智能体的协同训练,并采用PPO的优势来提高多智能体环境下的学习效率和稳定性。


2.算法结构

        MAPPO继承了PPO的核心思想,并结合了多智能体系统的特点,采用了集中式训练,分布式执行的架构:

  • 集中式训练:在训练阶段,所有智能体的Critic网络都能够访问全局的状态和其他智能体的动作信息,以便于学习到更准确的价值函数。
  • 分布式执行:在执行阶段,每个智能体只使用自己观测到的局部状态和策略进行动作选择,保证了系统的分布式控制。

3.具体公式

        MAPPO算法主要分为两部分:策略更新价值函数估计。我们将分别介绍这两部分的公式。

优势函数的计算:优势函数

( \hat{A}t^i )

用于衡量某个动作

( a_t )

相对于当前策略下平均动作的优劣程度。优势函数可以通过以下公式估计:

[ \hat{A}t^i = \delta_t + (\gamma \lambda) \delta{t+1} + ... + (\gamma \lambda)^{T-t+1} \delta{T-1} ]


其中

( \delta_t )

是时间差分误差,定义为:

[ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) ]


这里

( \lambda )

是GAE(Generalized Advantage Estimation)中的权重参数,用于平衡偏差和方差。

价值函数估计:每个智能体

( i )

的价值函数

( V_i(s) )

由一个中心化的Critic网络估计。Critic网络使用全局状态

( s )

和所有智能体的动作

( a_1, a_2, ..., a_N )

来估计全局的价值函数。MAPPO通过使用中心化Critic保证每个智能体在训练过程中可以考虑到其他智能体的策略,从而学到更有效的策略。Critic的目标是最小化均方误差(MSE)损失函数:

[ L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] ]


其中,

( R_t )

是从当前时刻

( t )

到未来的累计回报,通常通过时间差分法(TD目标)进行估计:

[ R_t = r_t + \gamma V_i(s_{t+1}; \phi'_i) ]


其中,

( \gamma )

是折扣因子,

( \phi'_i )

是目标网络的参数。

( \epsilon )

是裁剪的阈值,用于控制策略更新的幅度。

( \hat{A}_t )

是优势函数的估计值,用于衡量动作

( a_t )

在状态

( s_t )

下的优势;

( r_t(\theta) = \frac{\pi_{\theta}(a_t | s_t)}{\pi_{\theta_{\text{old}}}(a_t | s_t)} )

是当前策略与旧策略的比率;

策略更新:        在PPO中,策略更新的目标是通过最大化策略的期望回报

( \mathbb{E}_{\pi} [R] )

来更新策略。然而,直接使用策略梯度更新容易导致大的策略变化。因此,PPO引入了一个裁剪目标函数来限制每次更新的策略变化幅度。MAPPO也遵循相同的原则,但应用在每个智能体 ( i ) 的策略上。PPO的目标函数为:

[ L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min \left( r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \hat{A}_t \right) \right] ]


其中:        MAPPO中的每个智能体

( i )

采用类似的目标函数进行策略更新,但每个智能体的策略仅依赖于自己的观测

( o_i )

,即:

[ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right] ]


其中

( r_t(\theta_i) )

是智能体

( i )

的策略更新比率,

( \hat{A}_t^i )

是智能体

( i )

的优势估计值。


4.算法流程

  1. 交互与经验收集:每个智能体根据当前策略与环境交互,并存储每个时间步的状态、动作、奖励、下一状态、价值等信息。
  2. 更新Critic网络:根据Critic损失函数更新每个智能体的Critic网络参数,最小化均方误差。
  3. 更新Actor网络:根据裁剪的PPO目标函数,使用策略梯度法更新每个智能体的策略网络参数。
  4. 软更新目标网络:使用软更新机制逐步更新目标网络的参数。
  5. 重复:循环进行以上步骤,直到智能体策略达到收敛。

计算回报和优势:通过时间差分法计算每个智能体的回报

( R_t )

和优势函数

( \hat{A}_t^i )

初始化:为每个智能体初始化策略网络

( \pi_i )

和Critic网络

( V_i )

,并初始化对应的目标网络。


5.公式总结

优势函数估计

[ \hat{A}t^i = \sum{l=0}^{T-t} (\gamma \lambda)^l \delta_{t+l} ]

Critic网络损失函数

[ L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] ]

策略更新目标

[ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right] ]

6.优势与应用场景

  • 鲁棒性与稳定性:PPO算法引入的裁剪更新机制使策略梯度更新更加稳定,避免了更新幅度过大的问题。MAPPO继承了这一优势,能够在多智能体环境中提供更稳定的策略更新。
  • 集中式训练与分布式执行:通过中心化的Critic结构,智能体可以利用全局信息进行策略训练,而去中心化的Actor使得智能体在执行过程中只依赖局部观测,提高了算法的灵活性和扩展性。
  • 适应复杂的多智能体环境:MAPPO可以处理多智能体环境中的协作、竞争或混合型任务,非常适用于复杂的多智能体系统,如机器人集群、自动驾驶车队、多人游戏等。

7.结论

        MAPPO是对PPO算法的多智能体扩展,采用了中心化的Critic和去中心化的Actor结构,能够在多智能体环境中提供稳定、高效的策略优化。通过PPO的裁剪更新机制,MAPPO在策略更新过程中保持了良好的收敛性和鲁棒性,是当前研究和应用中广泛使用的算法之一。


 [Python] MAPPO实现(可移植)

        若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MAPPO_MPE_main

import torch import numpy as np from torch.utils.tensorboard import SummaryWriter import argparse from normalization import Normalization, RewardScaling from replay_buffer import ReplayBuffer from mappo_mpe import MAPPO_MPE from environment import Env class Runner_MAPPO_MPE: def __init__(self, args, env_name, number, seed): self.args = args self.env_name = env_name self.number = number self.seed = seed # Set random seed np.random.seed(self.seed) torch.manual_seed(self.seed) # Create env self.env = Env(env_name, discrete=True) # Discrete action space self.args.N = self.env.n # The number of agents self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)] # obs dimensions of N agents self.args.action_dim_n = [self.env.action_space[i].n for i in range(self.args.N)] # actions dimensions of N agents # Only for homogenous agents environments like Spread in MPE,all agents have the same dimension of observation space and action space self.args.obs_dim = self.args.obs_dim_n[0] # The dimensions of an agent's observation space self.args.action_dim = self.args.action_dim_n[0] # The dimensions of an agent's action space self.args.state_dim = np.sum(self.args.obs_dim_n) # The dimensions of global state space(Sum of the dimensions of the local observation space of all agents) print("observation_space=", self.env.observation_space) print("obs_dim_n={}".format(self.args.obs_dim_n)) print("action_space=", self.env.action_space) print("action_dim_n={}".format(self.args.action_dim_n)) # Create N agents self.agent_n = MAPPO_MPE(self.args) self.replay_buffer = ReplayBuffer(self.args) # Create a tensorboard self.writer = SummaryWriter(log_dir='runs/MAPPO/MAPPO_env_{}_number_{}_seed_{}'.format(self.env_name, self.number, self.seed)) self.evaluate_rewards = [] # Record the rewards during the evaluating self.total_steps = 0 if self.args.use_reward_norm: print("------use reward norm------") self.reward_norm = Normalization(shape=self.args.N) elif self.args.use_reward_scaling: print("------use reward scaling------") self.reward_scaling = RewardScaling(shape=self.args.N, gamma=self.args.gamma) def run(self, ): evaluate_num = -1 # Record the number of evaluations while self.total_steps < self.args.max_train_steps: if self.total_steps // self.args.evaluate_freq > evaluate_num: self.evaluate_policy() # Evaluate the policy every 'evaluate_freq' steps evaluate_num += 1 _, episode_steps = self.run_episode_mpe(evaluate=False) # Run an episode self.total_steps += episode_steps if self.replay_buffer.episode_num == self.args.batch_size: self.agent_n.train(self.replay_buffer, self.total_steps) # Training self.replay_buffer.reset_buffer() self.evaluate_policy() self.env.close() def evaluate_policy(self, ): evaluate_reward = 0 for _ in range(self.args.evaluate_times): episode_reward, _ = self.run_episode_mpe(evaluate=True) evaluate_reward += episode_reward evaluate_reward = evaluate_reward / self.args.evaluate_times self.evaluate_rewards.append(evaluate_reward) print("total_steps:{} \t evaluate_reward:{}".format(self.total_steps, evaluate_reward)) self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps) # Save the rewards and models np.save('./data_train/MAPPO_env_{}_number_{}_seed_{}.npy'.format(self.env_name, self.number, self.seed), np.array(self.evaluate_rewards)) self.agent_n.save_model(self.env_name, self.number, self.seed, self.total_steps) def run_episode_mpe(self, evaluate=False): episode_reward = 0 obs_n = self.env.reset() if self.args.use_reward_scaling: self.reward_scaling.reset() if self.args.use_rnn: # If use RNN, before the beginning of each episode,reset the rnn_hidden of the Q network. self.agent_n.actor.rnn_hidden = None self.agent_n.critic.rnn_hidden = None for episode_step in range(self.args.episode_limit): a_n, a_logprob_n = self.agent_n.choose_action(obs_n, evaluate=evaluate) # Get actions and the corresponding log probabilities of N agents s = np.array(obs_n).flatten() # In MPE, global state is the concatenation of all agents' local obs. v_n = self.agent_n.get_value(s) # Get the state values (V(s)) of N agents obs_next_n, r_n, done_n, _ = self.env.step(a_n) episode_reward += r_n[0] if not evaluate: if self.args.use_reward_norm: r_n = self.reward_norm(r_n) elif args.use_reward_scaling: r_n = self.reward_scaling(r_n) # Store the transition self.replay_buffer.store_transition(episode_step, obs_n, s, v_n, a_n, a_logprob_n, r_n, done_n) obs_n = obs_next_n if all(done_n): break if not evaluate: # An episode is over, store v_n in the last step s = np.array(obs_n).flatten() v_n = self.agent_n.get_value(s) self.replay_buffer.store_last_value(episode_step + 1, v_n) return episode_reward, episode_step + 1 if __name__ == '__main__': parser = argparse.ArgumentParser("Hyperparameters Setting for MAPPO in MPE environment") parser.add_argument("--max_train_steps", type=int, default=int(3e6), help=" Maximum number of training steps") parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode") parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps") parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times") parser.add_argument("--batch_size", type=int, default=32, help="Batch size (the number of episodes)") parser.add_argument("--mini_batch_size", type=int, default=8, help="Minibatch size (the number of episodes)") parser.add_argument("--rnn_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the rnn") parser.add_argument("--mlp_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the mlp") parser.add_argument("--lr", type=float, default=5e-4, help="Learning rate") parser.add_argument("--gamma", type=float, default=0.99, help="Discount factor") parser.add_argument("--lamda", type=float, default=0.95, help="GAE parameter") parser.add_argument("--epsilon", type=float, default=0.2, help="GAE parameter") parser.add_argument("--K_epochs", type=int, default=15, help="GAE parameter") parser.add_argument("--use_adv_norm", type=bool, default=True, help="Trick 1:advantage normalization") parser.add_argument("--use_reward_norm", type=bool, default=True, help="Trick 3:reward normalization") parser.add_argument("--use_reward_scaling", type=bool, default=False, help="Trick 4:reward scaling. Here, we do not use it.") parser.add_argument("--entropy_coef", type=float, default=0.01, help="Trick 5: policy entropy") parser.add_argument("--use_lr_decay", type=bool, default=True, help="Trick 6:learning rate Decay") parser.add_argument("--use_grad_clip", type=bool, default=True, help="Trick 7: Gradient clip") parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Trick 8: orthogonal initialization") parser.add_argument("--set_adam_eps", type=float, default=True, help="Trick 9: set Adam epsilon=1e-5") parser.add_argument("--use_relu", type=float, default=False, help="Whether to use relu, if False, we will use tanh") parser.add_argument("--use_rnn", type=bool, default=False, help="Whether to use RNN") parser.add_argument("--add_agent_id", type=float, default=False, help="Whether to add agent_id. Here, we do not use it.") parser.add_argument("--use_value_clip", type=float, default=False, help="Whether to use value clip.") args = parser.parse_args() runner = Runner_MAPPO_MPE(args, env_name="simple_spread", number=1, seed=0) runner.run() 

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。


     文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。

Read more

Flutter for OpenHarmony: Flutter 三方库 dart_style 像官方一样统一你的鸿蒙代码格式(代码美化神器)

Flutter for OpenHarmony: Flutter 三方库 dart_style 像官方一样统一你的鸿蒙代码格式(代码美化神器)

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在 OpenHarmony 项目开发中,不论是个人的“心血之作”还是团队协作的“巨无霸”工程,代码的可读性是维护成本的生命线。每个人都有自己的编码习惯:有人喜欢紧凑型,有人喜欢在大括号前后留白。如果代码格式没有统一的标准,代码提交(Git Merge)时的差异对比将是一场灾难。 dart_style(其核心命令即 dart format)是 Dart 语言官方出品的格式化引擎。它通过一套被全球 Dart 开发者公认的算法,强制将你的源码重新排版为最标准、最易读的形态。 一、核心排版逻辑 dart_style 采用“行长度优先”的排版权重算法。 计算行长 修正空白 杂乱的源码 dart_style 解析器 折行与对齐策略

By Ne0inhk
Flutter 组件 shelf_static 的适配 鸿蒙Harmony 实战 - 驾驭极致静态资源分发、实现鸿蒙端文件服务器缓存策略与资产审计方案

Flutter 组件 shelf_static 的适配 鸿蒙Harmony 实战 - 驾驭极致静态资源分发、实现鸿蒙端文件服务器缓存策略与资产审计方案

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 组件 shelf_static 的适配 鸿蒙Harmony 实战 - 驾驭极致静态资源分发、实现鸿蒙端文件服务器缓存策略与资产审计方案 前言 在鸿蒙(OpenHarmony)生态的分布式离线静态文档系统、内嵌 H5 业务容器中台以及需要为局域网成员提供高性能资产分发的各种垂直类应用开发中,“静态资源的高速投递与安全性管控”是应用响应质量的基石。面对包含数千张高密度解析图纸、复杂的 Web 前端资产包或者是需要对接 0307 批次资产安全标准的各类文档。如果仅仅依靠原始的 File.readAsBytes() 配合手写 HTTP 头返回。那么不仅会导致在鸿蒙端产生严重的内存拷贝开销,更会因为无法实现对 Etag 缓存校验、范围请求(Range Request)等现代 Web 协议的精确支配。引发鸿蒙系统应用在加载大型资产时的严重卡顿。 我们需要一种“物理对齐、协议自洽”

By Ne0inhk
Flutter for OpenHarmony:blurhash_dart 优雅的图片加载占位符(提升视觉体验的黑科技) 深度解析与鸿蒙适配指南

Flutter for OpenHarmony:blurhash_dart 优雅的图片加载占位符(提升视觉体验的黑科技) 深度解析与鸿蒙适配指南

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在移动应用中,图片加载是一个关键的体验点。网络环境不佳时,图片区域长时间显示白屏或灰底,用户体验非常割裂。 传统的做法是放一个 Loading 转圈或固定的占位图,但这种方式依然比较生硬。 BlurHash 是一种革命性的占位符技术。它将图片压缩成一段只有二三十个字符的短字符串。客户端只需要这段字符串,就能瞬间(< 1ms)在本地解码并渲染出一个模糊但色调与原图一致的占位图。 blurhash_dart 是该算法的 Dart 纯实现版本。对于 OpenHarmony 应用,这意味着你可以在不增加太多带宽成本的情况下,实现如丝般顺滑的各种图片加载过渡效果。 一、核心原理与效果 1.1 什么是 BlurHash? BlurHash 算法基于 离散余弦变换 (DCT),类似于 JPEG 的压缩原理,但它只保留最低频的颜色信息(Base83 编码)。 * 输入:

By Ne0inhk
Flutter for OpenHarmony: Flutter 三方库 week_of_year 为鸿蒙应用提供精准的年度周数统计与业务分析支持(日历计算专家)

Flutter for OpenHarmony: Flutter 三方库 week_of_year 为鸿蒙应用提供精准的年度周数统计与业务分析支持(日历计算专家)

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在进行 OpenHarmony 的办公自动化(OA)、排班管理或财务统计应用开发时,我们经常需要处理“周”的概念。 1. 周报提交:今天是今年的第几周? 2. 生产计划:第 15 周需要完成哪些鸿蒙节点的部署? 3. 数据报表:按周对鸿蒙设备的运行状态进行汇总。 虽然 Dart 的 DateTime 类非常强大,但它并没有原生支持“获取当前是第几周”。week_of_year 软件包通过对 DateTime 对象的精简扩展,让你能一行代码获取 ISO-8601 标准的周数。 一、周数计算逻辑模型 符合国际标准(ISO-8601)的周数计算,通常将包含一年中第一个周四的那一周定为第 1 周。 DateTime

By Ne0inhk