qcoding

[강화학습]정책기반 강화학습_Policy Gradient_Reinforce_Cartpole 본문

머신러닝 딥러닝

[강화학습]정책기반 강화학습_Policy Gradient_Reinforce_Cartpole

Qcoding 2023. 2. 5. 10:41
반응형

* 이번에 진행할 학습은 Policy Gradient 방법의 기본적인 Reinforce 알고리즘을 통해 Cartpole 문제를 해결하는 것이다.

이번글에서 정리할 주된 내용은 Policy Gradient를 구현하고, Catpole 문제를 해결하는 코드와 Mountain Car 문제에 적용하였을 때, 문제가 해결되지 않았는 데 그 이유를 한번 고민해보는 과정이다.

 

카트폴에 대한 문제 이해는 이전에 썻던 글을 참고 하면 도움이 될 것 같다.

2023.01.14 - [머신러닝 딥러닝] - [강화학습]Cartpole(카트폴) Deep Q-learning (Dqn) 실습

 

[강화학습]Cartpole(카트폴) Deep Q-learning (Dqn) 실습

[Deep Q-learning] * 이번실습은 강화학습 실습으로 유명한 Carpole 을 deep q-learning으로 구현해보는 실습을 진행하였다. DQN은 미래에 받을 가치와 현재 가치의 차이를 줄이면 현재의 가치를 최적의 상태

qcoding.tistory.com

 

이번 실습은 아래와 같은 순서로 진행된다

 

1) Policy Gradient 이론 소개

2) Reinforce 알고리즘 소개 

3) Cartpole 문제 적용 코드

4) Mountain car 문제 적용 코드 

5) 평가 결과

    ( + Cartpole 문제는 해결하였지만, Mountain Car는 해결하지 못한 이유 고민해보기)

 

1) Policy Gradient 이론 소개

-> 강화학습 알고리즘은 위와 같이 Value-Based / Policy-Based 로 구분되어 있다. value-based는 sarsa, q-learing, dqn 등 value function (가치함수) 중 q 함수(행동가치 함수)를 가지고 정책을 결정하는 것으로, implicit policy (암묵적인) 정책으로 입실론-그리디 정책을 사용한다. 즉, 정책자체가 존재하지는 않지만 가치함수를 가지고 argmax의 행동을 취하는 암묵적인 정책을 사용한다.

-> Poilicy gradient는 가치함수를 가지고 계산하는 것이 아닌 직접 policy를 학습하며 구하게 된다. 이 때 policy를 평가하기 위해서 그에 맞는 목적함수를 구해서 Policy를 발전시키는 방향으로 진행된다.

해당 내용을 좀더 정확하게 살펴보기 위해서 아래와 같이 단계별로 살펴보도록 하자.

(* 아래의 ppt 자료는 https://www.davidsilver.uk/teaching/ 의 강의에서 사용되는 교안이며, 유튜브를 통해 강의를 접할 수 있다. )

-> 좋은 Policy를 구하는 것을 위해서 목적 함수 J를 정하는 과정에 대해 설명하고 있다. 상태 s에서 Policy를 따라 받을 수 있는 가치가 크다면 이는 곧 좋은 policy 이므로 상태 s에서의 상태가치함수의 기대값으로 정의할 수 있다. 또한 Markov Chain에서 상태 s에 머무는 확률은 stationary distribution 이라고 하며, 결국 상태 s에 머무르는 확률 x 상태 s의 가치함수로 표현한다.

위와 같이 상태가치함수는 Bellanman 기대 방정식에 따라 policy x q함수로 변경할 수 있다. 결국 우리가 구하고자 하는 것은 목표함수(J)를 가장 크게 하는(Local Maximum) Parameter theta로 이루어진 Policy 를 찾는 것이 된다.  이를 위해 theta를 변경해 가며 J값을 확인하기 위해서 미분을 사용한다.

▽J(theta)를 구하기 위해서 미분을 활용하고 식을 정리하면 아래와 같다.

결론적으로 ▽J(theta)는 위와 같이 기대값으로 나타 낼 수 있으며, 기대값으로 나타내면 sampling을 통해서 구할 수 있으므로 이 때 등장하는 것이 Monte Carlo 방법을 통한 ReinForce 알고리즘이다. 추가로 아래와 같은 식에서 r (한스텝 reward)가 G_t (리턴값) 으로 변경해서 써주면 ReinForce 알고리즘이 된다.

 

2) Reinforce 알고리즘 소개 

--> Reinforce 알고리즘의 pseudo-code 는 아래와 같다.

여기서 Tensorflow를 통해 구현할 때 몇가지 유의할 점이 있어서 잊어버리지 않기 위해서 아래와 같이 작성하였다.

Loss는 ▽J(theta) 가 아닌 J(theta)가 되어야 하므로, 위와 같이 ▽가 지워진 지게 되며, Loss는 항상 최소화 되게 하는 함수이므로 Gradient Ascent 구현을 위하여 (-) 부호를 붙여 준다. 그렇게 되면 리턴값에 따라 확률을 증가시키거나 감소 시키게 학습이 진행된다. 여기서 이 글의 5번에서 설명할 Reinforce 알고리즘에서 Cartpole은 문제는 해결하였으나, Mountain Car 문제는 해결하지 못한 이유를 고민해 보았으며 리턴값이 (-)인 경우에 Loss를 최소화 하게 하려다 보니 확률을 낮춰야 되는데, log 1이 0이 되므로 Loss를 0으로 만들어 버리게 학습하여 잘못학습이 이루어진 것 같다는 생각이 든다. 

아래는 일반적인 tensorflow에서 gradientTape을 사용하는 방법에 대한 설명이다.

3) Cartpole 적용코드

*본 코드는 아래의 책을 구매하고 공부하면서 참조하였으며, 편의를 위하여 몇가지 코드를 변경하여 사용하였다.

## 패키지 설치

!sudo apt-get install -y python-numpy python-dev cmake zlib1g-dev libjpeg-dev xvfb \
    xorg-dev python-opengl libboost-all-dev libsdl2-dev swig
!pip install pyvirtualdisplay
!pip install piglet

## gym
!pip install gym[classic_control]

##ffmpeg
!sudo apt-get install ffmpeg -y

## Colab에서 비디오 재생을 위한 코드

### import
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()
from base64 import b64encode
from glob import glob
from IPython.display import HTML
from IPython import display as ipy_display
from gym import logger as gym_logger
from gym.wrappers.record_video import RecordVideo

#### show video func
def show_video(mode='train', filename=None):
    mp4_list = glob(mode+'/*.mp4')
    # print(mp4_list)
    if mp4_list:
        if filename :
            file_lists = glob(mode+'/'+filename)
            if not file_lists:
                print('No {} found'.format(filename))
                return -1
            mp4 = file_lists[0]
                    
        else:
            mp4 = sorted(mp4_list)[-1]

        print(mp4)
        video = open(mp4, 'r+b').read()
        encoded = b64encode(video)
        ipy_display.display(HTML(data='''
            <video alt="gameplay" autoplay controls style="height: 400px;">
                <source src="data:video/mp4;base64,%s" type="video/mp4" />
            </video>
        ''' % (encoded.decode('ascii'))))
    else:
        print('No video found')
        return -1

## 라이브러리 import & Early stop Class

import tensorflow as tf
from tensorflow import keras

from collections import deque
import numpy as np
import random
import gym
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
import os
import warnings
warnings.filterwarnings(action='ignore')

#### early stopping by avg
class EarlyStopping_by_avg():
    def __init__(self, patience=10, verbose=0):
        super().__init__()

        self.best_avg = 0
        self.step = 0
        self.patience = patience
        self.verbose = verbose

    def check(self, avg , avg_scores):
        ## best avg가 나올경우
        if avg >= self.best_avg:
            self.best_avg = avg
            self.step = 0
            # print("avg_reset")
        ## 이전값보다 현재 avg가 높을경우
        elif len(avg_scores) > 1 and avg > avg_scores[-2]:  ### 이전 값과 비교해야하므로 -2  , -1은 지금 avg와 동일함
            self.step = 0
            # print("이전값보다 avg 높아서 reset")
        else:
            self.step += 1
            if self.step > self.patience:
                if self.verbose:
                    print('조기 종료')
                return True
        return False

 

## agent 코드 (ReinForce 알고리즘)

class ReinForceAgent():
    def __init__(self, state_size, action_size):
        ## 상태 및 행동 size 정의
        ## stae = 2가지 정보 , action = 3가지 정보
        self.state_size = state_size
        self.action_size = action_size

        # REINFORCE 하이퍼 파라메터
        ## carpole = 0.98  / mountain_car = 0.999
        self.discount_factor = 1.0
        self.learning_rate = 0.0002

        # REINFORCE 신경망 모델 생성
        self.model = self.reinforce_dnn()
        self.optimizer = keras.optimizers.Adam(learning_rate = self.learning_rate)
        self.states, self.actions, self.rewards = [], [], []



    # 정책신경망으로 행동 선택
    def get_action(self, state):
        policy = self.model(state)[0]
        policy = np.array(policy)
        action =  np.random.choice(self.action_size, 1, p=policy)[0]
        # print(f'action : {action} , policy:{policy}')
        return action



    ### Dnn 모델 생성   
    def reinforce_dnn(self):
        ### state가 들어감
        input_ = keras.layers.Input(shape=(self.state_size))
        x = keras.layers.Dense(128, activation='relu')(input_)
        # x = keras.layers.Dense(128, activation='relu')(x)
        x = keras.layers.Dense(24, activation='relu')(x)

        ## output은 action을 할 확률이므로 행동(=action)의 수와 동일하게 생성
        output = keras.layers.Dense(self.action_size, activation='softmax')(x)
        model = keras.models.Model(inputs=[input_], outputs=[output])

        return model   


    ### return값 계산 (G_t)
    def discount_rewards(self, rewards):
        discounted_rewards = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, len(rewards))):
            running_add = running_add * self.discount_factor + rewards[t]
            discounted_rewards[t] = running_add
        return discounted_rewards

    # 한 에피소드 동안의 상태, 행동, 보상을 저장
    def append_sample(self, state, action, reward):
        self.states.append(state)    ## 에피소드 동안 state 저장
        self.rewards.append(reward)  ## 에피소드 동안 reward 저장
        one_hot_a = tf.one_hot(action , self.action_size)  ## 실제 한 action 0, 1, 2 를 one-hot encoding 변경 --> 1 이면 [0 , 1, 0]. 2면 [0. 0 .1]
        self.actions.append(one_hot_a)    



    # 정책신경망 업데이트
    def train_model(self):
        ## G_t 값을 정규화 하면 학습이 좀더 잘됨.
        discounted_rewards = np.float32(self.discount_rewards(self.rewards))
        discounted_rewards -= np.mean(discounted_rewards)
        discounted_rewards /= np.std(discounted_rewards)
        
        # 크로스 엔트로피 오류함수 계산
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            tape.watch(model_params)
            policies = self.model(np.array(self.states))  ### 에피소드 동안 실제 있었던 state 저장 = self.states   // output은 action을 할 확률 []
            actions = np.array(self.actions)   ### 에피소드 동안 실제 행동 했던 action 저장 = self.actions    one-hot encoding 형태
            action_prob = tf.reduce_sum(actions * policies, axis=1)   ## 실제 행동했던 action의 확률을 구함 -> 에피소드안에 있는 state의 길이만큼의 action_prob가 존재 [ 0.3, 0.5, 0.7 ...]
            cross_entropy = - tf.math.log(action_prob + 1e-5)
            loss = tf.reduce_sum(cross_entropy * discounted_rewards)
            entropy = - policies * tf.math.log(policies)

        # 오류함수를 줄이는 방향으로 모델 업데이트
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        self.states, self.actions, self.rewards = [], [], []
        return np.mean(entropy)

### env 코드 (Cartpole)

# CartPole 환경 정의
ENV_NAME = 'CartPole-v1'
# env = gym.make(ENV_NAME, render_mode="rgb_array")
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
env.metadata = {'render.modes': ['human', 'ansi']}

# CartPole 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# 위에서 정의한 DQN 클래스를 활용하여 agent 정의
agent = ReinForceAgent(state_size, action_size)

load_model = True

if load_model:
  # 위에서 정의한 DQN 클래스를 활용하여 agent 정의
  load_model = '500th'
  agent.model.load_weights(f'save_model/cartpole_model/{load_model}/')

scores, avg_scores, episodes, losses = [], [], [], []

# 반복 학습 에피소드 수 정의
num_episode = 1000
early_stopping_by_avg = EarlyStopping_by_avg(patience=30, verbose=1)
## early stopping을 위한 초기값 설정 
avg_step = 0

print_interval = 20

for epoch in range(num_episode):
    # done flag와 score 값 초기화
    done = False
    score = 0


    # 환경 reset을 통해 초기 상태 정의
    state = env.reset()
    
    # print(f"avg: {avg_step}")
    if early_stopping_by_avg.check(avg_step , avg_scores ):
        print("earstpping 실행")
        break

    while not done:

        # 현재 상태에 대하여 행동 정의
        action = agent.get_action(np.expand_dims(state, axis=0))


        # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
        next_state, reward, done, info = env.step(action)
        
        # 해당 에피소드의 최종 score를 위해 reward 값 누적
        score += reward

        # 기본 환경은 pole이 쓰러지지 않으면 +1의 보상을 준다.
        # 자신만의 보상가설을 만들어 학습 가능
        ### 종료조건
        # 폴 각도는 ±12° 이상입니다.
        # 카트 위치가 ±2.4 이상(카트 중앙이 디스플레이 가장자리에 도달함)
        # 에피소드 길이가 200보다 큽니다.
        # TODO #
        
        def get_reward(pos, angle , done):
            ### 위치 / 속도 조건으로 보상크게
            cond_pos = (pos < 2.0) and (pos > -2.0)
            cond_angle = (angle < 5.0) and (angle > -5.0)
            ### 실패시 보상 -1
            if done:
                return -100.0
            ### 상점
            elif cond_pos or cond_angle:
                return 0.1
            elif cond_pos:
                return 0.3
            elif cond_pos and cond_angle:
                return 0.5
            ### 벌점
            elif (pos > 2.5) or (pos < -2.5):
                return -20
            elif (angle > 10.0) or (angle < -10.0):
                return -10
            
            
        ### position
        pos = next_state[0]
        ### velocity
        angle = next_state[2]
        reward = get_reward(pos, angle, done)

        agent.append_sample(state, action, reward)

        # 다음 상태를 현재 상태로 정의
        state = next_state


        if done:

            entropy = agent.train_model()
            
            ### early stop
            avg_step = np.mean(scores[-10:])


            # 에피소드 종료마다 결과 그래프 저장
            scores.append(score)
            avg_scores.append(avg_step)
            episodes.append(epoch)

            
            # 에피소드 종료마다 결과 출력
            if epoch % print_interval == 0:
              print(f'episode: {epoch:3d} | avg_score: { avg_step :3.2f}')

            # 100 에피소드마다 모델 저장
            if epoch % 200 == 0:
               agent.model.save_weights(f'save_model/cartpole_model/{epoch}th/', save_format='tf')
            

env.close()

plt.title('Test graph')
plt.xlabel('episodes')

plt.plot(episodes, avg_scores,
         color='skyblue',
         marker='o', markerfacecolor='blue',
         markersize=6)
plt.ylabel('avg_scores', color='blue')
plt.tick_params(axis='y', labelcolor='blue')

plt.savefig('cartpole_graph.png')
plt.show()

### 비디오 재생 코드

### max episode
### nan이 젤 큰값이므로 이값을제거하고 계산함
avg_scores_fil = [x for x in avg_scores if np.isnan(x) !=True]
episode=np.argmax(avg_scores_fil)
# episode=88
filename = 'rl-video-episode-{}.mp4'.format(episode)
print("최대 avg : {} ,에피소드 번호 : {}".format(max(avg_scores_fil) , episode))
show_video(filename=filename)

### Test 시 저장된 weigt를 불러와서 확인하는 코드

# CartPole 환경 정의
ENV_NAME = 'CartPole-v1'
# env = gym.make(ENV_NAME, render_mode="rgb_array")
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './test', episode_trigger =lambda episode_number: True )


# CartPole 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# 위에서 정의한 DQN 클래스를 활용하여 agent 정의
load_model = '800th'
agent = ReinForceAgent(state_size, action_size)
agent.model.load_weights(f'save_model/cartpole_model/{load_model}/')

score = 0
state = env.reset()
done = False
    
while not done:

    # 현재 상태에 대하여 행동 정의
    action = agent.get_action(np.expand_dims(state, axis=0))


    # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
    next_state, reward, done, info = env.step(action)
    
    # 해당 에피소드의 최종 score를 위해 reward 값 누적
    score += reward


    state = next_state

print("Score: ", score)

4) Mountain Car 적용코드

## env 만 변경하면됨

#### 학습 환경
# CartPole 환경 정의
ENV_NAME = 'MountainCar-v0'
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
env.metadata = {'render.modes': ['human', 'ansi']}

# CartPole 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

load_model=False

# 위에서 정의한 DQN 클래스를 활용하여 agent 정의
agent = ReinForceAgent(state_size, action_size)

if load_model:
  # 위에서 정의한 DQN 클래스를 활용하여 agent 정의
  load_model = '200th'
  agent.model.load_weights(f'save_model/mountain_model/{load_model}/')

scores, avg_scores, episodes ,avg_rewards  = [], [], [], [] 

# 반복 학습 에피소드 수 정의
num_episode = 1000
early_stopping_by_avg = EarlyStopping_by_avg(patience=2, verbose=1)
## early stopping을 위한 초기값 설정 
avg_reward_episode = 0
success = 0
max_position = -0.4
print_interval = 20

scores, episodes = [], []

for epoch in range(num_episode):
    # done flag와 score 값 초기화
    done = False
    score = 0
    step = 0
    rewards_list = [] 

    # 환경 reset을 통해 초기 상태 정의 --> state는 [car_position , car_velocity] 2개의 값을 받는데 
    # car_position = -0.6 ~ 0.4 사이의 값을 받으며 , car_velocity = 0 을 받음
    state = env.reset()
    # print(f'init_state : {state}')
    
    # print(f"avg: {avg_step}")
    if early_stopping_by_avg.check(avg_reward_episode , avg_scores ):
        print("earstpping 실행")
        break
        
    while not done:

        # 현재 상태에 대하여 행동 정의
        # action = agent.policy(init_state[np.newaxis,:]) ### network에 넣어 주기 위해서 기존 (2,) 배열을 (1,2)로 축을 추가함
        action = agent.get_action(np.expand_dims(state, axis=0))
        ### 위와 같은 의미 np.expand_dims(a, axis=0)
        
        # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
        ### return ex) nex_state=[-0.57330334 -0.00063329]
        ###            reward=-1.0
        ###            done=False
        ###            info={}
        next_state, reward, done, info = env.step(action)
        


        ### reward 설계
        #### 목표는 가능한 한 빨리 오른쪽 언덕 위에 놓인 깃발에 도달하는 것이므로 에이전트는 각 타임스텝에 대해 -1의 보상으로 페널티를 받습니다.
        #### 다음 중 하나가 발생하면 에피소드가 종료됩니다.

        ###종료: 자동차의 위치가 0.5보다 크거나 같습니다(오른쪽 언덕 위의 목표 위치).
        ###잘림: 에피소드 길이는 200입니다.

        ### reward 설계
        #### 목표는 가능한 한 빨리 오른쪽 언덕 위에 놓인 깃발에 도달하는 것이므로 에이전트는 각 타임스텝에 대해 -1의 보상으로 페널티를 받습니다.
        #### 다음 중 하나가 발생하면 에피소드가 종료됩니다.

        ###종료: 자동차의 위치가 0.5보다 크거나 같습니다(오른쪽 언덕 위의 목표 위치).
        ###잘림: 에피소드 길이는 200입니다.
        # reward=0.1 if not done else -1
        
        ### position에 따라서 action이 맞으면 reward를 줌
        car_pos = next_state[0]
        car_vel = next_state[1]
        # print(f'step : {step} , score : {score:.2f} , max_position : {max_position:.2f}, success : {success}')
        ## pos  = [ -1.2 , 0.6 ]  --> 구간나눔  pos < -0.6   , -0.6 < pos < -0.4 ,   pos >-0.4
        ## vel  = [ -0.07  , 0.07 ] --> 
        # print(car_pos, car_vel)
        ##### 여기서는 pos가 reward 임


        # def get_reward(car_pos, car_vel,max_position,step):
          ## 2차함수로 만들어 속도가 커지게 더큰 리워드를 위치에 따라 받게함
        if car_vel > 0:
          reward = float(((car_pos+0.5)*20)**2/10+15*car_vel-step/400) * 5
        else:
          reward = float(((car_pos+0.5)*20)**2/10 - step/400) * 5

          ### max position   
        if car_pos > max_position:
          ## max position
          max_position = car_pos 
          reward = 20 ### 학습을 위해 큰 보상을 주는 선택을 해봄 -> 소용 x


          ## 성공 시 success
        if car_pos >= 0.5:
           success +=1
  
        else:
          # reward = reward - 0.01  ### 벌점 reward를 안주면 가만히 있으려고만 함. 가만히 있으면 안되기 때문에 위에서 받은 reward에서 -1을 빼줌
          score -= 1  

        if action == 0:
          reward += -20

  
        step += 1
        # print(f'reward : {reward}')

        # 획득된 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적
        rewards_list.append(reward)

        ## sample 넣어줌 --> 모든 것이 반영된 sample을 넣어주어야 되므로 가장 마지막에 있어야함.
        agent.append_sample(state, action, reward)
        
        # 다음 상태를 현재 상태로 정의
        state = next_state
                    
        if done:

            # 에피소드마다 정책신경망 업데이트
            entropy = agent.train_model()
            
            ### early stop
            avg_score_episode = np.mean(scores[-10:])
            avg_reward_episode = np.mean(rewards_list)


            # 에피소드 종료마다 결과 그래프 저장
            #### reward --> 매 스텝에서 받은 reward의 양 ( state에서 알맞는 action을 많이 할 수록 reward가 커짐)
            avg_rewards.append(avg_reward_episode)


            #### score --> 환경에서 reward는 매 step마다 -1 감소 인데, 200 step이 되면 종료되므로, -200 보다 커질수록 높은 score

            scores.append(score)
            avg_scores.append(avg_score_episode)
            episodes.append(epoch)

            
            # 에피소드 종료마다 결과 출력
            if epoch % print_interval == 0:
               print(f'episode: {epoch:3d} | success: {success} | car_pos_max : {max_position : .3f} | avg_reward : {avg_reward_episode : .3f} | score_avg : {avg_score_episode : .3f} |  step : {step} ')
            
            # 100 에피소드마다 모델 저장
            if epoch % 200 == 0 or epoch % num_episode == 0:
               agent.model.save_weights(f'save_model/mountain_model/{epoch}th/', save_format='tf')

env.close()

        
plt.title('Test graph')
plt.xlabel('episodes')

plt.plot(episodes, avg_scores,
         color='skyblue',
         marker='o', markerfacecolor='blue',
         markersize=6)
plt.ylabel('avg_scores', color='blue')
plt.tick_params(axis='y', labelcolor='blue')
plt.savefig('cartpole_graph.png')
plt.show()

5) 평가 결과

5-1) Cartpole 시험 시

--> 우선 Reinforce 알고리즘은 action을 policy에 의해서 결정된다. 다른 q-learing이나 dqn의 경우 행동가치함수(q)가 가장 큰 action을 선택하여, 해당 state에서 하는 action이 결정이 되어 있었다. 그러나 이번 policy based 알고리즘에서는 확률에 의해 action이 선택된다. 아래의 코드를 참고.

    # 정책신경망으로 행동 선택
    def get_action(self, state):
        policy = self.model(state)[0]
        policy = np.array(policy)
        action =  np.random.choice(self.action_size, 1, p=policy)[0]
        # print(f'action : {action} , policy:{policy}')
        return action

그렇기 떄문에 동일한 상태에서도 action이 달라지게 되고, 매번 다른 행동하게 된다. 또한 학습이 dqn 알고리즘 보다 늦게 이루어져서 처음에는 잘못되고 있나 싶었지만 에피소드를 늘리고, discount factor를 변경하면서 학습이 잘 진행되도록 설정하였다.

1000번의 에피소드를 지나면 위와 같이 학습이 잘 진행되는 것을 확인할 수 있었다.

 

5-2) Mountain car 

-> Mountain car의 경우 결국에는 해당 알고리즘으로 해결하지 못하였다. 이것저것 여러번 반복하고, 에피소드를 크게 늘려 보았지만 효과가 없었다.

안되는 이유에 대해서 구글링을 통해서 정확한 이유를 알아보려고 했지만 정확한 이유는 찾기가 어려웠고 대략 고민을 통해 얻어 낸 것을 써보려고 한다. 

 

*** Actor Critic을 실습하다가 여러자료들에서 Mountain Car 문제가 해결안되는 것을 보고 아래 내용을 수정하였다.

 

1) 우선 Mountain car 문제는 Cartpole 문제와는 다르게 즉각적인 보상을 받기보다는 미래의 goal에 도착하면 큰 보상을 받는 환경이다. 

또 다른 큰 차이점은 dqn은 off-policy 이고, Reinforce 의 경우 on-policy 이다. dqn은 학습의 target이 state 상에서 받을 수 있는 가장 큰 가치를 사용하여, 항상 최고의 상태값을 받지만, Reinforce은 확률적으로 정책을 선택하므로 상황에 맞는 다른 가치를 받게 된다. 사실 상황에 따라 다른 가치를 받아 정책을 발전하는 것 자체가 Policy Gradient 알고리즘의 의미인데, Mountain car에서 문제는 꾸준한 보상이 쌓여 행동을 개선시켜 나가야 되는데, 보상 자체가 쌓이기 어려운 환경이라는 것이다. 

다만 아래의 reward 설계에서 행동에 따라 꾸준한 보상을 줄 수 있게 설계를 하였으나, state에서 확률적으로 선택하는 정책이 바뀌기 때문에 선택하는 action이 3개나 되는 이 상황에서 도움이 되지 않은 action이 계속 된다면 문제를 해결할 방향은 더 어려워 질 것 이다. 이것이 off-policy에서 최적의 행동가치를 갖는 action을 선택하는 dqn과 다른 점일 것이다.

        if car_vel > 0:
          reward = float(((car_pos+0.5)*20)**2/10+15*car_vel-step/400) 
        else:
          reward = float(((car_pos+0.5)*20)**2/10 - step/400)

조금 더 개선해 볼 것이 위에서 -step/400 을 변경해 보려고 한다. 그 이유는 처음에는 step이 낮으므로 저 term이 크지 않아서 보상이 되겠지만 step term이 커지기 시작하기 전에 pos이 0.5에 도착해서 보상을 받지 않으면 step term이 커져서 지금껏 했던 reward 들이 다시 작아져서 상황에 맞는 행동들이 다시 확률이 줄어들 것이다. Actor Crtic에서 이 과정을 변경해봐야 겠다.

 

 

2) 두번 째로 아래와 같은 이유가 있는 것 같다. 

 

위의 그래프를 보면 좋은 action 나쁜 action에 따라 Policy의 분포가 균등 또는 치우치는 방향으로 변경될 것 이다. 그런데 Mountain car에서 학습된 policy를 보면 0번 action ( 왼쪽 )으로 가는 것의 확률이 크게 나타나는 것을 볼 수 있다.

먼가 보상 설계가 잘못되거나 알고리즘 특징에 따라서 이렇게 나올 텐데, 생각해 보면 0번 action을 할 때 이 행동에 따른 reward가 계속 안좋게 나와야 보상이 약화 될 것이다. 하지만 Reinforce 특성상 에피소드에서 어떤 action을 취하는 지에 따라 매우 다양한 보상값이 나오게 되므로 상대적으로 우리가 원하는 보상을 받을 기회가 많지 않게 될 것이다.

위의 이런 복합적인 점이 문제를 해겨하지 못하게 했나 생각된다.

 

다음실습으로는 Actor Critic 방법을 통해 실습을 진행하려고 한다.

 

 

 

반응형
Comments