找回密码
 立即注册
首页 业界区 安全 RL 策略优化 (4.2章节)

RL 策略优化 (4.2章节)

董绣梓 昨天 14:30
网格世界策略评估与策略改进(5 动作)

注:

  • 本文参照 《强化学习中的数学原理》一书,4.2章节“策略优化”部分的。
  • 代码借助AI一步步写出,在复现算法过程中,中间结果与书中不一样。
  • 代码大循环迭代17次
1.png

https://github.com/MathFoundationRL/Book-Mathematical-Foundation-of-Reinforcement-Learning/tree/main
本文以一个 5×5 网格世界为例,完整展示“策略评估(Policy Evaluation)→ 策略改进(Policy Improvement)”的动态规划流程,包含公式、代码、执行步骤与可视化结果(图文并茂)。
场景设定


  • 网格大小:5×5,坐标以 1-based 书写,例如左上角为 (1,1)。
  • 奖励规则:

    • 出格(尝试走出边界):保持原地不动,行为与“原地不动”同值(在本例的改进里我们做了并列时偏好不动的处理)。
    • 进入禁区:奖励 −10。
    • 进入目标:奖励 +1。
    • 其余格子:奖励 0。

  • 禁区坐标(橙色):(2,2), (2,3), (3,3), (4,2), (4,4), (5,2)
  • 目标坐标(绿色):(4,3)
  • 动作集合(5 个):上、下、左、右、原地不动。
为了更直观,我们提供网格渲染函数,将禁区标橙色、目标标绿色,并在每个格子里同时显示该格子的价值与当前策略的动作符号(↑ ↓ ← → o)。
核心公式


  • 策略评估(给定策略 π,确定动作):

    \[V_{k+1}(s) = R(s, a) + \gamma\, V_k(s')\]
    其中,a = π(s),s' 是执行 a 后的下一状态;若越界则保持原地,且“原地不动”与越界同值。
  • 策略改进(贪心改进):

    \[\pi_{new}(s) = \arg\max_a \Big[ R(s, a) + \gamma\, V(s') \Big]\]
    当不同动作价值相同(并列)时,本例的实现会偏好“原地不动”,确保边界不出现“继续向外”的箭头。
代码结构(关键片段)

完整代码见本文末尾。以下为关键函数与初始化简述。
初始化与奖励矩阵
  1. n = 5
  2. V0 = np.zeros((n, n))
  3. # 初始策略:全部不动(动作索引 4)
  4. policy0 = np.full((n, n), 4, dtype=int)
  5. # 构建奖励矩阵 r:禁区 -10,目标 +1,其它 0
  6. r = np.zeros((n, n))
  7. forbidden_positions = [(2, 2), (2, 3), (3, 3), (4, 2), (4, 4), (5, 2)]
  8. goal_position = (4, 3)
  9. for (ri, ci) in forbidden_positions:
  10.     r[ri - 1, ci - 1] = -10
  11. r[goal_position[0] - 1, goal_position[1] - 1] = 1
  12. gamma = 0.9
  13. theta = 1e-5
复制代码
策略评估(给定策略)
  1. def policy_evaluation_fixed_policy(V, policy, r, gamma=0.9, theta=1e-5, max_iter=1000):
  2.     n = V.shape[0]
  3.     V_new = V.copy()
  4.     actions = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]  # 上、下、左、右、不动
  5.     for it in range(max_iter):
  6.         delta = 0
  7.         for i in range(n):
  8.             for j in range(n):
  9.                 a_idx = policy[i, j]
  10.                 di, dj = actions[a_idx]
  11.                 ni, nj = i + di, j + dj
  12.                 # 越界则保持原地(与不动同值)
  13.                 if ni < 0 or ni >= n or nj < 0 or nj >= n:
  14.                     ni, nj = i, j
  15.                 v_temp = r[ni, nj] + gamma * V_new[ni, nj]
  16.                 delta = max(delta, abs(v_temp - V_new[i, j]))
  17.                 V_new[i, j] = v_temp
  18.         if delta < theta:
  19.             break
  20.     return V_new
复制代码
策略改进(贪心,并列偏好“原地不动”)
  1. def policy_improvement_fixed_values(V, r, gamma=0.9):
  2.     n = V.shape[0]
  3.     new_policy = np.zeros((n, n), dtype=int)
  4.     actions = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]
  5.     for i in range(n):
  6.         for j in range(n):
  7.             action_values = []
  8.             for a_idx, (di, dj) in enumerate(actions):
  9.                 ni, nj = i + di, j + dj
  10.                 # 越界则保持原地(与不动同值)
  11.                 if ni < 0 or ni >= n or nj < 0 or nj >= n:
  12.                     ni, nj = i, j
  13.                 action_value = r[ni, nj] + gamma * V[ni, nj]
  14.                 action_values.append(action_value)
  15.             # tie-breaking:偏好“不动”(索引 4)以避免边界显示外向箭头
  16.             max_val = max(action_values)
  17.             best_idxs = [idx for idx, val in enumerate(action_values) if val == max_val]
  18.             best_action = 4 if 4 in best_idxs else best_idxs[0]
  19.             new_policy[i, j] = best_action
  20.     return new_policy
复制代码
渲染函数(图像)
  1. def render_policy_value(V, policy, forbidden=None, goal=None, filename='policy_value_grid.png'):
  2.     import matplotlib.pyplot as plt
  3.     from matplotlib.table import Table
  4.     n = V.shape[0]
  5.     fig, ax = plt.subplots(figsize=(8, 8))
  6.     ax.set_axis_off()
  7.     tb = Table(ax, bbox=[0, 0, 1, 1])
  8.     actions_symbols = ['\u2191', '\u2193', '\u2190', '\u2192', 'o']
  9.     forb0 = set()
  10.     if forbidden is not None:
  11.         forb0 = {(r - 1, c - 1) for (r, c) in forbidden}
  12.     goal0 = None
  13.     if goal is not None:
  14.         goal0 = (goal[0] - 1, goal[1] - 1)
  15.     for i in range(n):
  16.         for j in range(n):
  17.             cell_text = f"{V[i, j]:.2f}\n{actions_symbols[policy[i, j]]}"
  18.             face = 'white'
  19.             if (i, j) in forb0:
  20.                 face = '#FFA500'  # 橙色:禁区
  21.             elif goal0 is not None and (i, j) == goal0:
  22.                 face = '#00AA00'  # 绿色:目标
  23.             tb.add_cell(i, j, 1/n, 1/n, text=cell_text, loc='center', facecolor=face)
  24.     ax.add_table(tb)
  25.     plt.title('Policy and Value Function')
  26.     plt.savefig(filename)
  27.     plt.close()
复制代码
执行流程与结果


  • 渲染初始策略与价值(全部不动):


  • 生成图像:
  1. render_policy_value(V0, policy0,
  2.                     forbidden=forbidden_positions,
  3.                     goal=goal_position,
  4.                     filename='initial_policy_value.png')
复制代码

  • 图像:
2.png


  • 第一次策略评估(给定策略 policy0,更新价值):
  1. V1 = policy_evaluation_fixed_policy(V0, policy0, r, gamma, theta)
  2. render_policy_value(V1, policy0,
  3.                     forbidden=forbidden_positions,
  4.                     goal=goal_position,
  5.                     filename='policy_value_after_evaluation.png')
复制代码

  • 图像:
3.png


  • 第一次策略改进(在 V1 上贪心选动作,边界并列偏好不动):
  1. policy1 = policy_improvement_fixed_values(V1, r, gamma)
  2. render_policy_value(V1, policy1,
  3.                     forbidden=forbidden_positions,
  4.                     goal=goal_position,
  5.                     filename='policy_after_improvement.png')
复制代码

  • 图像:
4.png


  • 一次性整体迭代:
  1. # 迭代下去,直到策略不再改变,显示最终的最优策略和价值矩阵
  2. policy_curr = policy0
  3. V_curr = V0
  4. iter_n = 1
  5. while True:
  6.     iter_n += 1
  7.     V_next = policy_evaluation_fixed_policy(V_curr, policy_curr, r, gamma, theta)
  8.     policy_next = policy_improvement_fixed_values(V_next, r, gamma)
  9.     if np.array_equal(policy_next, policy_curr):
  10.         break
  11.     V_curr, policy_curr = V_next, policy_next
  12. render_policy_value(V_curr, policy_curr, forbidden=forbidden_positions, goal=goal_position, filename='final_policy_value.png')
  13. print(f"策略迭代共进行了 {iter_n} 次,得到最终最优策略和价值矩阵,见 'final_policy_value.png'。")
复制代码
5.png
  1. import numpy as np# n * n 网格世界中的策略评估示例, n = 5,第一个格子表示为(1,1)最左上方。# 设定奖励规则:## 出格奖励-1,进入禁区奖励-10,进入目标奖励+1,其余格子奖励0## 禁区橙色位置为(2,2),(2,3),(3,3),(4,2),(4,4),(5,2)## 目标位置(4,3)# 初始化参数n = 5  # 网格大小 n*nV0 = np.zeros((n, n))  # 初始价值矩阵 V0,全为 0# 初始化策略为全部不移动# 动作定义:上、下、左、右、不动# actions = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]# 初始策略设为“不动”,对应动作索引 4policy0 = np.full((n, n), 4, dtype=int)# 根据设定生成奖励矩阵 rr = np.zeros((n, n))forbidden_positions = [(2, 2), (2, 3), (3, 3), (4, 2), (4, 4), (5, 2)]goal_position = (4, 3)# 将坐标从 1-based 转换为 0-based 并赋值for (ri, ci) in forbidden_positions:    r[ri - 1, ci - 1] = -10r[goal_position[0] - 1, goal_position[1] - 1] = 1# 说明:出格奖励 -1 属于转移导致的外部奖励,不直接体现在静态 r 矩阵中,# 若需要在转移模型中体现,应在动态更新时处理。gamma = 0.9theta = 1e-5  # 收敛阈值:两次迭代间最大变化小于等于 theta 时停止# 根据当前价值,固定价值矩阵,找出最优策略,得到新的策略(移动方案)和r_mat## 只做一次策略改进def policy_improvement_fixed_values(V, r, gamma=0.9):    n = V.shape[0]    new_policy = np.zeros((n, n), dtype=int)    actions = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]  # 上、下、左、右、不动    for i in range(n):        for j in range(n):            action_values = []            for a_idx, (di, dj) in enumerate(actions):                ni, nj = i + di, j + dj                # 检查是否出格                if ni < 0 or ni >= n or nj < 0 or nj >= n:                    ni, nj = i, j  # 出格则不动                action_value = r[ni, nj] + gamma * V[ni, nj]                action_values.append(action_value)            # 平局时优先选择“不动”(索引 4),使边界显示为 'o'            max_val = max(action_values)            best_idxs = [idx for idx, val in enumerate(action_values) if val == max_val]            best_action = 4 if 4 in best_idxs else best_idxs[0]            new_policy[i, j] = best_action    return new_policy# 在确定的移动策略和初始的价值矩阵,计算出新的价值矩阵def policy_evaluation_fixed_policy(V, policy, r, gamma=0.9, theta=1e-5, max_iter=1000):    n = V.shape[0]    V_new = V.copy()    actions = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]  # 上、下、左、右、不动    for it in range(max_iter):        delta = 0        for i in range(n):            for j in range(n):                a_idx = policy[i, j]                di, dj = actions[a_idx]                ni, nj = i + di, j + dj                # 检查是否出格                if ni < 0 or ni >= n or nj < 0 or nj >= n:                    ni, nj = i, j  # 出格则不动                v_temp = r[ni, nj] + gamma * V_new[ni, nj]                delta = max(delta, abs(v_temp - V_new[i, j]))                V_new[i, j] = v_temp        if delta < theta:            break    return V_new# 一个画图的函数,可视化网格,当前的之前的策略和价值矩阵,找到的最优的策略和计算得到的价值估计def render_policy_value(V, policy, forbidden=None, goal=None, filename='policy_value_grid.png'):    import matplotlib.pyplot as plt    from matplotlib.table import Table    n = V.shape[0]    fig, ax = plt.subplots(figsize=(8, 8))    ax.set_axis_off()    tb = Table(ax, bbox=[0, 0, 1, 1])    actions_symbols = ['↑', '↓', '←', '→', 'o']  # 上、下、左、右、不动    # 处理禁区与目标坐标(输入为 1-based,转换为 0-based)    forb0 = set()    if forbidden is not None:        forb0 = {(r - 1, c - 1) for (r, c) in forbidden}    goal0 = None    if goal is not None:        goal0 = (goal[0] - 1, goal[1] - 1)    for i in range(n):        for j in range(n):            cell_text = f"{V[i, j]:.2f}\n{actions_symbols[policy[i, j]]}"            face = 'white'            if (i, j) in forb0:                face = '#FFA500'  # 橙色:禁区            elif goal0 is not None and (i, j) == goal0:                face = '#00AA00'  # 绿色:目标            tb.add_cell(i, j, 1/n, 1/n, text=cell_text, loc='center', facecolor=face)    ax.add_table(tb)    plt.title('Policy and Value Function')    plt.savefig(filename)    plt.close()# 渲染初始策略和价值矩阵(禁区橙色,目标绿色)render_policy_value(V0, policy0, forbidden=forbidden_positions, goal=goal_position, filename='initial_policy_and_initial_value.png')# 执行一次策略评估V1 = policy_evaluation_fixed_policy(V0, policy0, r, gamma, theta)render_policy_value(V1, policy0, forbidden=forbidden_positions, goal=goal_position, filename='initial_policy_evaluation.png')# 执行一次策略改进policy1 = policy_improvement_fixed_values(V1, r, gamma)render_policy_value(V1, policy1, forbidden=forbidden_positions, goal=goal_position, filename='policy_after_first_improvement.png')# 执行一次策略评估V2 = policy_evaluation_fixed_policy(V1, policy1, r, gamma, theta)# 渲染改进后策略对应的价值矩阵render_policy_value(V2, policy1, forbidden=forbidden_positions, goal=goal_position, filename='1st_improved_policy_evaluation.png')# 迭代下去,直到策略不再改变,显示最终的最优策略和价值矩阵
  2. policy_curr = policy0
  3. V_curr = V0
  4. iter_n = 1
  5. while True:
  6.     iter_n += 1
  7.     V_next = policy_evaluation_fixed_policy(V_curr, policy_curr, r, gamma, theta)
  8.     policy_next = policy_improvement_fixed_values(V_next, r, gamma)
  9.     if np.array_equal(policy_next, policy_curr):
  10.         break
  11.     V_curr, policy_curr = V_next, policy_next
  12. render_policy_value(V_curr, policy_curr, forbidden=forbidden_positions, goal=goal_position, filename='final_policy_value.png')
  13. print(f"策略迭代共进行了 {iter_n} 次,得到最终最优策略和价值矩阵,见 'final_policy_value.png'。")
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册