qcoding

[강화학습] DDPG (Deep Deterministic Policy Gradient)실습_ContinuosMountainCar(2/2) 본문

머신러닝 딥러닝

[강화학습] DDPG (Deep Deterministic Policy Gradient)실습_ContinuosMountainCar(2/2)

Qcoding 2023. 3. 15. 21:30
반응형

** 이번 실습은 앞선 글의 DDPG 이론을 Mountain Car 문제에 적용하는 실습이다. MountainCar 문제 중 Continuos Action Space를 갖는 문제에 대해 적용해 볼 것이다.

 

이번실습에서 진행하는 코드는 아래의 블로그를 참조하여 만들었으며,  Mountain Car에 대한 문제는 이전 블로그를 참고하면 자세히 알 수 있다.

https://pasus.tistory.com/138

 

Tensorflow2로 만든 DDPG 코드: Pendulum-v0

OpenAI Gym에서 제공하는 Pendulum-v0 환경을 대상으로 DDPG 알고리즘을 Tensorflow2 코드로 구현하였다. 학습결과는 다음과 같다. DDPG는 오프-폴리시 방법으로서 온-폴리시인 A2C에 비해서 데이터 효율이

pasus.tistory.com

2023.01.29 - [머신러닝 딥러닝] - [강화학습]Dqn_Mountain car_강화학습 예제

 

[강화학습]Dqn_Mountain car_강화학습 예제

https://gymnasium.farama.org/environments/classic_control/mountain_car/ Gymnasium Documentation A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym) gymnasium.farama.org * 이번 실습은 Mountain car로 ope

qcoding.tistory.com

이번 글은 아래와 같은 순서로 진행예정이다.

1) Continuos Mountain Car 환경 소개

2) 코드 소개

3) 결과 확인

 

 

1)  Continuos Mountain Car 환경 소개

https://www.gymlibrary.dev/environments/classic_control/mountain_car_continuous/

 

Mountain Car Continuous - Gym Documentation

Previous Cart Pole

www.gymlibrary.dev

-> Openai gym 사이트를 보면 Continuos Mountain Car 환경을 확인할 수 있다.

 

왼쪽이 Mountain Car 환경이며 오른쪽이 Continuous 환경이다. 차이를 보면 우선 Action Space가 왼쪽은 이산적으로 3가지 action이 가능한 반면에 오른쪽은 Action값이 (1,) 형태로 [-1.1] 사이의 하나의 값이 되며, 이값은 force term으로 velocity 계산 수식에 적용된다. 

이외에 에피소드의 종료조건이 조금 다른데, Goal 목표에 도달하는 기준이 왼쪽은 0.5m 위치에 차량이 도달할 때를 나타내며, Continuos의 경우에는 0.45m에 도착하면 Goal에 도착하여 에피소드가 종료된다는 조건이 다르다.

 

2)  코드 소개

-> 코드 소개 전 간략히 구성할 Network를 살펴보도록 하자.

1) Actor Network

Actor Network는 state 정보를 받을 때 action에 해당하는 정보를 결정하는 부분으로 실제 수행하는 action을 결정하는 actor network와 Critic 함수 부분의 target 계산시에 들어갈 action을 결정하는 target actor network로 나뉘게 된다. Dense layer를 통해 구성되어 있으며 마지막은 action_size에 해당하는, 여기서는 Action Space가 (1,)로 1개의 action에 대한 값을 output으로 갖는다. 마지막 lambda 부분은 action의 bound를 정해주는 것으로 코드에서는 아래와 같이 적용되었다.

### 여기서 self.action_bound는 action값의 상하한을 잡아주기 위해서 사용함.
action=keras.layers.Lambda(lambda x : x * self.action_bound)(action)

 

2) Critic Network

Critic Network에서는 Input으로 2개의 값이 사용된다. state와 action에 대한 값이 필요하며, Critic network의 경우는 현재의 state_t와 Actor network에서 결정된 a_t가 사용되고, Target Critic network에서는 다음 상태인 s_t+1과 Target Actor network에서 나온 μ(s_t+1)인 action값이 들어가게 된다. Output으로는 1개의 Q함수가 나오게 되며, reward값과 합쳐 TD-target을 구한 뒤 Loss함수를 최소화하도록 학습되어 진다.

 

3) Soft Update

Soft 업데이트의 경우 위의 식에서 처럼 Target network 각각을 업데이트하며, 여기서 tau는 하이퍼파라미터로 임의의 값을 설정한다. 코드로 보면 아래와 같다.

## Soft update Target network
def update_target_network(self, TAU):
    theta = self.actor.get_weights()
    target_theta = self.target_actor.get_weights()
    for i in range(len(theta)):
        target_theta[i] = TAU * theta[i] + (1 - TAU) * target_theta[i]
    self.target_actor.set_weights(target_theta)

    phi = self.critic.get_weights()
    target_phi = self.target_critic.get_weights()
    for i in range(len(phi)):
        target_phi[i] = TAU * phi[i] + (1 - TAU) * target_phi[i]
    self.target_critic.set_weights(target_phi)

이외의 내용은 크게 DQN과 Actor Critic 알고리즘에서 사용한 것과 다르지 않으며 아래에 코드를 첨부하였다.

 

### 패키지 다운로드

!sudo apt-get install -y python-numpy python-dev cmake zlib1g-dev libjpeg-dev xvfb \
    xorg-dev python-opengl libboost-all-dev libsdl2-dev swig
!pip install pyvirtualdisplay
!pip install piglet

## gym
!pip install gym[classic_control]

##ffmpeg
!sudo apt-get install ffmpeg -y

### colab 환경 시 video 코드

from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()
from base64 import b64encode
from glob import glob
from IPython.display import HTML
from IPython import display as ipy_display
from gym import logger as gym_logger
from gym.wrappers.record_video import RecordVideo

#### show video func
def show_video(mode='train', filename=None):
    mp4_list = glob(mode+'/*.mp4')
    # print(mp4_list)
    if mp4_list:
        if filename :
            file_lists = glob(mode+'/'+filename)
            if not file_lists:
                print('No {} found'.format(filename))
                return -1
            mp4 = file_lists[0]
                    
        else:
            mp4 = sorted(mp4_list)[-1]

        print(mp4)
        video = open(mp4, 'r+b').read()
        encoded = b64encode(video)
        ipy_display.display(HTML(data='''
            <video alt="gameplay" autoplay controls style="height: 400px;">
                <source src="data:video/mp4;base64,%s" type="video/mp4" />
            </video>
        ''' % (encoded.decode('ascii'))))
    else:
        print('No video found')
        return -1

## plot 함수

## save them to file if done
def plot_result(save_epi_score):
    plt.plot(save_epi_score)
    plt.show()

## DDPG Agent

import numpy as np
import matplotlib.pyplot as plt
import os
import gym
import random

import tensorflow as tf
from tensorflow import keras
from keras.utils.vis_utils import plot_model

import warnings
warnings.filterwarnings(action='ignore')

from collections import deque

class DDPGagent():
    def __init__(self, state_size, action_size, max_action):
        # 상태 및 행동 크기 정의
        self.state_size = state_size
        self.action_size = action_size
        self.action_bound = max_action

        ## hyperparameters
        self.gamma = 0.95
        self.batch_size = 32

        # 리플레이 버퍼 크기 및 학습 시작 크기 정의
        self.buffer_size = 20000
        self.buffer_size_train_start = 2000

        self.buffer = deque(maxlen=self.buffer_size)


        ## NN Network
        self.actor = self.actor_network()
        self.target_actor = self.actor_network()

        self.critic = self.critic_network()
        self.target_critic = self.critic_network()

        self.actor_learning_rate = 0.0001
        self.critic_learning_rate = 0.001
        self.TAU = 0.001

        self.actor_opt =  keras.optimizers.Adam(learning_rate = self.actor_learning_rate, clipnorm=5.0)
        self.critic_opt =  keras.optimizers.Adam(learning_rate = self.critic_learning_rate, clipnorm=5.0)


        # save the results
        self.save_epi_score = []

    ## actor 
    def actor_network(self,): 
        input_ = keras.layers.Input(shape=(self.state_size))

        x = keras.layers.Dense(24, activation='relu')(input_)
        x = keras.layers.Dense(64, activation='relu', kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3))(x)
        x = keras.layers.Dense(16, activation='tanh', kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3))(x)
        action = keras.layers.Dense(self.action_size, kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3))(x)
        action=keras.layers.Lambda(lambda x : x * self.action_bound)(action)
        ## model
        model = keras.models.Model(inputs=[input_], outputs=[action])

        return model   

    ## critic
    def critic_network(self,):
        input_state = keras.layers.Input(shape=(self.state_size))
        input_action = keras.layers.Input(shape=(self.action_size))

        state = keras.layers.Dense(32, activation='relu')(input_state)
        action = keras.layers.Dense(32, activation='relu')(input_action)

        h = keras.layers.concatenate([state, action], axis=-1)
        x = keras.layers.Dense(32, activation='relu')(h)
        x = keras.layers.Dense(16, activation='relu')(x)
        q_func = keras.layers.Dense(1, activation='relu')(x)
        ## model
        model = keras.models.Model(inputs=[input_state, input_action], outputs=[q_func])

        return model


    # 입력받은 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적하는 함수 구현
    def remember(self, state, action, reward, next_state, done):
        item = (state, action, reward, next_state, done)
        self.buffer.append(item)


    ## get action
    def get_action(self,state, pre_noise):
        action = self.actor(tf.convert_to_tensor([state], dtype=tf.float32))
        # print(f'action_raw : {action}')
        action = action.numpy()[0]
        ## noise
        noise = self.ou_noise(pre_noise, dim=self.action_size)
        # clip continuous action to be within action_bound
        action = np.clip(action + noise, -self.action_bound, self.action_bound)

        # print(f'action : {action}')
        return action , noise



      ## Soft update Target network
    def update_target_network(self, TAU):
        theta = self.actor.get_weights()
        target_theta = self.target_actor.get_weights()
        for i in range(len(theta)):
            target_theta[i] = TAU * theta[i] + (1 - TAU) * target_theta[i]
        self.target_actor.set_weights(target_theta)

        phi = self.critic.get_weights()
        target_phi = self.target_critic.get_weights()
        for i in range(len(phi)):
            target_phi[i] = TAU * phi[i] + (1 - TAU) * target_phi[i]
        self.target_critic.set_weights(target_phi)


    ## single gradient update on a single batch data
    def critic_learn(self, states, actions, td_targets):
        with tf.GradientTape() as tape:
            q = self.critic([states, actions], training=True)
            loss = tf.reduce_mean(tf.square(q - td_targets))

        grads = tape.gradient(loss, self.critic.trainable_variables)
        self.critic_opt.apply_gradients(zip(grads, self.critic.trainable_variables))

    ## train the actor network
    def actor_learn(self, states):
        with tf.GradientTape() as tape:
            actions = self.actor(states, training=True)
            critic_q = self.critic([states, actions])
            loss = -tf.reduce_mean(critic_q)

        grads = tape.gradient(loss, self.actor.trainable_variables)
        self.actor_opt.apply_gradients(zip(grads, self.actor.trainable_variables))
  
    ## Ornstein Uhlenbeck Noise
    def ou_noise(self, x, rho=0.15, mu=0, dt=1e-1, sigma=0.2, dim=1):
        return x + rho*(mu - x)*dt + sigma*np.sqrt(dt)*np.random.normal(size=dim)


    ## computing TD target: y_k = r_k + gamma*Q(x_k+1, u_k+1)
    def td_target(self, rewards, q_values, dones):
        y_k = np.asarray(q_values)
        for i in range(q_values.shape[0]): # number of batch
            if dones[i]:
                y_k[i] = rewards[i]
            else:
                y_k[i] = rewards[i] + self.gamma * q_values[i]
        return y_k

    ## load actor weights
    def load_weights(self, path):
        self.actor.load_weights(f'./{path}/actor/mountain_car.h5')
        self.critic.load_weights(f'./{path}/critic/mountain_car.h5')


    ## train
    def train_model(self):
        ### replay memory 에서 random하게 minibatch 만큼 샘플을 가져옴
        mini_batch = random.sample(self.buffer, self.batch_size)
        # mini_batch에서 각 아래 정보로 분리하기
        states, actions, rewards, next_states, dones = zip(*mini_batch)

        # 분리된 정보를 tensor 형태로 변환
        states = tf.convert_to_tensor(states)
        actions = tf.convert_to_tensor(actions)
        rewards = tf.convert_to_tensor(rewards)
        next_states = tf.convert_to_tensor(next_states)
        # dones를 True False로 바꿀 껀데 tf.float32 실수 형태로 바꿔 주는코드 (1.0 , 0.0)
        dones = tf.convert_to_tensor(dones, dtype=tf.float32)


        # predict target Q-values
        target_qs = self.target_critic([
                                        next_states,   ## next_state (s_prime)
                                        self.target_actor(next_states) ## next_action (a_prime)
                                      ])
        
        # compute TD targets
        y_i = self.td_target(rewards, target_qs.numpy(), dones)

        # train critic using sampled batch
        self.critic_learn(states ,   ### state (s)
                          actions,   ### action (a)
                          y_i )      ## TD target: y_k = r_k + gamma*Q(x_k+1, u_k+1)

        # train actor
        self.actor_learn(states)

        # update both target network
        self.update_target_network(self.TAU)

### Env

ENV_NAME = 'MountainCarContinuous-v0'
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
# env.metadata = {'render.modes': ['human', 'ansi']}

# MountainCar 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
max_action = env.action_space.high[0]

# print(f'state_size : {state_size} , action_size:{action_size} , max_action: {max_action}')

# 위에서 정의한 DDPG 클래스를 활용하여 agent 정의
agent = DDPGagent(state_size, action_size, max_action)



## 초기값
success = 0
max_position = -0.4
# initial transfer model weights to target model network
agent.update_target_network(1.0)

num_episode = 20


for ep in range(num_episode):
      # reset episode
    step, time, episode_score, done = 0 ,0, 0, False

    # 초기 noise 설정
    pre_noise = np.zeros(agent.action_size)

    # 환경 reset을 통해 초기 상태 정의
    state = env.reset()

    while not done:
        action , noise = agent.get_action(state, pre_noise) ## actor network로 action 생성 // 다음 step 때 pre_noise를 여기서 생성된 noise로 사용

        # observe reward, new_state
        next_state, reward, done, _ = env.step(action)

        # score
        episode_score += reward


        ## 보상설계
        car_pos = next_state[0]
        car_vel = next_state[1]


        ## 2차함수로 만들어 속도가 커지게 더큰 리워드를 위치에 따라 받게함
        if car_vel > 0:
            reward = float(((car_pos+0.5)*20)**2/10+15*car_vel - step/300) 
        else:
            reward = float(((car_pos+0.5)*20)**2/10 - step/300) 


        ### max position   
        if car_pos > max_position:
          ## max position
          max_position = car_pos 

        ## 성공 시 success
        if car_pos >=  0.45:
            reward+=100
            success += 1

        step+=1

        # add transition to replay buffer
        train_reward= reward

        # 획득된 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적
        agent.remember(state, action, reward, next_state, done)


        # buffer 크기가 일정 기준 이상 쌓이면 학습 진행
        if len(agent.buffer) >= agent.buffer_size_train_start :
            agent.train_model()


        # update current state
        pre_noise = noise
        state = next_state
        time += 1

    ## display rewards every episode
    print(f'Episode: {ep+1}, Success: {success}, max_position: {max_position :.2f}, Time: {time}, Reward: {episode_score :.2f}')

    agent.save_epi_score.append(episode_score)

    ## save weights every episode
    #print('Now save')
    save_path = './save_weights'
    try:
        os.makedirs(f'{save_path}/critic')
        os.makedirs(f'{save_path}/actor')
        print("make folder")
    except:
        pass


    agent.actor.save_weights(f"{save_path}/actor/mountain_car.h5")
    agent.critic.save_weights(f"{save_path}/critic/mountain_car.h5")


np.savetxt('./save_weights/mountain_car_epi_reward.txt', agent.save_epi_score)
print(agent.save_epi_score)


plot_result(agent.save_epi_score)

### 학습된 agent 영상 확인

### max episode
### nan이 젤 큰값이므로 이값을제거하고 계산함
episode=np.argmax(agent.save_epi_score)
# episode=4
filename = 'rl-video-episode-{}.mp4'.format(episode)
print("최대 avg : {} ,에피소드 번호 : {}".format(max(agent.save_epi_score) , episode))
show_video(filename=filename)

### 학습된 weights로 테스트

ENV_NAME = 'MountainCarContinuous-v0'
env = gym.make(ENV_NAME)
# 비디오 레코딩
env = RecordVideo(env, './test', episode_trigger =lambda episode_number: True )
agent = DDPGagent(env)
agent.load_weights('./save_weights/')

time = 0
state = env.reset()

while True:
    action = agent.actor(tf.convert_to_tensor([state], dtype=tf.float32)).numpy()[0]
    # print(action.shape)
    state, reward, done, _ = env.step(action)
    time += 1
    
    if done:
       print('Time: ', time, 'Reward: ', reward)
       break
       
### video 
show_video(mode='test')

 

3)  결과확인

-> DDPG 알고리즘 적용결과 Continuos Mountain Car 문제는 잘 해결이 되었다. 이론을 정리할 때 stochastic policy gradient 알고리즘은 아래와 같이 state와 action의 샘플이 다 필요하지만

DDPG 알고리즘에서 사용하는 Deterministic Policy Gradient의 경우는 아래와 같이 state의 정보만 필요함으로

학습이 효율적이라고 설명하였는 데, 실제 알고리즘을 구현하여 적용해보니, 에피소드를 많이 수행하지 않아도 문제를 잘 해결하는 것을 확인할 수 있었다. 그렇지만 DQN처럼 replay buffer를 사용하기 때문에 아무래도 학습에 시간이 많이 소요되었다. 

DDPG 알고리즘의 경우 성능이 단조적으로 증가(monotonically)하지 않는 다고 하였는 데, 이는 성능 개선이 계속해서 일어나는 것을 보장하지 못한다는 것으로 수렴성이 크게 좋지 않다는 것을 의미한다. 이를 개선하기 위해 가능한한 학습의 크기를 증가하면서 성능개선을 보장하는 TRPO , PPO 등의 알고리즘이 등장하였다.

TRPO의 경우 이론이 매우 어렵고, 알고리즘으로 구현하는 게 쉽지 않지만 PPO의 경우 구현이 간단하여 많이 사용되는 있는 알고리즘이다. 다음 실습에서는 PPO알고리즘에 대하여 공부하고 실습을 진행하려고 한다.

[DDPG]Continuos_MountainCar_final.ipynb
0.02MB

반응형
Comments