qcoding

[강화학습]Continuos A2C(연속적 A2C)_mountain Car 본문

머신러닝 딥러닝

[강화학습]Continuos A2C(연속적 A2C)_mountain Car

Qcoding 2023. 2. 15. 22:08
반응형

* 이번 실습은 Continuos A2C 실습으로 아래의 Mountain Car Continuos 환경에 연속적 A2C알고리즘을 적용해보는 실습을 하였다.

https://www.gymlibrary.dev/environments/classic_control/mountain_car_continuous/

 

Mountain Car Continuous - Gym Documentation

Previous Cart Pole

www.gymlibrary.dev

해당 실습의 코드는 아래의 책을 구매한 후 공부를 하며 진행하였으며, 편의를 위해 몇가지 부분을 추가하거나 수정하였다.

https://wikibook.co.kr/reinforcement-learning/

 

파이썬과 케라스로 배우는 강화학습: 내 손으로 직접 구현하는 게임 인공지능

“강화학습을 쉽게 이해하고 코드로 구현하기” 강화학습의 기초부터 최근 알고리즘까지 친절하게 설명한다! ‘알파고’로부터 받은 신선한 충격으로 많은 사람들이 강화학습에 관심을 가지

wikibook.co.kr

앞의 실습에서 A2C를 통해서 Mountain car 문제를 해결해 보려고 했으나, Policy-gradient 인 Reinforce 알고리즘과 value & policy based 인 A2C를 사용하였을 때 모두 실패하였다. 실패의 큰 이유로는 Mountain Car 환경의 경우 goal에 도착하면 큰 보상을 얻는 환경으로, 미리 환경에 대한 정보를 파악하고 보상설계를 진행한다고 해도 문제를 해결하는 데 실패하였다.

( 그림 출처 : https://wnthqmffhrm.tistory.com/19)

 

이번에는 실습할 내용은Continuos A2C로 A2C에서 Policy를 action이 Discreate (이산적) 인 것이 아니라 Continuos(연속적)인 정규분포를 형태로 만드는 것으로 기존과 큰 차이점은 Actor 신경망을 만들 때, 아래와 같이 정책신경망이 μ (평균), 

σ (표준편차) 를 출력하여 이것을 토대로 정규분포 형태의 Policy를 만들어서 action 선택에 사용한다.

아래의 대략의 개략도를 살펴보면 다음과 같다.

위의 신경망에서 sigma / mu 값을 가지고 정규분포를 만드는 데 사용한 라이브러리는 tensorflow_probability 로 아래의 그사용법을 소개하였다.

여기서 유의해야 하는 것은 평균은 활성홤수 linear를 사용하였으며, 값이 크게 나와도 어차피 Env에 따라 줄수 있는 값이 정해져 있으므로 Env.action_space.high 값을 clipping을 하여 사용하였으며, 표준편차의 경우 0~1값이 나오므로 활성화함수로 sigmoid 함수를 사용하였다.

from tensorflow_probability import distributions as tfd

# A standard normal      --> Tensorflow 공식문서 
## loc = 평균 / scale = 표준편차
normal = tfd.Normal(loc=0., scale=1.)

https://www.tensorflow.org/probability/examples/A_Tour_of_TensorFlow_Probability?hl=ko 

 

TensorFlow Probability 둘러보기

TensorFlow Probability 둘러보기 컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요. 이 Colab에서는 TensorFlow Probability의 몇 가지 기본 기능을 살펴보겠습니다. 종속성과

www.tensorflow.org

### Continuos 코드에서 사용

# 1) get action함수
# 정책신경망의 출력을 받아 확률적으로 행동을 선택
def get_action(self, state):
    mu, sigma, _ = self.model(state)
    dist = tfd.Normal(loc=mu[0], scale=sigma[0])
    action = dist.sample([1])[0]
    action = np.clip(action, -self.max_action, self.max_action)
    return action
    
    
# 2) train 시 loss 계산
def train_model()
	````
	dist = tfd.Normal(loc=mu, scale=sigma)
	action_prob = dist.prob([action])[0]
    ```

1)에서 get action 함수는 dist.sample([1]) 로 dist라고 만든 정규분포에서 샘플 1개를 뽑아 action으로 사용.

2)에서 실제 수행한 action의 확률을 구하기 위해 dist라고 만든 정규분포에서 action을 넣어 action prob를 구함.

 

위의 내용을 아래의 그림으로 정리해 보면 이렇게 나온다.

(그림 출처 : https://ko.wikipedia.org/wiki/%EC%A0%95%EA%B7%9C_%EB%B6%84%ED%8F%AC)

 

### 코드 정리

# 설치

!sudo apt-get install -y python-numpy python-dev cmake zlib1g-dev libjpeg-dev xvfb \
    xorg-dev python-opengl libboost-all-dev libsdl2-dev swig
!pip install pyvirtualdisplay
!pip install piglet

## gym
!pip install gym[classic_control]

##ffmpeg
!sudo apt-get install ffmpeg -y

# Colab 환경에서 영상을 위한 video

### import
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()
from base64 import b64encode
from glob import glob
from IPython.display import HTML
from IPython import display as ipy_display
from gym import logger as gym_logger
from gym.wrappers.record_video import RecordVideo

#### show video func
def show_video(mode='train', filename=None):
    mp4_list = glob(mode+'/*.mp4')
    # print(mp4_list)
    if mp4_list:
        if filename :
            file_lists = glob(mode+'/'+filename)
            if not file_lists:
                print('No {} found'.format(filename))
                return -1
            mp4 = file_lists[0]
                    
        else:
            mp4 = sorted(mp4_list)[-1]

        print(mp4)
        video = open(mp4, 'r+b').read()
        encoded = b64encode(video)
        ipy_display.display(HTML(data='''
            <video alt="gameplay" autoplay controls style="height: 400px;">
                <source src="data:video/mp4;base64,%s" type="video/mp4" />
            </video>
        ''' % (encoded.decode('ascii'))))
    else:
        print('No video found')
        return -1

# Agent (Continuous Actor Critic)

class ContinuosActorCritic():
    def __init__(self, state_size, action_size, max_action):
        ## 상태 및 행동 size 정의
        ## stae = 2가지 정보 , action = 3가지 정보
        self.state_size = state_size
        self.action_size = action_size

        # ContinuosActorCritic 하이퍼 파라메터
        ## carpole = 0.98  / mountain_car = 0.999
        self.discount_factor = 0.99
        self.learning_rate = 0.001

        # ContinuosActorCritic 신경망 모델 생성
        self.model = self.continuos_actor_critic_dnn()
        self.max_action = max_action
        self.loss_fn = keras.losses.MeanSquaredError()
        
        # 최적화 알고리즘 설정, 미분값이 너무 커지는 현상을 막기 위해 clipnorm 설정
        self.optimizer = keras.optimizers.Adam(learning_rate = self.learning_rate, clipnorm=5.0)
        
        
    # 정책신경망의 출력을 받아 확률적으로 행동을 선택
    def get_action(self, state):
        mu, sigma, _ = self.model(state)
        dist = tfd.Normal(loc=mu[0], scale=sigma[0])
        action = dist.sample([1])[0]
        action = np.clip(action, -self.max_action, self.max_action)
        return action
        
        
    # Dnn 모델 생성   
    def continuos_actor_critic_dnn(self):
        ### state가 들어감
        input_ = keras.layers.Input(shape=(self.state_size))

        ### Actor (정책평가) -> 정규분포로 변경하기 위해 mu , sigma 생성함 -> 각각의 action에 대한 정규분포를 생성함.
        actor_fc = keras.layers.Dense(24, activation='tanh')(input_)
        actor_mu = keras.layers.Dense(self.action_size, activation='linear', kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3), name='actor_mu')(actor_fc)
        actor_sigma = keras.layers.Dense(self.action_size, activation='sigmoid', kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3), name='actor_sigma')(actor_fc)
        actor_sigma = actor_sigma + 1e-5
        
        ## Critic (가치평가)
        critic_fc1 = keras.layers.Dense(24, activation='tanh')(input_)
        critic_fc2 = keras.layers.Dense(24, activation='tanh')(critic_fc1)
        value = keras.layers.Dense(1, kernel_initializer=keras.initializers.RandomUniform(-1e-3, 1e-3), name='value')(critic_fc2)

        ## model
        model = keras.models.Model(inputs=[input_], outputs=[actor_mu, actor_sigma, value])

        return model   
        
        
    # 정책신경망 업데이트
    # 각 타임스텝마다 정책신경망과 가치신경망을 업데이트
    def train_model(self, state, action, reward, next_state, done):
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            mu, sigma, value = self.model(state)
            _, _, next_value = self.model(next_state)
            target = reward + (1 - done) * self.discount_factor * next_value[0]

            # 정책 신경망 오류 함수 구하기
            advantage = tf.stop_gradient(target - value[0])
            dist = tfd.Normal(loc=mu, scale=sigma)
            action_prob = dist.prob([action])[0]
            cross_entropy = - tf.math.log(action_prob + 1e-5)
            actor_loss = tf.reduce_mean(cross_entropy * advantage)

            # 가치 신경망 오류 함수 구하기
            critic_loss = self.loss_fn(tf.stop_gradient(target) , value[0])
            # critic_loss = 0.5 * tf.square(tf.stop_gradient(target) - value[0])
            # critic_loss = tf.reduce_mean(critic_loss)

            # 하나의 오류 함수로 만들기
            loss = 0.1 * actor_loss + critic_loss

        # 오류함수를 줄이는 방향으로 모델 업데이트
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        return loss, sigma

 

## Env

#### 학습 환경

gym.envs.register(
        id='MountainCarContinuous-v1',
        entry_point='env:MountainCarContinuousEnv',
        max_episode_steps=500
)


ENV_NAME = 'MountainCarContinuous-v0'
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
env.metadata = {'render.modes': ['human', 'ansi']}

# CartPole 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]

action_size = env.action_space.shape[0]
max_action = env.action_space.high[0]

load_model=True

# 위에서 정의한 DQN 클래스를 활용하여 agent 정의
agent = ContinuosActorCritic(state_size, action_size, max_action)

if load_model:
  # 위에서 정의한 DQN 클래스를 활용하여 agent 정의
    load_model = '20th'
    agent.model.load_weights(f'reinforcement_weights/mountain_model/{load_model}/')

scores, avg_scores, episodes ,avg_rewards  = [], [], [], [] 

# 반복 학습 에피소드 수 정의
num_episode = 50


## early stopping을 위한 초기값 설정 
avg_reward_episode = 0
success = 0
max_position = -0.4
print_interval = 1

scores, episodes = [], []

for epoch in range(num_episode):
    # done flag와 score 값 초기화
    done = False
    score = 0
    step = 0
    rewards_list = [] 
    loss_list, sigma_list = [], []

    # 환경 reset을 통해 초기 상태 정의 --> state는 [car_position , car_velocity] 2개의 값을 받는데 
    # car_position = -0.6 ~ 0.4 사이의 값을 받으며 , car_velocity = 0 을 받음
    state = env.reset()


    while not done:

        # 현재 상태에 대하여 행동 정의
        # action = agent.policy(init_state[np.newaxis,:]) ### network에 넣어 주기 위해서 기존 (2,) 배열을 (1,2)로 축을 추가함
        action = agent.get_action(np.expand_dims(state, axis=0))
        ### 위와 같은 의미 np.expand_dims(a, axis=0)
        
        # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
        ### return ex) nex_state=[-0.57330334 -0.00063329]
        ###            reward=-1.0
        ###            done=False
        ###            info={}
        next_state, reward, done, info = env.step(action)
        

        score += reward
        ### reward 설계
        #### 목표는 가능한 한 빨리 오른쪽 언덕 위에 놓인 깃발에 도달하는 것이므로 에이전트는 각 타임스텝에 대해 -1의 보상으로 페널티를 받습니다.
        #### 다음 중 하나가 발생하면 에피소드가 종료됩니다.

        ###종료: 자동차의 위치가 0.5보다 크거나 같습니다(오른쪽 언덕 위의 목표 위치).
        ###잘림: 에피소드 길이는 200입니다.

        ### reward 설계
        #### 목표는 가능한 한 빨리 오른쪽 언덕 위에 놓인 깃발에 도달하는 것이므로 에이전트는 각 타임스텝에 대해 -1의 보상으로 페널티를 받습니다.
        #### 다음 중 하나가 발생하면 에피소드가 종료됩니다.

        ###종료: 자동차의 위치가 0.5보다 크거나 같습니다(오른쪽 언덕 위의 목표 위치).
        ###잘림: 에피소드 길이는 200입니다.
        # reward=0.1 if not done else -1
        # reward = 0.1 if not done or score == 500 else -1
        
        
        ### position에 따라서 action이 맞으면 reward를 줌
        car_pos = next_state[0]
        car_vel = next_state[1]

#         # def get_reward(car_pos, car_vel,max_position,step):
#           ## 2차함수로 만들어 속도가 커지게 더큰 리워드를 위치에 따라 받게함
        if car_vel > 0:
            reward = float(((car_pos+0.5)*20)**2/10+15*car_vel - step/300) 
        else:
            reward = float(((car_pos+0.5)*20)**2/10 - step/300) 


          ### max position   
        if car_pos > max_position:
          ## max position
          max_position = car_pos 

#           ## 성공 시 success
        if car_pos >= 0.5:
            reward+=100
            success +=1
  
        # else:
#             score -= 1  

#         step += 1


        # print(f'reward : {reward}')

        # 획득된 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적
        rewards_list.append(reward)

        ## 매스텝 마다 신경망 업데이트
        ## 학습할때 사용할 next_state의 shape도 맞춰줌  
        ### network에 넣어 주기 위해서 기존 (2,) 배열을 (1,2)로 축을 추가함 np.expand_dims(state, axis=0)
        loss, sigma =agent.train_model(np.expand_dims(state, axis=0), action, reward, np.expand_dims(next_state, axis=0), done)
        loss_list.append(loss)
        sigma_list.append(sigma)

        # 다음 상태를 현재 상태로 정의
        state = next_state
                    
        if done:            
            ### early stop
            avg_score_episode = np.mean(scores[-10:])
            avg_reward_episode = np.mean(rewards_list)


            # 에피소드 종료마다 결과 그래프 저장
            #### reward --> 매 스텝에서 받은 reward의 양 ( state에서 알맞는 action을 많이 할 수록 reward가 커짐)
            avg_rewards.append(avg_reward_episode)


            #### score --> 환경에서 reward는 매 step마다 -1 감소 인데, 200 step이 되면 종료되므로, -200 보다 커질수록 높은 score

            scores.append(score)
            avg_scores.append(avg_score_episode)
            episodes.append(epoch)

            
            # 에피소드 종료마다 결과 출력
            if epoch % print_interval == 0:
                print(f'episode: {epoch:3d} | success: {success} | car_pos_max : {max_position : .3f} | avg_reward : {avg_reward_episode : .3f} | score_avg : {avg_score_episode : .3f} |  step : {step} | loss : {np.mean(loss_list):.2f} | sigma : {np.mean(sigma) :.2f} ')
            
            # 100 에피소드마다 모델 저장
            if epoch % 10 == 0 or epoch % num_episode == 0:
                agent.model.save_weights(f'reinforcement_weights/mountain_model/{epoch}th/', save_format='tf')
                print("model 저장 ")

env.close()

        
plt.title('Test graph')
plt.xlabel('episodes')

plt.plot(episodes, avg_scores,
         color='skyblue',
         marker='o', markerfacecolor='blue',
         markersize=6)
plt.ylabel('avg_scores', color='blue')
plt.tick_params(axis='y', labelcolor='blue')
plt.savefig('cartpole_graph.png')
plt.show()

결과는 역시나 잘되지 않았다. 

## 학습이 조금 잘된것

다음 실습으로는 현재 수행하는 on-policy에서 dqn 의 off - policy 방법에서 사용하는 replay 메모리와 같은 기능을 하는 병렬 Thread를 이용하여 local_network를 만들고 local_network의 loss를 줄여나가는 방향으로 global network를 비동기적으로 업데이트하는 A3C (Asyncrous Advantage Actor Critic) 에 대해서 실습을 진행하려고 한다.

ContinuosActorCritic.ipynb
0.34MB

 

반응형
Comments