qcoding

[강화학습]Cartpole(카트폴) Deep Q-learning (Dqn) 실습 본문

머신러닝 딥러닝

[강화학습]Cartpole(카트폴) Deep Q-learning (Dqn) 실습

Qcoding 2023. 1. 14. 22:45
반응형

[Deep Q-learning]

* 이번실습은 강화학습 실습으로 유명한 Carpole 을 deep q-learning으로 구현해보는 실습을 진행하였다.

DQN은 미래에 받을 가치와 현재 가치의 차이를 줄이면 현재의 가치를 최적의 상태로 만들 수 있다는 것을 

목표로 기존 강화학습 알고리즘에서 사용하는 q-table을 인공신경망으로 대체 한 것이다.

위와 같이 q-network가 신경망으로 되어있는 데, state를 입력으로 받아 행동가치함수 (q_value)를 출력으로 생성한다. 여기서 state_t 와 state_t+1을 각 입력으로 넣은 Q-network 와 target Q-netwrok의 output인 q_value의 차이를 줄이게 parameter를 학습하여 해당 state에서 행동가치가 가장 높은 action을 생성하게 하는 것으로 정리할 수 있다.

[Cartpole] open_ai gym

https://gymnasium.farama.org/environments/classic_control/cart_pole/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

위의 gym 공식문서에서 classic의 carpole내용을 가져오면 위와 같다.

- Action Space : 행동 (Acion)은 2가지로 0 : 왼쪽으로 밀고 / 1 : 오른쪽으로 민다.

- Observation Space : 여기서 Observation은 State와 동일한 의미로 상태를 나타낸다. 상태는 4가지가 있으며, 0: 위치 / 1 : 속도 / 2 : 폴 각도 / 3: 폴 각속도 등을 나타낸다.

- Rewards: 매 스텝마다 +1 보상을 받는다. 즉 종료되기 전까지 reward가 누적되므로 pole을 쓰러뜨리지 않고 오랜 step을 버티는 것이 목표이다.

- Episode End (종료조건) : 아래의 종료조건은 보상을 설계할 때 중요하게 사용된다.

1) 폴각도가  ±12 이상으로 움직이면 종료  

2) 카트 위치가 ±2.4 이상으로 움직이면 종료 

3) 에피소드의 길이가 200보다 넘으면 종료 

 
 

[Colab 구현]

1) 패키지 설치

### 필요 라이브러리 설치
!sudo apt-get install -y python-numpy python-dev cmake zlib1g-dev libjpeg-dev xvfb \
    xorg-dev python-opengl libboost-all-dev libsdl2-dev swig
!pip install pyvirtualdisplay
!pip install piglet

### gym
!pip install gym[classic_control]

colab에서 실습하기 위한 필요한 라이브러리를 설치한다.

2) import

from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()
from base64 import b64encode
from glob import glob
from IPython.display import HTML
from IPython import display as ipy_display
from gym import logger as gym_logger
from gym.wrappers.record_video import RecordVideo

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from collections import deque
import numpy as np
import random
import gym
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
import os

import warnings
warnings.filterwarnings(action='ignore')

3) colab 환경에서 video를 play 할 수 있게 하는 함수

#### show video func
def show_video(mode='train', filename=None):
    mp4_list = glob(mode+'/*.mp4')
    # print(mp4_list)
    if mp4_list:
        if filename :
            file_lists = glob(mode+'/'+filename)
            if not file_lists:
                print('No {} found'.format(filename))
                return -1
            mp4 = file_lists[0]
                    
        else:
            mp4 = sorted(mp4_list)[-1]

        print(mp4)
        video = open(mp4, 'r+b').read()
        encoded = b64encode(video)
        ipy_display.display(HTML(data='''
            <video alt="gameplay" autoplay controls style="height: 400px;">
                <source src="data:video/mp4;base64,%s" type="video/mp4" />
            </video>
        ''' % (encoded.decode('ascii'))))
    else:
        print('No video found')
        return -1

4) 학습 시 조기 종료를 위한 Ealy stop Class

#### early stopping by avg
class EarlyStopping_by_avg():
    def __init__(self, patience=10, verbose=0):
        super().__init__()

        self.best_avg = 0
        self.step = 0
        self.patience = patience
        self.verbose = verbose

    def check(self, avg , avg_scores):
        ## best avg가 나올경우
        if avg >= self.best_avg:
            self.best_avg = avg
            self.step = 0
            # print("avg_reset")
        ## 이전값보다 현재 avg가 높을경우
        elif len(avg_scores) > 1 and avg > avg_scores[-2]:  ### 이전 값과 비교해야하므로 -2  , -1은 지금 avg와 동일함
            self.step = 0
            # print("이전값보다 avg 높아서 reset")
        else:
            self.step += 1
            if self.step > self.patience:
                if self.verbose:
                    print('조기 종료')
                return True
        return False

기존의 keras에서 제공하는 early_stop callback 의 기능과 같이 reward의 평균을 입력으로 받아서 사용하는 early stopping Class를 생성하여 사용하였다.

 

4) Deep q - learing 

## dqn
class DQN():
    def __init__(self, state_size, action_size):
        # 상태 및 행동 크기 정의
        self.state_size = state_size
        self.action_size = action_size

        # 감쇠율 및 엡실론 정의
        # Greedy in the limit of infinite exploration (GLIE) 정책 고려
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01

        # 리플레이 버퍼 크기 및 학습 시작 크기 정의
        self.buffer_size = 2000
        self.buffer_size_train_start = 200

        # 리플레이 버퍼 정의
        self.buffer = deque(maxlen=self.buffer_size)

        # 인공신경망 학습 하이퍼파라미터 설정
        self.loss_fn = MeanSquaredError()
        self.learning_rate = 0.001
        self.optimizer = Adam(learning_rate = self.learning_rate)
        self.batch_size = 32
        
        # Q-네트워크 및 타겟(Q)-네트워크 정의
        self.q_network = self.get_network()
        self.target_q_network = self.get_network()

        # 타겟(Q)-네트워크 파라미터 복제 함수 정의
        self.update_target_network()
        
        ### check point 생성######
        self.dir_name = os.getcwd()
        self.folder_checkpoint = os.path.join(self.dir_name,'checkpoint')
        self.checkpoint = tf.train.Checkpoint(model=self.q_network , optimizer=self.optimizer)
        self.manager = tf.train.CheckpointManager(self.checkpoint, self.folder_checkpoint, max_to_keep=40)
        

    def update_target_network(self):
        weights = self.q_network.get_weights()
        self.target_q_network.set_weights(weights)

    def get_network(self):
        network = Sequential()
        network.add(Dense(24, activation='relu', input_shape=(self.state_size,)))    # (None, 4) = (4,)
        network.add(Dense(24, activation='relu'))
        network.add(Dense(12, activation='relu'))
        network.add(Dense(self.action_size))

        return network

    def remember(self, state, action, reward, next_state, done):
        # 입력받은 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적하는 함수 구현
        item = (state, action, reward, next_state, done)
        self.buffer.append(item)
    
    def policy(self, state):
        # 입력받은 상태에 대하여 행동을 결정하는 함수 구현
        # 엡실론-그리디 알고리즘 구현

        if np.random.uniform(0,1) < self.epsilon:
            # random
            action = np.random.choice([0,1])
            # action = np.random.choice([x for x in range(self.action_size)])
        else:
            # greedy
            # self.q_network.predict(state)
            out = self.q_network(state)
            # out = [Q[0], Q[1]]
            action = np.argmax(out)

        return action


    def train(self):
        # GLIE 구현
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # 리플레이 버퍼에서 배치사이즈만큼 랜덤하게 추출하여 mini_batch 구현
        # random.sample 함수 활용
        mini_batch = random.sample(self.buffer, self.batch_size)

        # mini_batch에서 각 아래 정보로 분리하기
        states, actions, rewards, next_states, dones = zip(*mini_batch)

        # 분리된 정보를 tensor 형태로 변환
        states = tf.convert_to_tensor(states)
        actions = tf.convert_to_tensor(actions)
        rewards = tf.convert_to_tensor(rewards)
        next_states = tf.convert_to_tensor(next_states)
        # dones를 True False로 바꿀 껀데 tf.float32 실수 형태로 바꿔 주는코드 (1.0 , 0.0)
        dones = tf.convert_to_tensor(dones, dtype=tf.float32)

        # 학습 Target 정의
        # r + gamma * max_q_target(next_s)
        # mini_batch 단위로 elements-wise하게 계산되어야 합니다.
        # max_q_target을 위해서 행렬 내 최대값을 반환해주는 np.amax 함수 활용 
        # np.amax([[0.1, 0.9], [0.7, 0.3]], axis=-1))
        # 에피소드의 마지막 상태의 경우, only 보상이 target 값이 된다.

        ## 미래에 대한 target을 정하는 부분 --> 미래에 받을 가치 와 현재가치를 갖게 하면 현재의 가치를 최적의 상태로 만들 수 있음
        q_next = self.target_q_network(next_states)
        # print("q_next", q_next)

        ## 축별로 가장 높은값을 가져옴
        max_q_next =  np.amax(q_next, axis=-1)
        # print("max_q_next", max_q_next)

        ## 끝나지 않았을 경우 (done = False)
        ## 끝났을 경우 (done = True) 의 경우 self.gamma * max_q_next 값을 포함하지 않아야함.
        targets = rewards + ( self.gamma * max_q_next ) * ( 1-dones )

        # print('targets',targets)

        # q_network 학습
        # tf.GradientTape() 함수를 활용하여 자동미분 후 학습에 활용
        # (참조: https://teddylee777.github.io/tensorflow/gradient-tape)
        with tf.GradientTape() as tape:
        
            q = self.q_network(states)
            # print(actions)
            one_hot_a = tf.one_hot( actions, self.action_size )

            ## one hot encindg 값과 q값을 곱해준다.
            q_sa = tf.reduce_sum(q * one_hot_a, axis=1)
            # print(q_sa)
            
            # 오차 계산
            loss = self.loss_fn(targets, q_sa)

        # 손실함수를 통해 계산한 오차를 네트워크 가중치로 미분!
        ## 여기서 q_network 만 학습을 진행함
        grads = tape.gradient(loss, self.q_network.trainable_weights)
        # 미분값을 기준으로 각 네트워크 가중치를 업데이트!
        self.optimizer.apply_gradients(zip(grads, self.q_network.trainable_weights))
        
        ### check point 저장하기
        save_path = self.manager.save()
        # print("Saved checkpoint {}".format(save_path))

위의 코드가 deep q - learing을 사용한 class이다. 코드에 들어가 있는 내용이 많으므로 몇 가지 사항들을 나누어 정리해 보도록 하자.

4-1) 상태 정의 parameter

4-1-1 ) 상태 및 행동 크기 

-> q-network를 생성하기 위해 입력 (state size) 와 행동 (action size)를 설정한다.

 

4-1-2) 감쇠율 및 엡실론 (GLIE : Greedy In the Limit of infinite exploration ) 

-> 이용(Exploitation)과 탐험(Exploration)을 적절히 사용해야 적합한 q_value를 구할 수 있는 데, 랜덤한 값과 엡실론이라는 값을 설정하여 엡실론 보다 작을 경우 이용(Exploitation)을 하여 기존에 존재하는 q_network를 사용하여 action을 선택하고 엡실론 보다 작지 않을 경우 탐험(Exploration)을 통해 랜덤한 action을 수행해 보는 것이다. 

엡실론은 에피소드의 수에 따라 값이 감소하게 사용하는 데, 이를 설정하기 위한 parameter를 설정한다.

 

4-1-3) 리플레이 버퍼 크기 및 학습 시작 크기

-> 리플레이 버퍼는 학습을 사용하기 위해 사용하는 history와 같은 역활을 한다. 강화학습은 독립적이지 않은 연속성이 있는 값을 다루기 때문에 가까운 시간에 있는 값들은 높은 연관성을 갖게 된다. 아래의 그림과 같이 가까운 값을 연결할 경우에는 전체적인 경향성을 반영하기 어렵다. 그러므로 일종의 과거의 기록을 쭉 모아놓은 리플레이 버퍼를 사용하여 전체적인 방향성이 학습할 수 있도록 한다. 이때 버퍼의 크기와 어느정도 버퍼가 쌓이면 학습을 수행하는 지 설정한다.

 # 상태 및 행동 크기 정의
        self.state_size = state_size
        self.action_size = action_size

        # 감쇠율 및 엡실론 정의
        # Greedy in the limit of infinite exploration (GLIE) 정책 고려
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01

        # 리플레이 버퍼 크기 및 학습 시작 크기 정의
        self.buffer_size = 2000
        self.buffer_size_train_start = 200

        # 리플레이 버퍼 정의
        # deque 자료구조 활용 (참조: https://chaewonkong.github.io/posts/python-deque.html)
        self.buffer = deque(maxlen=self.buffer_size)

4-2) q-network 신경망 및 네트워크 설정

 

        # 인공신경망 학습 하이퍼파라미터 설정
        self.loss_fn = MeanSquaredError()
        self.learning_rate = 0.001
        self.optimizer = Adam(learning_rate = self.learning_rate)
        self.batch_size = 32
        

        
        # Q-네트워크 및 타겟(Q)-네트워크 정의
        self.q_network = self.get_network()
        self.target_q_network = self.get_network()

        # 타겟(Q)-네트워크 파라미터 복제 함수 정의
        self.update_target_network()

4-2-1 ) 네트워크의 학습 파라미터 설정 : 일반적으로 인공신경망을 사용할 때 설정하는 파라미터를 설정한다.

4-2-2) check point 설정 : 나중에 가중치를 가져오기 위해서 학습 수행 시 model / optimizer를 저장한다. max_to_keep을 통해 최대 몇개의 check point를 저장할 지 설정할 수 있다.

 ### check point 생성 ####

        self.dir_name = os.getcwd()
        self.folder_checkpoint = os.path.join(self.dir_name,'checkpoint')
        self.checkpoint = tf.train.Checkpoint(model=self.q_network , optimizer=self.optimizer)
        self.manager = tf.train.CheckpointManager(self.checkpoint, self.folder_checkpoint, max_to_keep=40)

 

4-2-3 ) q-network 와 target - network 설정

-> 이 부분의 network는 전체 내용을 이해하는 데 가장 중요한 순서 이므로 도식화 하여 내용을 이해하면 좋을 것 같다.

 # Q-네트워크 및 타겟(Q)-네트워크 정의
        self.q_network = self.get_network()
        self.target_q_network = self.get_network()

        # 타겟(Q)-네트워크 파라미터 복제 함수 정의
        self.update_target_network()
        
        ### check point 생성######
        self.dir_name = os.getcwd()
        self.folder_checkpoint = os.path.join(self.dir_name,'checkpoint')
        self.checkpoint = tf.train.Checkpoint(model=self.q_network , optimizer=self.optimizer)
        self.manager = tf.train.CheckpointManager(self.checkpoint, self.folder_checkpoint, max_to_keep=40)
        

    def update_target_network(self):
        weights = self.q_network.get_weights()
        self.target_q_network.set_weights(weights)

    def get_network(self):

        network = Sequential()
        network.add(Dense(24, activation='relu', input_shape=(self.state_size,)))    # (None, 4) = (4,)
        network.add(Dense(24, activation='relu'))
        network.add(Dense(12, activation='relu'))
        network.add(Dense(self.action_size))

        return network

    def remember(self, state, action, reward, next_state, done):
        # 입력받은 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적하는 함수 구현
        item = (state, action, reward, next_state, done)
        self.buffer.append(item)
    
    def policy(self, state):
        # 입력받은 상태에 대하여 행동을 결정하는 함수 구현
        # 엡실론-그리디 알고리즘 구현

        if np.random.uniform(0,1) < self.epsilon:
            # random
            action = np.random.choice([0,1])
            # action = np.random.choice([x for x in range(self.action_size)])
        else:
            # greedy
            # self.q_network.predict(state)
            out = self.q_network(state)
            # out = [Q[0], Q[1]]
            action = np.argmax(out)

        return action


    def train(self):
        # GLIE 구현
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # 리플레이 버퍼에서 배치사이즈만큼 랜덤하게 추출하여 mini_batch 구현
        # random.sample 함수 활용
        mini_batch = random.sample(self.buffer, self.batch_size)

        # mini_batch에서 각 아래 정보로 분리하기
        states, actions, rewards, next_states, dones = zip(*mini_batch)

        # 분리된 정보를 tensor 형태로 변환
        states = tf.convert_to_tensor(states)
        actions = tf.convert_to_tensor(actions)
        rewards = tf.convert_to_tensor(rewards)
        next_states = tf.convert_to_tensor(next_states)
        # dones를 True False로 바꿀 껀데 tf.float32 실수 형태로 바꿔 주는코드 (1.0 , 0.0)
        dones = tf.convert_to_tensor(dones, dtype=tf.float32)

        # 학습 Target 정의
        # r + gamma * max_q_target(next_s)
        # mini_batch 단위로 elements-wise하게 계산되어야 합니다.
        # max_q_target을 위해서 행렬 내 최대값을 반환해주는 np.amax 함수 활용 
        # np.amax([[0.1, 0.9], [0.7, 0.3]], axis=-1))
        # 에피소드의 마지막 상태의 경우, only 보상이 target 값이 된다.

        ## 미래에 대한 target을 정하는 부분 --> 미래에 받을 가치 와 현재가치를 갖게 하면 현재의 가치를 최적의 상태로 만들 수 있음
        q_next = self.target_q_network(next_states)
        # print("q_next", q_next)

        ## 축별로 가장 높은값을 가져옴
        max_q_next =  np.amax(q_next, axis=-1)
        # print("max_q_next", max_q_next)

        ## 끝나지 않았을 경우 (done = False)
        ## 끝났을 경우 (done = True) 의 경우 self.gamma * max_q_next 값을 포함하지 않아야함.
        targets = rewards + ( self.gamma * max_q_next ) * ( 1-dones )

        # print('targets',targets)

        # q_network 학습
        # tf.GradientTape() 함수를 활용하여 자동미분 후 학습에 활용
        with tf.GradientTape() as tape:

            q = self.q_network(states)
            # print(actions)
            one_hot_a = tf.one_hot( actions, self.action_size )

            ## one hot encindg 값과 q값을 곱해준다.
            q_sa = tf.reduce_sum(q * one_hot_a, axis=1)
            # print(q_sa)
            
            # 오차 계산
            loss = self.loss_fn(targets, q_sa)

        # 손실함수를 통해 계산한 오차를 네트워크 가중치로 미분!
        ## 여기서 q_network 만 학습을 진행함
        grads = tape.gradient(loss, self.q_network.trainable_weights)
        # 미분값을 기준으로 각 네트워크 가중치를 업데이트!
        self.optimizer.apply_gradients(zip(grads, self.q_network.trainable_weights))
        
        ### check point 저장하기
        save_path = self.manager.save()
        # print("Saved checkpoint {}".format(save_path))

위의 도식화 순서로 dqn이 동작된다.

 

5) Env (환경) 생성

# CartPole 환경 정의
ENV_NAME = 'CartPole-v1'
# env = gym.make(ENV_NAME, render_mode="rgb_array")
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './train', episode_trigger =lambda episode_number: True )
env.metadata = {'render.modes': ['human', 'ansi']}

# CartPole 환경의 상태와 행동 크기 정의
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# 위에서 정의한 DQN 클래스를 활용하여 agent 정의
agent = DQN(state_size, action_size)

scores, avg_scores, episodes, losses = [], [], [], []

# 반복 학습 에피소드 수 정의
num_episode = 100
early_stopping_by_avg = EarlyStopping_by_avg(patience=10, verbose=1)
## early stopping을 위한 초기값 설정 
avg_step = 0

for epoch in range(num_episode):
    # done flag와 score 값 초기화
    done = False
    score = 0


    # 환경 reset을 통해 초기 상태 정의
    state = env.reset()
    
    # print(f"avg: {avg_step}")
    if early_stopping_by_avg.check(avg_step , avg_scores ):
        print("earstpping 실행")
        break

    while not done:

        # 현재 상태에 대하여 행동 정의
        action = agent.policy(state[np.newaxis,:])

        # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
        next_state, reward, done, info = env.step(action)
        
        # 해당 에피소드의 최종 score를 위해 reward 값 누적
        score += reward

        # 기본 환경은 pole이 쓰러지지 않으면 +1의 보상을 준다.
        # 자신만의 보상가설을 만들어 학습 가능
        ### 종료조건
        # 폴 각도는 ±12° 이상입니다.
        # 카트 위치가 ±2.4 이상(카트 중앙이 디스플레이 가장자리에 도달함)
        # 에피소드 길이가 200보다 큽니다.
        # TODO #
        
        def get_reward(pos, angle , done):
            ### 위치 / 속도 조건으로 보상크게
            cond_pos = (pos < 2.0) and (pos > -2.0)
            cond_angle = (angle < 5.0) and (angle > -5.0)
            ### 실패시 보상 -1
            if done:
                return -100.0
            ### 상점
            elif cond_pos or cond_angle:
                return 0.1
            elif cond_pos:
                return 0.3
            elif cond_pos and cond_angle:
                return 0.5
            ### 벌점
            elif (pos > 2.5) or (pos < -2.5):
                return -20
            elif (angle > 10.0) or (angle < -10.0):
                return -10
            
            
        ### position
        pos = next_state[0]
        ### velocity
        angle = next_state[2]
        reward = get_reward(pos, angle, done)
        # print(f'reward : {reward : .3f}')
        # reward=0.1 if not done else -1


        # 획득된 상태, 행동, 보상, 다음상태, done flag를 리플레이 버퍼에 축적
        agent.remember(state, action, reward, next_state, done)

        # 다음 상태를 현재 상태로 정의
        state = next_state

        # buffer 크기가 일정 기준 이상 쌓이면 학습 진행
        # TODO #
        if len(agent.buffer) >= agent.buffer_size_train_start :
            agent.train()

        if done:
            
            ### early stop
            avg_step = np.mean(scores[-10:])


            # 에피소드가 종료되면 target_q_network 파라미터 복제
            # TODO #
            agent.update_target_network()

            # 에피소드 종료마다 결과 그래프 저장
            scores.append(score)
            avg_scores.append(avg_step)
            episodes.append(epoch)

            
            # 에피소드 종료마다 결과 출력
            print(f'episode: {epoch:3d} | avg_score: { avg_step :3.2f} | buffer_size: {len(agent.buffer):4d} | epsilon: {agent.epsilon:.4f}')
            

env.close()

plt.title('Test graph')
plt.xlabel('episodes')

plt.plot(episodes, avg_scores,
         color='skyblue',
         marker='o', markerfacecolor='blue',
         markersize=6)
plt.ylabel('avg_scores', color='blue')
plt.tick_params(axis='y', labelcolor='blue')

plt.savefig('cartpole_graph.png')
plt.show()

위의 환경 코드에서 중요한 부분은 reward를 설정하는 부분이다.

        ### 종료조건
        # 폴 각도는 ±12° 이상입니다.
        # 카트 위치가 ±2.4 이상(카트 중앙이 디스플레이 가장자리에 도달함)
        # 에피소드 길이가 200보다 큽니다.
        # TODO #
        
        def get_reward(pos, angle , done):
            cond_pos = (pos < 2.0) and (pos > -2.0)
            cond_angle = (angle < 5.0) and (angle > -5.0)
            ### 실패시 보상 -1
            if done:
                return -100.0
            ### 상점
            elif cond_pos or cond_angle:
                return 0.1
            elif cond_pos:
                return 0.3
            elif cond_pos and cond_angle:
                return 0.5
            ### 벌점
            elif (pos > 2.5) or (pos < -2.5):
                return -20
            elif (angle > 10.0) or (angle < -10.0):
                return -10
            
            
        ### position
        pos = next_state[0]
        ### velocity
        angle = next_state[2]
        reward = get_reward(pos, angle, done)

## reward 코드를 다시 살펴보자. 종료조건에서 폴각도와 카트 위치가 들어가므로 좋은 reward를 카트와 위치로 적절하게 보상하게 해주면 더 좋은 결과를 얻을 수 있다. 위에서는 상점과 벌점으로 크게 나누어서 reward를 주었으며, 폴과 카트의 위치가 적절한 위치를 갖으면 상점을 위치에서 벗어나면 벌점을 주었다.

 

위의 코드로 학습을 수행하면 다음과 같은 결과를 얻을 수 있다.

여기서 score / reward를 좀 구분해서 사용하는데,

score는 env에서 주는 reward를 더한 값으로, 매 스텝을 버틸때마다 +1을 얻는다. 즉 score 값이 높을 수록 더 오랜시간 쓰러지지 않고 버틴 것이다. avg_score는 score 10개를 평균낸 값이다.

reward는 사용자가 임의로 설계한 보상으로 q-network를 학습할 때 사용된다. (target q값 계산 시)  다시 말하면 좋은 state일 때 높은 reward를 가질 수 있도록 q-network를 학습하는 데, 학습이 잘되면 state를 받고 좋은 action을 수행하므로 score는 계속 늘어날 것이다.

아래의 그래프에서 avg_score는 지속적으로 증가하고 있으므로 학습이 잘 이루어 졌다고 할 수 있다. 100 에피소드만 수행했을 때의 결과는 아래와 같다.

## 가장 높은 avg_score 값을 나타낸 에피소드를 뽑아서 동영상으로 재생함.

### max episode
### nan이 젤 큰값이므로 이값을제거하고 계산함
avg_scores_fil = [x for x in avg_scores if np.isnan(x) !=True]
episode=np.argmax(avg_scores_fil)
filename = 'rl-video-episode-{}.mp4'.format(episode)
print("최대 avg : {} ,에피소드 번호 : {}".format(max(avg_scores_fil) , episode))
show_video(filename=filename)

### 학습 시 가장 오래된 에피소드  (avg_score = 500)

### 학습을 제대로 수행하지 않았을 때 에피소드

6) 저장된 checkpoint값을 불러와서 새로만든 환경에 적용하기

### 여기서는 중요한 것이 학습된 q-newtork를 이용해서 action을 선택하는 policy (정책)을 사용해야 하므로 epsilon값을 0으로 해서 greedy 알고리즘에 의해서 랜덤에 의해 선택되는 일이 없도록 하는 것이다.

### 학습된 파일을 가지고 다시계산
# CartPole 환경 정의
ENV_NAME = 'CartPole-v1'
env = gym.make(ENV_NAME)

# 비디오 레코딩
env = RecordVideo(env, './test',  episode_trigger = lambda episode_number: True)

# greedy policy
agent.epsilon = 0.0

score = 0
state = env.reset()
done = False

#### 저장된 가중치 load
agent.checkpoint.restore(agent.manager.latest_checkpoint)
if agent.manager.latest_checkpoint:
    print("Restored from {}".format(agent.manager.latest_checkpoint))
else:
    print("Initializing from scratch.")

while not done:

    # 현재 상태에 대하여 행동 정의
    action = agent.policy(state[np.newaxis,:])

    # env.step 함수를 이용하여 행동에 대한 다음 상태, 보상, done flag 등 획득
    next_state, reward, done, info = env.step(action)

    # 해당 에피소드의 최종 score를 위해 reward 값 누적
    score += reward

    # 다음 상태를 현재 상태로 정의
    state = next_state

print("Score: ", score)
### video 
show_video(mode='test')

반응형
Comments