首页
示例页面
首页
关于
更多
联系
博客
主页章节
一点点想法
Search
1
DDPG算法在羊犬博弈中的简要应用
873 阅读
2
辐射实验数据处理一:通过有限点绘制曲面图
579 阅读
3
遗传算法学习笔记一:重新发明简易轮子
547 阅读
4
数值计算一——插值、拟合与线性方程组
477 阅读
5
基于UC3842的Boost升压电路
452 阅读
未分类
感叹
TEC
登录
Search
标签搜索
PYTHON
数值计算
等离子体诊断
换热器设计
核工程
机器学习
深度学习
博弈
tensorflow
量子力学
遗传算法
DeepLearning
粒子模拟
电磁场
Modelica
等离子体
磁约束聚变
流浪地球
重元素聚变
DMCXE
累计撰写
33
篇文章
累计收到
40
条评论
首页
栏目
未分类
感叹
TEC
页面
示例页面
首页
关于
联系
博客
主页章节
一点点想法
搜索到
1
篇与
博弈
的结果
2021-08-30
DDPG算法在羊犬博弈中的简要应用
1. 模型的建立与求解 1.1 机器学习方法:DDPG方法 1.1.1 追逃博弈模型 在该羊犬博弈场景中,羊作为逃跑者,犬作为追捕者。在限制条件下两者具有相反的目的,即羊需要躲避犬并逃出特定区域,犬需要在特定区域内捕获到羊。依次建立的追逃模型如下图所示: 其中,S为逃跑者,D为追捕者;v_S为逃跑者的线速度大小;v_D为追捕者的线速度大小。θ_S为逃跑者速度方向角,θ_D为追捕者位置与圆心所夹角。 逃跑者与追捕者的运动学模型为: 其中,i表示S逃跑者,D追捕者;(x,y)为两者位置;θ是方向,u是转向角。 需要注意的是,犬(追逃者)的运动方式是固定的,而羊的运动方式是需要通过方法确定的。1.1.2 强化学习方法 一个马尔科夫决策过程(MDP)可以通过一个五元组(S,A,T,R,γ)描述,该五元组包含了有限状态空间S,有限行为空间A,状态转移函数T,回报函数R与折扣因子γ。状态转移函数表示了下一时刻状态是依据何种概率分布从当前状态进行转移的。回报函数则描述了当前状态下可以为下一状态提供的回报。对于任意马尔科夫决策过程,描述对象都存在一个确定的最优策略。因而选择一种合适的机器学习方法是解决该博弈问题的关键。依据该模型依赖的过程:对象行为作用于环境并且从环境取得回报,明确了应当采用一种强化学习方法。 1.1.3 基于DDPG的算法 针对该博弈模型中描述对象行为为连续动作,考虑到学习效率,采用深度学习中的DDPG算法,定义MDP状态空间与行为空间,设计回报函数,进而实现逃脱者的控制算法,引导逃脱者在躲避追捕者的前提下逃出特定范围。 给定MDP状态空间: 羊(逃脱者)的策略由强化学习给出,狼的状态固定。动作空间可定为: 其MDP转移函数即为上述运动学方程: 回报函数给定为: DDPG算法是由Google DeepMind 提出的一种使用Actor-Critic结构, 但是输出的不是行为的概率, 而是具体的行为, 用于连续动作 (continuous action) 的预测。可以提高Actor-Critic的稳定性和收敛性。 Actor网络输出动作对象的行为策略,Critic网络输出对该行为的评估。 Critic网络代价函数如下: 其中: 借鉴了DQN和Double Q learning的方式。 Actor网路的策略梯度为: 1.1.4.追逃博弈算法 1.1.5.算法验证 1.1.5.1 参数设置 在python语言环境下,以深度学习框架TensorFlow为基础,隐藏层数为2;算法使用minibatch大小为32,经验池大小为6000,actor学习率为0.001,critic学习率为0.001,折扣因子为0.9,soft replacement设置为0.01。其它实验参数如表所示 1.1.5.2.结果分析 训练时,追捕者犬做可变方向的匀速圆周运动,且会随着羊的运动而不断向羊趋近。经过若干次实验,在算法训练10000次下,记录累计回报。如图所示。算法在500次左右时开始收敛,在1000次内存在一个小幅抖动。对于算法整体10000次的回报情况,如下图所示。不难发现,该算法随着训练次数的增加,并不能够稳定的收敛在一个值上。 图1 1000次训练内EPSISODES-REWARD关系 图2 10000次训练EPSISODES-REWARD关系 考察每次训练时单次训练于完成训练步长。可以得到如下所示图。该图反应了在算法接近稳定后,存在一个稳定在75步长完成一次训练的趋势。 图3 10000次训练EPSISODES-STEP关系 通过导出经过上述过程训练后模型逃逸成功的位置数据,可以直观的描绘出在该初始条件下收敛处的最优逃出路线以及未训练前羊狼运动的轨迹。 图4 训练前某次训练羊犬运动路径 图5 训练后羊逃逸最优路径 对比每次训练时奖励与步长的变化关系,不难发现在该算法目前设置下存在收敛不充分的情况。而这正是Policy Gradient算法自身存在的缺陷。 Policy Gradient会受Learning rate 或 Step size影响,会产生收敛不充分或者时间过长的缺点。而该算法产生缺点的原因,正是此前PPO算法优化和避免的部分。附录:代码#main.py # tensorflow == 1.2.1 # pyglet == 1.2.4 # xlsxwriter == 3.0.1 # numpy == 1.19.5 # gym == 0.18.3 import numpy as np from test import SDEnv from rl import DDPG import xlsxwriter as xw workbook=xw.Workbook('list.xlsx') worksheet = workbook.add_worksheet('sheet1') headings=['EPSISODES','STEP','REWARD'] MAX_EPISODES = 10000 MAX_EP_STEPS = 300 ON_TRAIN = False a1=np.zeros(MAX_EPISODES) a2=np.zeros(MAX_EPISODES) a3=np.zeros(MAX_EPISODES) # set env env = SDEnv() s_dim = env.state_dim a_dim = env.action_dim a_bound = env.action_bound # set RL method (continuous) rl = DDPG(a_dim, s_dim, a_bound) steps = [] def train(): # start training for i in range(MAX_EPISODES): s = env.reset() ep_r = 0. a1[i]=i for j in range(MAX_EP_STEPS): env.render() a = rl.choose_action(s) s_, r, done = env.step(a) rl.store_transition(s, a, r, s_) ep_r += r if rl.memory_full: # start to learn once has fulfilled the memory rl.learn() s = s_ if done or j == MAX_EP_STEPS-1: a2[i]=j a3[i]=ep_r print('Ep: %i | %s | ep_r: %.1f | step: %i' % (i, '---' if not done else 'done', ep_r, j)) break list = [a1, a2, a3] worksheet.write_row('A1',headings) worksheet.write_column('A2',list[0]) worksheet.write_column('B2', list[1]) worksheet.write_column('C2', list[2]) workbook.close() rl.save() def eval(): rl.restore() env.render() env.viewer.set_vsync(True) while True: s = env.reset() for _ in range(200): env.render() a = rl.choose_action(s) s, r, done = env.step(a) if done: break if ON_TRAIN: train() else: eval()#test.py import pyglet import numpy as np pyglet.clock.set_fps_limit(10000) class Viewer(pyglet.window.Window): bar_thc=4 #羊的宽度 def __init__(self,sheep_info,dog_info,i): super(Viewer,self).__init__(width=400,height=400,resizable=False,caption='sheep',vsync=False) #背景颜色 pyglet.gl.glClearColor(1,1,1,1) #将羊的信息放入到batch内 self.batch = pyglet.graphics.Batch() self.sheep_info = sheep_info self.center_coord= np.array([200,200]) self.dog_info = dog_info self.i=i #添加羊的信息 self.sheep = self.batch.add( 4, pyglet.gl.GL_QUADS, None, ('v2f',[200,200, 200,204, 204,204, 204,200]), ('c3B',(86,109,249)*4)) #添加犬的信息 dx,d1=self.dog_info['x'] dy,d2=self.dog_info['y'] dl,d3=self.dog_info['l'] self.dog = self.batch.add( 4, pyglet.gl.GL_QUADS, None, ('v2f', [dx - dl / 2, dy - dl / 2, dx - dl / 2, dy + dl / 2, dx + dl / 2, dy + dl / 2, dx + dl / 2, dy - dl / 2]), ('c3B', (249, 86, 86) * 4)) def render(self): self._update_sheep() self.switch_to() self.dispatch_events() self.dispatch_event('on_draw') self.flip() def on_draw(self): self.clear() # 清屏 self.batch.draw() # 画上 batch 里面的内容 def _update_sheep(self): sx,s1=self.sheep_info['x'] sy,s2=self.sheep_info['y'] sr,s23=self.sheep_info['r'] sxy=self.center_coord sxy_=np.array([sx,sy]) xy01 =sxy+ np.array([sx + self.bar_thc / 2, sy + self.bar_thc / 2]) xy02 =sxy + np.array([sx + self.bar_thc / 2, sy - self.bar_thc / 2]) xy11 =sxy+ np.array([sx - self.bar_thc / 2, sy - self.bar_thc / 2]) xy12 = sxy+ np.array([sx - self.bar_thc / 2, sy + self.bar_thc / 2]) self.sheep.vertices = np.concatenate((xy01,xy02,xy11,xy12)) dx, d1 = self.dog_info['x'] dy, d2 = self.dog_info['y'] dl, d3 = self.dog_info['l'] self.dog.vertices=[dx - dl / 2, dy - dl / 2, dx - dl / 2, dy + dl / 2, dx + dl / 2, dy + dl / 2, dx + dl / 2, dy - dl / 2] print(sx,sy,dx,dy) class SDEnv(object): viewer = None dt = 0.1 # 转动的速度和 dt 有关 action_bound = [-1, 1] # 转动的角度范围 i = 0 #dog = {'x': x_dog, 'y': y_dog, 'l': 4} # 蓝色 dog 的 x,y 坐标和长度 l state_dim = 2 # 两个观测值 action_dim = 2 # 两个动作 def __init__(self): self.sheep_info = np.zeros( 2,dtype=[('x',np.float32),('y',np.float32),('r',np.float32),]) #生成一届矩阵 self.sheep_info['x'] = 1 self.sheep_info['y'] = 1 self.sheep_info['r'] = 0 #羊开始是速度方向角 self.dog_info= np.zeros( 2,dtype=[('x',np.float32),('y',np.float32),('R',np.float32),('v',np.float32),('l',np.float32),('r',np.float32)]) self.dog_info['x']=350 self.dog_info['y']=200 self.dog_info['R']=150 self.dog_info['v']=30 self.dog_info['l']=4 self.dog_info['r'] = 0 def render(self): if self.viewer is None: self.viewer = Viewer(self.sheep_info,self.dog_info,self.i) self.viewer.render() def step(self,action): done = False r = 0 v_sheep = 20 self.i += 1 before1,before11=self.sheep_info['x'] before2,before22=self.sheep_info['y'] if self.sheep_info['x'][0] < 0: if self.sheep_info['y'][0] > 0: srr = np.pi + np.arctan(self.sheep_info['y'][0] / self.sheep_info['x'][0]) else: srr = -np.pi + np.arctan(self.sheep_info['y'][0] / self.sheep_info['x'][0]) else: srr = np.arctan(self.sheep_info['y'][0] / self.sheep_info['x'][0]) drr,drrr=self.dog_info['r'] if srr>=drr: self.dog_info['r'] += (self.dog_info['v'] / self.dog_info['R']) * self.dt else: self.dog_info['r'] -= (self.dog_info['v'] / self.dog_info['R']) * self.dt self.dog_info['x'] = self.dog_info['R'] * np.cos(self.dog_info['r']) + 200 self.dog_info['y'] = self.dog_info['R'] * np.sin(self.dog_info['r']) + 200 #print(self.dog_info['x']) if self.sheep_info['x'][0] > 0: if self.sheep_info['y'][0] > 0: srrr = np.arctan(self.sheep_info['y'][0] / self.sheep_info['x'][0]) else: srrr = 2*np.pi + np.arctan(self.sheep_info['y'][0] / self.sheep_info['x'][0]) else: srrr = np.pi + np.arctan(self.sheep_info['y'][0] / self.sheep_info['x'][0]) action= np.clip(action,*self.action_bound) self.sheep_info['r'] += action*self.dt self.sheep_info['r'] %= 2 * np.pi self.sheep_info['x'] += v_sheep * np.cos(self.sheep_info['r'] ) * self.dt #self.sheep_info['x'] %= 150 self.sheep_info['y'] += v_sheep * np.sin(self.sheep_info['r'] ) * self.dt #self.sheep_info['y'] %= 150 s=self.sheep_info['r'] sx,s2 = self.sheep_info['x'] sy,s21 = self.sheep_info['y'] sr,s22 = self.sheep_info['r'] sxy = np.array([sx,sy]) finger = sxy+np.array([200.,200.]) dx, d1 = self.dog_info['x'] dy, d2 = self.dog_info['y'] dl, d3 = self.dog_info['l'] #print(sx*sx+sy*sy-150*150) if sx*sx+sy*sy < before1*before1+before2*before2: r=-101 #done = True if 148 * 148 < sx * sx + sy * sy < 152 * 152 : self.sheep_info['x'] = 1 self.sheep_info['y'] = 1 self.dog_info['r'] = 0 if (sx+200-dx)**2+(sy+200-dy)**2>4: done = True r = 100 else: done = True r = -100 return s, r, done def reset(self): self.sheep_info['r']=2*np.pi*np.random.rand(2) return self.sheep_info['r'] def sample_action(self): return np.random.rand(2)-0.5 if __name__ == '__main__': env = SDEnv() while True: env.render() env.step(env.sample_action()) #rl.py import tensorflow as tf import numpy as np ##################### hyper parameters #################### LR_A = 0.001 # learning rate for actor LR_C = 0.001 # learning rate for critic GAMMA = 0.9 # reward discount TAU = 0.01 # soft replacement MEMORY_CAPACITY = 60000 BATCH_SIZE = 32 class DDPG(object): def __init__(self, a_dim, s_dim, a_bound,): self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32) self.pointer = 0 self.memory_full = False self.sess = tf.Session() self.a_replace_counter, self.c_replace_counter = 0, 0 self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound[1] self.S = tf.placeholder(tf.float32, [None, s_dim], 's') self.S_ = tf.placeholder(tf.float32, [None, s_dim], 's_') self.R = tf.placeholder(tf.float32, [None, 1], 'r') with tf.variable_scope('Actor'): self.a = self._build_a(self.S, scope='eval', trainable=True) a_ = self._build_a(self.S_, scope='target', trainable=False) with tf.variable_scope('Critic'): # assign self.a = a in memory when calculating q for td_error, # otherwise the self.a is from Actor when updating Actor q = self._build_c(self.S, self.a, scope='eval', trainable=True) q_ = self._build_c(self.S_, a_, scope='target', trainable=False) # networks parameters self.ae_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/eval') self.at_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/target') self.ce_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/eval') self.ct_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/target') # target net replacement self.soft_replace = [[tf.assign(ta, (1 - TAU) * ta + TAU * ea), tf.assign(tc, (1 - TAU) * tc + TAU * ec)] for ta, ea, tc, ec in zip(self.at_params, self.ae_params, self.ct_params, self.ce_params)] q_target = self.R + GAMMA * q_ # in the feed_dic for the td_error, the self.a should change to actions in memory td_error = tf.losses.mean_squared_error(labels=q_target, predictions=q) self.ctrain = tf.train.AdamOptimizer(LR_C).minimize(td_error, var_list=self.ce_params) a_loss = - tf.reduce_mean(q) # maximize the q self.atrain = tf.train.AdamOptimizer(LR_A).minimize(a_loss, var_list=self.ae_params) self.sess.run(tf.global_variables_initializer()) def choose_action(self, s): return self.sess.run(self.a, {self.S: s[None, :]})[0] def learn(self): # soft target replacement self.sess.run(self.soft_replace) indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE) bt = self.memory[indices, :] bs = bt[:, :self.s_dim] ba = bt[:, self.s_dim: self.s_dim + self.a_dim] br = bt[:, -self.s_dim - 1: -self.s_dim] bs_ = bt[:, -self.s_dim:] self.sess.run(self.atrain, {self.S: bs}) self.sess.run(self.ctrain, {self.S: bs, self.a: ba, self.R: br, self.S_: bs_}) def store_transition(self, s, a, r, s_): transition = np.hstack((s, a, [r], s_)) index = self.pointer % MEMORY_CAPACITY # replace the old memory with new memory self.memory[index, :] = transition self.pointer += 1 if self.pointer > MEMORY_CAPACITY: # indicator for learning self.memory_full = True def _build_a(self, s, scope, trainable): with tf.variable_scope(scope): net = tf.layers.dense(s, 100, activation=tf.nn.relu, name='l1', trainable=trainable) a = tf.layers.dense(net, self.a_dim, activation=tf.nn.tanh, name='a', trainable=trainable) return tf.multiply(a, self.a_bound, name='scaled_a') def _build_c(self, s, a, scope, trainable): with tf.variable_scope(scope): n_l1 = 100 w1_s = tf.get_variable('w1_s', [self.s_dim, n_l1], trainable=trainable) w1_a = tf.get_variable('w1_a', [self.a_dim, n_l1], trainable=trainable) b1 = tf.get_variable('b1', [1, n_l1], trainable=trainable) net = tf.nn.relu(tf.matmul(s, w1_s) + tf.matmul(a, w1_a) + b1) return tf.layers.dense(net, 1, trainable=trainable) # Q(s,a) def save(self): saver = tf.train.Saver() saver.save(self.sess, './params', write_meta_graph=False) def restore(self): saver = tf.train.Saver() saver.restore(self.sess, './params')
2021年08月30日
873 阅读
6 评论
6 点赞