跳到主要内容 强化学习基础:动态规划算法原理与实现 | 极客日志
Python AI 算法
强化学习基础:动态规划算法原理与实现 介绍强化学习中动态规划的核心概念,包括马尔可夫决策过程、价值函数及贝尔曼方程。详细阐述了策略评估、策略迭代和价值迭代算法的数学原理与步骤,并通过 3x3 网格世界的 Python 代码示例演示了具体实现与收敛验证。
PhpPioneer 发布于 2026/3/22 更新于 2026/4/18 3 浏览强化学习数学原理详解:从动态规划开始
第一部分:基础数学概念
1.1 马尔可夫决策过程(MDP)
一个马尔可夫决策过程由五元组构成:$\text{MDP} = (S, A, P, R, \gamma)$,其中:
$S$:状态空间(有限或无限集合)
$A$:动作空间(有限或无限集合)
$P$:状态转移概率,$P(s' \mid s,a) = \text{Pr}(S_{t+1}=s' \mid S_t=s, A_t=a)$
$R$:奖励函数,$R(s,a,s') = \mathbb{E}(R_{t+1} \mid S_t=s, A_t=a, S_{t+1}=s')$
$\gamma$:折扣因子,$0 \le \gamma < 1$
这里的状态转移概率是一个比较反直觉的点。理论上来说从一个状态出发做出一个动作,下一时刻的状态似乎是确定的,然而在 MDP 中通常涉及概率分布。
1.2 策略(Policy) 策略 $\pi$ 是状态到动作概率分布的映射:
$$\pi(a \mid s) = \text{Pr}(A_t=a \mid S_t=s)$$
第二部分:价值函数与贝尔曼方程
2.1 回报(Return) 从时间 $t$ 开始的累积折扣奖励:
$$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \cdots = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$$
2.2 状态价值函数(State-Value Function) 在策略 $\pi$ 下,状态 $s$ 的价值是期望回报:
$$V^\pi(s) = \mathbb{E}_\pi(G_t \mid S_t = s)$$
2.3 动作价值函数(Action-Value Function) 在状态 $s$ 执行动作 $a$ 后遵循策略 $\pi$ 的期望回报:
$$Q^\pi(s,a) = \mathbb{E}_\pi(G_t \mid S_t = s, A_t = a)$$
2.4 贝尔曼期望方程(Bellman Expectation Equation)
对于 $V^\pi$: $$V^\pi(s) = \sum_{a \in A} \pi(a \mid s) \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V^\pi(s') \right]$$
推导 :
从定义出发:$V^\pi(s) = \mathbb{E}\pi(G_t \mid S_t = s)$。
展开 $G_t = R {t+1} + \gamma G_{t+1}$,代入得:
$$V^\pi(s) = \mathbb{E}\pi(R {t+1} + \gamma G_{t+1} \mid S_t = s)$$
利用期望线性性拆分:
$$V^\pi(s) = \mathbb{E}\pi(R {t+1} \mid S_t = s) + \gamma \mathbb{E}\pi(G {t+1} \mid S_t = s)$$
第一项为期望即时奖励 :
$$\mathbb{E}\pi(R {t+1} \mid S_t = s) = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s,a) R(s,a,s')$$
第二项利用马尔可夫性转换为下一状态价值:
$$\mathbb{E}\pi\left[G {t+1} \mid S_t = s\right] = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s,a) V^\pi(s') = \mathbb{E}\left[V^\pi(S_{t+1}) \mid S_t = s\right]$$
合并即得贝尔曼期望方程。
贝尔曼期望方程用一句话来讲那就是描述了不同状态的状态价值之间的关系。
这里对贝尔曼期望方程做一个直观理解:贝尔曼期望方程即状态价值函数是某个状态下的期望回报。第一项表达的是从这个状态出发能够拿到的即时回报,具体而言,从这个状态出发,在不同的策略下将会执行不同的动作,执行不同的动作会到达不同的状态(即使从相同的状态出发,执行相同的动作,到达的状态也可能会不同)。结合期望的定义,我们可以很容易理解即时回报就是考虑在这个状态下能够做出所有动作,然后考虑在这个动作下会到达的状态及其对应的奖励,相乘起来就得到了即时回报。
第二项则也同样好理解,从某个状态出发,考虑在这个状态下能够做出所有动作,然后考虑在这个动作下会到达的状态及其在这个状态下的状态价值函数,这个就是未来回报。
注:上面的贝尔曼期望方程,其实是贝尔曼方程的另一种表达,在学习 TD learning 时会看到这种表示形式,大家一般看见的贝尔曼方程的形式:
$$V^\pi(s) = \sum_{a \in A} \pi(a \mid s) \left[ \sum_{r \in R} p(r \mid s,a) r + \gamma \sum_{s' \in S} P(s' \mid s,a) V^\pi(s') \right]$$
二者其实是等价的。
对于 $Q^\pi$: $$Q^\pi(s,a) = \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma \sum_{a' \in A} \pi(a' \mid s') Q^\pi(s',a') \right]$$
动作价值函数与状态价值函数的关系:
$$V^\pi(s) = \sum_{a \in A} \pi(a \mid s) \cdot Q^\pi(s,a)$$
第三部分:动态规划算法
3.1 策略评估(Policy Evaluation) 问题 :给定策略 $\pi$,计算其对应的状态价值函数 $V^\pi$。
方法 :迭代应用贝尔曼期望方程。
算法步骤 :
初始化 $V_0(s)$ 为任意值(通常为 0)。
对于 $k=0,1,2,\ldots$,执行迭代更新:
$$V_{k+1}(s) = \sum_{a \in A} \pi(a \mid s) \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V_k(s') \right]$$
当 $\max_s |V_{k+1}(s) - V_k(s)| < \theta$($\theta$ 为收敛阈值)时停止。
收敛性 :贝尔曼期望算子是 $\gamma$-收缩映射,必收敛到唯一不动点 $V^\pi$。
3.2 策略改进(Policy Improvement) 问题 :给定价值函数 $V^\pi$,找到更优的策略 $\pi'$。
核心定理 (策略改进定理):
对于任意两个确定性策略 $\pi$ 和 $\pi'$,若对所有状态 $s$ 满足:
$$Q^\pi(s, \pi'(s)) \ge V^\pi(s)$$
则对所有状态 $s$ 有:
$$V^{\pi'}(s) \ge V^\pi(s)$$
3.3 策略迭代(Policy Iteration)
初始化随机策略 $\pi_0$。
对于 $i=0,1,2,\ldots$:
a. 策略评估 :计算 $V^{\pi_i}$。
b. 策略改进 :通过贪心策略更新得到 $\pi_{i+1}$:
$$\pi_{i+1}(s) = \arg\max_{a \in A} \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V^{\pi_i}(s') \right]$$
即
$$\pi_{i+1}(s) = \arg\max_{a\in A} Q^{\pi_i}(s,a)$$
c. 若 $\pi_{i+1} = \pi_i$,停止迭代,得到最优策略 $\pi^*$。
即:对每个状态,选择能够最大化动作价值的动作。
算法流程:先定策略,再评估,再改进 。
策略评估部分在数学上求解需要求解方程组(或者矩阵计算求逆),在工程上的大规模问题这种方式不方便求解,在工程上一般使用迭代算法 来求解。
3.3.1 策略评估(求解当前策略的 $V^\pi$)
初始化:$V_0(s) = 0, \forall s \in S$
迭代更新:$V_{k+1}(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V_k(s') \right)$
收敛终止:$\max_s |V_{k+1}(s) - V_k(s)| < \theta \quad \Rightarrow \quad V^\pi = V_k$
3.3.2 策略改进(构造更优策略 $\pi'$)
计算动作价值:$Q^\pi(s,a) = \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V^\pi(s') \right)$
贪心更新策略:$\pi'(s) = \arg\max_a Q^\pi(s,a)$
3.3.3 收敛判断 若 $\pi' = \pi$,则收敛到最优策略 $\pi^* = \pi$;否则令 $\pi = \pi'$,回到「策略评估」继续迭代。
3.4 价值迭代(Value Iteration) 核心思想 :直接迭代最优价值函数,将策略评估的迭代次数简化为 1 次。
算法步骤 :
初始化 $V_0(s)$ 为任意值(通常为 0)。
对于 $k=0,1,2,\ldots$,执行迭代更新:
$$V_{k+1}(s) = \max_{a \in A} \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V_k(s') \right]$$
当 $\max_s |V_{k+1}(s) - V_k(s)| < \theta$ 时停止。
提取最优策略 :
$$\pi^(s) = \arg\max_{a \in A} \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V^ (s') \right]$$
第四部分:具体示例与计算
4.1 3×3 网格世界示例
状态:9 个格子(编号 0~8,右下角 8 为目标)
动作:上、下、左、右(4 个动作,撞墙时状态不变)
转移:确定性转移($P(s' \mid s,a) = 1$ 或 0)
奖励:普通移动 -1,到达目标 +10,目标状态 0
折扣因子:$\gamma = 0.9$
4.1.1 策略评估计算(随机策略) 以角落状态 $s_{00}$(编号 0)为例,随机策略下每个动作概率 0.25:
$$\begin{align*} V^\pi(s_{00}) &= 0.25 \times [-1 + 0.9V^\pi(s_{00})] + 0.25 \times [-1 + 0.9V^\pi(s_{10})] \ &+ 0.25 \times [-1 + 0.9V^\pi(s_{01})] + 0.25 \times [-1 + 0.9V^\pi(s_{00})] \end{align*}$$
简化得:
$$V^\pi(s_{00}) = \frac{-1 + 0.225V^\pi(s_{10}) + 0.225V^\pi(s_{01})}{0.55}$$
4.1.2 价值迭代计算(中心状态 $s_{11}$)
第一次迭代:$V_1(s_{11}) = \max(-1, -1, -1, -1) = -1$
第二次迭代:$V_2(s_{11}) = \max(-1 + 0.9 \times (-1), \dots) = -1.9$
第五部分:代码实现(Python)
5.1 策略迭代代码 import numpy as np
class GridWorldMDP :
"""3×3 网格世界 MDP 实现"""
def __init__ (self, size=3 , goal_reward=10 , step_cost=-1 , gamma=0.9 ):
self .size = size
self .n_states = size * size
self .n_actions = 4
self .goal_reward = goal_reward
self .step_cost = step_cost
self .gamma = gamma
self .P = np.zeros((self .n_states, self .n_actions, self .n_states))
self .R = np.zeros((self .n_states, self .n_actions, self .n_states))
self ._build_transitions()
def _build_transitions (self ):
"""构建确定性转移和奖励函数"""
goal_state = self .n_states - 1
for s in range (self .n_states):
row, col = divmod (s, self .size)
for a in range (self .n_actions):
if a == 0 :
next_row, next_col = max (row-1 , 0 ), col
elif a == 1 :
next_row, next_col = min (row+1 , self .size-1 ), col
elif a == 2 :
next_row, next_col = row, max (col-1 , 0 )
else :
next_row, next_col = row, min (col+1 , self .size-1 )
s_next = next_row * self .size + next_col
self .P[s, a, s_next] = 1.0
if s == goal_state:
self .R[s, a, s_next] = 0
elif s_next == goal_state:
self .R[s, a, s_next] = self .goal_reward
else :
self .R[s, a, s_next] = self .step_cost
def policy_evaluation (self, policy, max_iter=1000 , theta=1e-6 ):
"""策略评估:迭代求解 V^π"""
V = np.zeros(self .n_states)
for i in range (max_iter):
delta = 0
V_new = np.zeros(self .n_states)
for s in range (self .n_states):
v = 0
for a in range (self .n_actions):
pi_a_s = policy[s, a]
expected_value = 0
for s_next in range (self .n_states):
p = self .P[s, a, s_next]
r = self .R[s, a, s_next]
expected_value += p * (r + self .gamma * V[s_next])
v += pi_a_s * expected_value
V_new[s] = v
delta = max (delta, abs (v - V[s]))
V = V_new
if delta < theta:
print (f"策略评估在{i+1 } 次迭代后收敛" )
break
return V
def policy_improvement (self, V ):
"""策略改进:贪心策略更新"""
new_policy = np.zeros((self .n_states, self .n_actions))
for s in range (self .n_states):
q_values = np.zeros(self .n_actions)
for a in range (self .n_actions):
q = 0
for s_next in range (self .n_states):
p = self .P[s, a, s_next]
r = self .R[s, a, s_next]
q += p * (r + self .gamma * V[s_next])
q_values[a] = q
best_actions = np.where(q_values == np.max (q_values))[0 ]
for a in best_actions:
new_policy[s, a] = 1.0 / len (best_actions)
return new_policy
def policy_iteration (self, max_iter=100 ):
"""策略迭代算法"""
policy = np.ones((self .n_states, self .n_actions)) / self .n_actions
for i in range (max_iter):
print (f"\n=== 策略迭代第{i+1 } 轮 ===" )
V = self .policy_evaluation(policy)
new_policy = self .policy_improvement(V)
if np.allclose(policy, new_policy):
print (f"策略在{i+1 } 次迭代后收敛到最优" )
return V, new_policy
policy = new_policy
print ("达到最大迭代次数" )
return V, policy
def value_iteration (self, max_iter=1000 , theta=1e-6 ):
"""价值迭代算法"""
V = np.zeros(self .n_states)
for i in range (max_iter):
delta = 0
V_new = np.zeros(self .n_states)
for s in range (self .n_states):
action_values = np.zeros(self .n_actions)
for a in range (self .n_actions):
q = 0
for s_next in range (self .n_states):
p = self .P[s, a, s_next]
r = self .R[s, a, s_next]
q += p * (r + self .gamma * V[s_next])
action_values[a] = q
V_new[s] = np.max (action_values)
delta = max (delta, abs (V_new[s] - V[s]))
V = V_new
if delta < theta:
print (f"价值迭代在{i+1 } 次迭代后收敛" )
break
optimal_policy = self .policy_improvement(V)
return V, optimal_policy
if __name__ == "__main__" :
print ("=" *60 )
print ("3×3 网格世界 MDP 示例" )
print ("=" *60 )
mdp = GridWorldMDP(size=3 , goal_reward=10 , step_cost=-1 , gamma=0.9 )
print ("\n1. 策略迭代:" )
V_pi, policy_pi = mdp.policy_iteration(max_iter=10 )
print (f"最优价值函数(策略迭代):\n{V_pi.reshape(3 ,3 ).round (4 )} " )
print ("\n" + "=" *60 )
print ("\n2. 价值迭代:" )
V_vi, policy_vi = mdp.value_iteration()
print (f"最优价值函数(价值迭代):\n{V_vi.reshape(3 ,3 ).round (4 )} " )
print ("\n" + "=" *60 )
print ("策略迭代 vs 价值迭代:" )
print (f"价值函数最大差异:{np.max (np.abs (V_pi - V_vi)):.6 f} " )
print (f"策略最大差异:{np.max (np.abs (policy_pi - policy_vi)):.6 f} " )
print ("\n" + "=" *60 )
print ("验证贝尔曼最优方程:" )
for s in range (mdp.n_states):
lhs = V_vi[s]
action_values = np.zeros(mdp.n_actions)
for a in range (mdp.n_actions):
q = 0
for s_next in range (mdp.n_states):
p = mdp.P[s, a, s_next]
r = mdp.R[s, a, s_next]
q += p * (r + mdp.gamma * V_vi[s_next])
action_values[a] = q
rhs = np.max (action_values)
print (f"状态{s} : LHS={lhs:.4 f} , RHS={rhs:.4 f} , 差异={abs (lhs-rhs):.6 f} " )
5.2 价值迭代代码 from Policy_iteration import GridWorldMDP
import numpy as np
class ValueIterationMDP (GridWorldMDP ):
def value_iteration (self, theta=1e-6 ):
V = np.zeros(self .n_states)
print ("=" *60 )
print ("价值迭代算法开始" )
print ("=" *60 )
print (f"初始价值函数:\n{V.reshape(self.size, self.size)} \n" )
for iteration in range (1000 ):
delta = 0
V_new = np.zeros(self .n_states)
for s in range (self .n_states):
action_value = []
for a in range (self .n_actions):
q = 0
for next_state in np.where(self .P[s, a]>0 )[0 ]:
q += self .P[s, a, next_state]*(self .R[s, a, next_state]+ self .gamma * V[next_state])
action_value.append(q)
V_new[s] = max (action_value) if action_value else 0.0
delta = max (delta, abs (V[s]- V_new[s]))
V = V_new
if iteration % 5 == 0 :
print (f"第{iteration +1 } 轮迭代 | 本轮最大误差:{delta:.6 f} " )
print (f"当前价值函数:\n{V.reshape(self.size, self.size)} \n" )
if delta < theta:
print (f"\n✅ 价值迭代在第{iteration +1 } 次迭代收敛!" )
print (f"最终收敛误差:{delta:.8 f} " )
break
policy = self ._extract_policy(V)
return V, policy
def _extract_policy (self, V ):
policy = np.zeros((self .n_states, self .n_actions))
for s in range (self .n_states):
q_value = []
for a in range (self .n_actions):
q = 0
for next_state in np.where(self .P[s, a]>0 )[0 ]:
q += self .P[s, a, next_state]*(self .R[s, a, next_state]+ self .gamma * V[next_state])
q_value.append(q)
max_q = max (q_value)
best_action = [a for a, q in enumerate (q_value) if abs (q - max_q)<1e-6 ]
for a in best_action:
policy[s, a] = 1.0 /len (best_action)
return policy
if __name__ == "__main__" :
mdp_vi = ValueIterationMDP()
print ("价值迭代算法完整演示" )
print ("=" *60 )
V_vi, policy_vi = mdp_vi.value_iteration()
print ("\n" + "=" *60 )
print ("最终收敛 - 最优价值函数" )
print ("=" *60 )
V_grid = V_vi.reshape(mdp_vi.size, mdp_vi.size)
print (V_grid)
print ("\n" + "=" *60 )
print ("最终收敛 - 最优策略矩阵 (状态数×动作数)" )
print ("=" *60 )
print (f"策略矩阵形状:{policy_vi.shape} " )
print ("【矩阵含义:行=状态,列=动作,值=选择该动作的概率】" )
print (policy_vi)
print ("\n" + "=" *60 )
print (" 最优策略 - 网格可视化" )
print ("=" *60 )
action_map = {0 :"↑" ,1 :"↓" ,2 :"←" ,3 :"→" }
policy_grid = np.zeros((mdp_vi.size, mdp_vi.size), dtype=object )
for i in range (mdp_vi.size):
for j in range (mdp_vi.size):
s = i * mdp_vi.size + j
best_acts = np.where(policy_vi[s]>0 )[0 ]
act_str = "" .join([action_map[a] for a in best_acts])
policy_grid[i, j] = act_str if act_str else "终端"
print ("【每个网格值:当前位置的最优动作,多动作表示等概率选择】" )
print (policy_grid)
策略迭代是基于初始的状态价值出发,在策略下不断迭代,最终得到对该策略下的状态价值的估计,然后使用这个状态价值,计算动作价值,并去更新策略,循环往复得到最优策略。
价值迭代也是从初始状态价值出发,但是和策略迭代不同的是,每次迭代他都直接计算在该数值(状态价值是状态 s 执行动作 a 后遵循策略 π 的期望回报,策略迭代的内层迭代专门对状态价值进行估计,而这里没有估计,所以这里以及称不上该值为状态价值了,只是一个值而已)下的动作价值,然后在找到每个状态哪个动作的动作价值最高,得到新的策略和新的 V 值,这里的新的策略不会影响后续的值,因为在下一轮迭代中只用用到上一轮迭代的新的 V 值和环境值(P,R)来计算新的动作价值。
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online