일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- expo
- ReactNative
- 강화학습 기초
- 크롤링
- 앱개발
- kaggle
- 정치인
- clone coding
- 전국국밥
- App
- python
- Ros
- 강화학습
- JavaScript
- 데이터분석
- 리액트네이티브
- pandas
- FirebaseV9
- TeachagleMachine
- selenium
- 카트폴
- 조코딩
- React
- 머신러닝
- 사이드프로젝트
- 클론코딩
- coding
- Instagrame clone
- redux
- 딥러닝
- Today
- Total
qcoding
[강화학습] DDPG (Deep Deterministic Policy Gradient)실습_ContinuosMountainCar(2/2) 본문
[강화학습] DDPG (Deep Deterministic Policy Gradient)실습_ContinuosMountainCar(2/2)
Qcoding 2023. 3. 15. 21:30** 이번 실습은 앞선 글의 DDPG 이론을 Mountain Car 문제에 적용하는 실습이다. MountainCar 문제 중 Continuos Action Space를 갖는 문제에 대해 적용해 볼 것이다.
이번실습에서 진행하는 코드는 아래의 블로그를 참조하여 만들었으며, Mountain Car에 대한 문제는 이전 블로그를 참고하면 자세히 알 수 있다.
2023.01.29 - [머신러닝 딥러닝] - [강화학습]Dqn_Mountain car_강화학습 예제
이번 글은 아래와 같은 순서로 진행예정이다.
1) Continuos Mountain Car 환경 소개
2) 코드 소개
3) 결과 확인
1) Continuos Mountain Car 환경 소개
https://www.gymlibrary.dev/environments/classic_control/mountain_car_continuous/
-> Openai gym 사이트를 보면 Continuos Mountain Car 환경을 확인할 수 있다.
왼쪽이 Mountain Car 환경이며 오른쪽이 Continuous 환경이다. 차이를 보면 우선 Action Space가 왼쪽은 이산적으로 3가지 action이 가능한 반면에 오른쪽은 Action값이 (1,) 형태로 [-1.1] 사이의 하나의 값이 되며, 이값은 force term으로 velocity 계산 수식에 적용된다.
이외에 에피소드의 종료조건이 조금 다른데, Goal 목표에 도달하는 기준이 왼쪽은 0.5m 위치에 차량이 도달할 때를 나타내며, Continuos의 경우에는 0.45m에 도착하면 Goal에 도착하여 에피소드가 종료된다는 조건이 다르다.
2) 코드 소개
-> 코드 소개 전 간략히 구성할 Network를 살펴보도록 하자.
1) Actor Network
Actor Network는 state 정보를 받을 때 action에 해당하는 정보를 결정하는 부분으로 실제 수행하는 action을 결정하는 actor network와 Critic 함수 부분의 target 계산시에 들어갈 action을 결정하는 target actor network로 나뉘게 된다. Dense layer를 통해 구성되어 있으며 마지막은 action_size에 해당하는, 여기서는 Action Space가 (1,)로 1개의 action에 대한 값을 output으로 갖는다. 마지막 lambda 부분은 action의 bound를 정해주는 것으로 코드에서는 아래와 같이 적용되었다.
### 여기서 self.action_bound는 action값의 상하한을 잡아주기 위해서 사용함.
action=keras.layers.Lambda(lambda x : x * self.action_bound)(action)
2) Critic Network
Critic Network에서는 Input으로 2개의 값이 사용된다. state와 action에 대한 값이 필요하며, Critic network의 경우는 현재의 state_t와 Actor network에서 결정된 a_t가 사용되고, Target Critic network에서는 다음 상태인 s_t+1과 Target Actor network에서 나온 μ(s_t+1)인 action값이 들어가게 된다. Output으로는 1개의 Q함수가 나오게 되며, reward값과 합쳐 TD-target을 구한 뒤 Loss함수를 최소화하도록 학습되어 진다.
3) Soft Update
Soft 업데이트의 경우 위의 식에서 처럼 Target network 각각을 업데이트하며, 여기서 tau는 하이퍼파라미터로 임의의 값을 설정한다. 코드로 보면 아래와 같다.
## Soft update Target network
def update_target_network(self, TAU):
theta = self.actor.get_weights()
target_theta = self.target_actor.get_weights()
for i in range(len(theta)):
target_theta[i] = TAU * theta[i] + (1 - TAU) * target_theta[i]
self.target_actor.set_weights(target_theta)
phi = self.critic.get_weights()
target_phi = self.target_critic.get_weights()
for i in range(len(phi)):
target_phi[i] = TAU * phi[i] + (1 - TAU) * target_phi[i]
self.target_critic.set_weights(target_phi)
이외의 내용은 크게 DQN과 Actor Critic 알고리즘에서 사용한 것과 다르지 않으며 아래에 코드를 첨부하였다.
### 패키지 다운로드
!sudo apt-get install -y python-numpy python-dev cmake zlib1g-dev libjpeg-dev xvfb \
xorg-dev python-opengl libboost-all-dev libsdl2-dev swig
!pip install pyvirtualdisplay
!pip install piglet
## gym
!pip install gym[classic_control]
##ffmpeg
!sudo apt-get install ffmpeg -y
### colab 환경 시 video 코드
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()
from base64 import b64encode
from glob import glob
from IPython.display import HTML
from IPython import display as ipy_display
from gym import logger as gym_logger
from gym.wrappers.record_video import RecordVideo
#### show video func
def show_video(mode='train', filename=None):
mp4_list = glob(mode+'/*.mp4')
# print(mp4_list)
if mp4_list:
if filename :
file_lists = glob(mode+'/'+filename)
if not file_lists:
print('No {} found'.format(filename))
return -1
mp4 = file_lists[0]
else:
mp4 = sorted(mp4_list)[-1]
print(mp4)
video = open(mp4, 'r+b').read()
encoded = b64encode(video)
ipy_display.display(HTML(data='''
<video alt="gameplay" autoplay controls style="height: 400px;">
<source src="data:video/mp4;base64,%s" type="video/mp4" />
</video>
''' % (encoded.decode('ascii'))))
else:
print('No video found')
return -1
## plot 함수
## save them to file if done
def plot_result(save_epi_score):
plt.plot(save_epi_score)
plt.show()
## DDPG Agent
import numpy as np
import matplotlib.pyplot as plt
import os
import gym
import random
import tensorflow as tf
from tensorflow import keras
from keras.utils.vis_utils import plot_model
import warnings
warnings.filterwarnings(action='ignore')
from collections import deque
class DDPGagent():
def __init__(self, state_size, action_size, max_action):
# 상태 및 행동 크기 정의
self.state_size = state_size
self.action_size = action_size
self.action_bound = max_action
## hyperparameters
self.gamma = 0.95
self.batch_size = 32
# 리플레이 버퍼 크기 및 학습 시작 크기 정의
self.buffer_size = 20000
self.buffer_size_train_start = 2000
self.buffer = deque(maxlen=self.buffer_size)
## NN Network
self.actor = self.actor_network()
self.target_actor = self.actor_network()
self.critic = self.critic_network()
self.target_critic = self.critic_network()
self.actor_learning_rate = 0.0001
self.critic_learning_rate = 0.001
self.TAU = 0.001
self.actor_opt = keras.optimizers.Adam(learning_rate = self.actor_learning_rate, clipnorm=5.0)
self.critic_opt = keras.optimizers.Adam(learning_rate = self.critic_learning_rate, clipnorm=5.0)
# save the results
self.save_epi_score = []
## actor
def actor_network(self,):
input_ = keras.layers.Input(shape=(self.state_size))
x = keras.layers.Dense(24, activation='relu')(input_)
x = keras.layers.Dense(64, activation='relu', kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3))(x)
x = keras.layers.Dense(16, activation='tanh', kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3))(x)
action = keras.layers.Dense(self.action_size, kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3))(x)
action=keras.layers.Lambda(lambda x : x * self.action_bound)(action)
## model
model = keras.models.Model(inputs=[input_], outputs=[action])
return model
## critic
def critic_network(self,):
input_state = keras.layers.Input(shape=(self.state_size))
input_action = keras.layers.Input(shape=(self.action_size))
state = keras.layers.Dense(32, activation='relu')(input_state)
action = keras.layers.Dense(32, activation='relu')(input_action)
h = keras.layers.concatenate([state, action], axis=-1)
x = keras.layers.Dense(32, activation='relu')(h)
x = keras.layers.Dense(16, activation='relu')(x)
q_func = keras.layers.Dense(1, activation='relu')(x)
## model
model = keras.models.Model(inputs=[input_state, input_action], outputs=[q_func])
return model
# 입력받은 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적하는 함수 구현
def remember(self, state, action, reward, next_state, done):
item = (state, action, reward, next_state, done)
self.buffer.append(item)
## get action
def get_action(self,state, pre_noise):
action = self.actor(tf.convert_to_tensor([state], dtype=tf.float32))
# print(f'action_raw : {action}')
action = action.numpy()[0]
## noise
noise = self.ou_noise(pre_noise, dim=self.action_size)
# clip continuous action to be within action_bound
action = np.clip(action + noise, -self.action_bound, self.action_bound)
# print(f'action : {action}')
return action , noise
## Soft update Target network
def update_target_network(self, TAU):
theta = self.actor.get_weights()
target_theta = self.target_actor.get_weights()
for i in range(len(theta)):
target_theta[i] = TAU * theta[i] + (1 - TAU) * target_theta[i]
self.target_actor.set_weights(target_theta)
phi = self.critic.get_weights()
target_phi = self.target_critic.get_weights()
for i in range(len(phi)):
target_phi[i] = TAU * phi[i] + (1 - TAU) * target_phi[i]
self.target_critic.set_weights(target_phi)
## single gradient update on a single batch data
def critic_learn(self, states, actions, td_targets):
with tf.GradientTape() as tape:
q = self.critic([states, actions], training=True)
loss = tf.reduce_mean(tf.square(q - td_targets))
grads = tape.gradient(loss, self.critic.trainable_variables)
self.critic_opt.apply_gradients(zip(grads, self.critic.trainable_variables))
## train the actor network
def actor_learn(self, states):
with tf.GradientTape() as tape:
actions = self.actor(states, training=True)
critic_q = self.critic([states, actions])
loss = -tf.reduce_mean(critic_q)
grads = tape.gradient(loss, self.actor.trainable_variables)
self.actor_opt.apply_gradients(zip(grads, self.actor.trainable_variables))
## Ornstein Uhlenbeck Noise
def ou_noise(self, x, rho=0.15, mu=0, dt=1e-1, sigma=0.2, dim=1):
return x + rho*(mu - x)*dt + sigma*np.sqrt(dt)*np.random.normal(size=dim)
## computing TD target: y_k = r_k + gamma*Q(x_k+1, u_k+1)
def td_target(self, rewards, q_values, dones):
y_k = np.asarray(q_values)
for i in range(q_values.shape[0]): # number of batch
if dones[i]:
y_k[i] = rewards[i]
else:
y_k[i] = rewards[i] + self.gamma * q_values[i]
return y_k
## load actor weights
def load_weights(self, path):
self.actor.load_weights(f'./{path}/actor/mountain_car.h5')
self.critic.load_weights(f'./{path}/critic/mountain_car.h5')
## train
def train_model(self):
### replay memory 에서 random하게 minibatch 만큼 샘플을 가져옴
mini_batch = random.sample(self.buffer, self.batch_size)
# mini_batch에서 각 아래 정보로 분리하기
states, actions, rewards, next_states, dones = zip(*mini_batch)
# 분리된 정보를 tensor 형태로 변환
states = tf.convert_to_tensor(states)
actions = tf.convert_to_tensor(actions)
rewards = tf.convert_to_tensor(rewards)
next_states = tf.convert_to_tensor(next_states)
# dones를 True False로 바꿀 껀데 tf.float32 실수 형태로 바꿔 주는코드 (1.0 , 0.0)
dones = tf.convert_to_tensor(dones, dtype=tf.float32)
# predict target Q-values
target_qs = self.target_critic([
next_states, ## next_state (s_prime)
self.target_actor(next_states) ## next_action (a_prime)
])
# compute TD targets
y_i = self.td_target(rewards, target_qs.numpy(), dones)
# train critic using sampled batch
self.critic_learn(states , ### state (s)
actions, ### action (a)
y_i ) ## TD target: y_k = r_k + gamma*Q(x_k+1, u_k+1)
# train actor
self.actor_learn(states)
# update both target network
self.update_target_network(self.TAU)
### Env
ENV_NAME = 'MountainCarContinuous-v0'
env = gym.make(ENV_NAME)
# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
# env.metadata = {'render.modes': ['human', 'ansi']}
# MountainCar 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
max_action = env.action_space.high[0]
# print(f'state_size : {state_size} , action_size:{action_size} , max_action: {max_action}')
# 위에서 정의한 DDPG 클래스를 활용하여 agent 정의
agent = DDPGagent(state_size, action_size, max_action)
## 초기값
success = 0
max_position = -0.4
# initial transfer model weights to target model network
agent.update_target_network(1.0)
num_episode = 20
for ep in range(num_episode):
# reset episode
step, time, episode_score, done = 0 ,0, 0, False
# 초기 noise 설정
pre_noise = np.zeros(agent.action_size)
# 환경 reset을 통해 초기 상태 정의
state = env.reset()
while not done:
action , noise = agent.get_action(state, pre_noise) ## actor network로 action 생성 // 다음 step 때 pre_noise를 여기서 생성된 noise로 사용
# observe reward, new_state
next_state, reward, done, _ = env.step(action)
# score
episode_score += reward
## 보상설계
car_pos = next_state[0]
car_vel = next_state[1]
## 2차함수로 만들어 속도가 커지게 더큰 리워드를 위치에 따라 받게함
if car_vel > 0:
reward = float(((car_pos+0.5)*20)**2/10+15*car_vel - step/300)
else:
reward = float(((car_pos+0.5)*20)**2/10 - step/300)
### max position
if car_pos > max_position:
## max position
max_position = car_pos
## 성공 시 success
if car_pos >= 0.45:
reward+=100
success += 1
step+=1
# add transition to replay buffer
train_reward= reward
# 획득된 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적
agent.remember(state, action, reward, next_state, done)
# buffer 크기가 일정 기준 이상 쌓이면 학습 진행
if len(agent.buffer) >= agent.buffer_size_train_start :
agent.train_model()
# update current state
pre_noise = noise
state = next_state
time += 1
## display rewards every episode
print(f'Episode: {ep+1}, Success: {success}, max_position: {max_position :.2f}, Time: {time}, Reward: {episode_score :.2f}')
agent.save_epi_score.append(episode_score)
## save weights every episode
#print('Now save')
save_path = './save_weights'
try:
os.makedirs(f'{save_path}/critic')
os.makedirs(f'{save_path}/actor')
print("make folder")
except:
pass
agent.actor.save_weights(f"{save_path}/actor/mountain_car.h5")
agent.critic.save_weights(f"{save_path}/critic/mountain_car.h5")
np.savetxt('./save_weights/mountain_car_epi_reward.txt', agent.save_epi_score)
print(agent.save_epi_score)
plot_result(agent.save_epi_score)
### 학습된 agent 영상 확인
### max episode
### nan이 젤 큰값이므로 이값을제거하고 계산함
episode=np.argmax(agent.save_epi_score)
# episode=4
filename = 'rl-video-episode-{}.mp4'.format(episode)
print("최대 avg : {} ,에피소드 번호 : {}".format(max(agent.save_epi_score) , episode))
show_video(filename=filename)
### 학습된 weights로 테스트
ENV_NAME = 'MountainCarContinuous-v0'
env = gym.make(ENV_NAME)
# 비디오 레코딩
env = RecordVideo(env, './test', episode_trigger =lambda episode_number: True )
agent = DDPGagent(env)
agent.load_weights('./save_weights/')
time = 0
state = env.reset()
while True:
action = agent.actor(tf.convert_to_tensor([state], dtype=tf.float32)).numpy()[0]
# print(action.shape)
state, reward, done, _ = env.step(action)
time += 1
if done:
print('Time: ', time, 'Reward: ', reward)
break
### video
show_video(mode='test')
3) 결과확인
-> DDPG 알고리즘 적용결과 Continuos Mountain Car 문제는 잘 해결이 되었다. 이론을 정리할 때 stochastic policy gradient 알고리즘은 아래와 같이 state와 action의 샘플이 다 필요하지만
DDPG 알고리즘에서 사용하는 Deterministic Policy Gradient의 경우는 아래와 같이 state의 정보만 필요함으로
학습이 효율적이라고 설명하였는 데, 실제 알고리즘을 구현하여 적용해보니, 에피소드를 많이 수행하지 않아도 문제를 잘 해결하는 것을 확인할 수 있었다. 그렇지만 DQN처럼 replay buffer를 사용하기 때문에 아무래도 학습에 시간이 많이 소요되었다.
DDPG 알고리즘의 경우 성능이 단조적으로 증가(monotonically)하지 않는 다고 하였는 데, 이는 성능 개선이 계속해서 일어나는 것을 보장하지 못한다는 것으로 수렴성이 크게 좋지 않다는 것을 의미한다. 이를 개선하기 위해 가능한한 학습의 크기를 증가하면서 성능개선을 보장하는 TRPO , PPO 등의 알고리즘이 등장하였다.
TRPO의 경우 이론이 매우 어렵고, 알고리즘으로 구현하는 게 쉽지 않지만 PPO의 경우 구현이 간단하여 많이 사용되는 있는 알고리즘이다. 다음 실습에서는 PPO알고리즘에 대하여 공부하고 실습을 진행하려고 한다.
'머신러닝 딥러닝' 카테고리의 다른 글
[kaggle]2진 분류 코드 - KernerelPCA / GMM / HIST FEATURE / STACKING (0) | 2023.12.12 |
---|---|
[GAN]1D 그래프 생성하기_BASE (0) | 2023.05.06 |
[강화학습] DDPG (Deep Deterministic Policy Gradient) 강화학습 (1/2)_이론 (1) | 2023.03.11 |
[강화학습]A3C_Continuos Actor_Critic_MountainCar (0) | 2023.02.28 |
[강화학습]A3C_Discrete환경_Cartpole_Mountain_car (2) | 2023.02.24 |