Reinforcement Learning with TF2 and Gym (part II)
Z. QIU
Posted on December 26, 2020
In this post I will present a simple version of the Policy Gradient method for solving the CartPole game. I followed this YouTube video for the fundamentals of the Policy Gradient method.
Introduction
We want to obtain a Policy Function (as shown below) that outputs a probability distribution over actions for a given observed state.
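In standard notation, this policy is a parameterized function (the weights θ of the neural network) that assigns a probability to each discrete action a given the observed state s:

$$\pi_\theta(a \mid s) = P(a_t = a \mid s_t = s), \qquad \sum_{a} \pi_\theta(a \mid s) = 1$$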
Some basic concepts in Policy Gradient method:
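The main quantities used below are standard: the discount factor γ (set to 0.99 in the code) and the discounted return G_t, which is exactly what calc_rewards() computes later from the recorded rewards of an episode of length T:

$$G_t = r_t + \gamma\, r_{t+1} + \gamma^2\, r_{t+2} + \dots = \sum_{k=0}^{T-t-1} \gamma^{k}\, r_{t+k}$$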
Policy Gradient Method Steps
In order to train the network to achieve better and better performance, we run many rounds of this game (episodes). In each round, at every time step t, we perform the following steps and record the (state, action, reward) data of each step.
When a round (an episode) ends (done == True), we use all the historical data of this round to calculate the gradient of the network:
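The loss implemented in calc_grad() below corresponds to the standard REINFORCE objective: the negative log-probability of each action actually taken, weighted by its discounted return and averaged over the T steps of the episode,

$$L(\theta) = -\frac{1}{T}\sum_{t=0}^{T-1} \log \pi_\theta(a_t \mid s_t)\, G_t .$$

Minimizing L(θ) increases the probability of actions that were followed by high returns; its gradient ∇θ L(θ) is what we accumulate over episodes.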
With the network gradients accumulated over multiple episodes, we update the trainable variables of the Policy Gradient network:
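Written as plain gradient descent the update would be the following; the code actually uses the Adam optimizer instead, but the principle is the same: subtract the gradients accumulated over the last N = update_step episodes, scaled by the learning rate α,

$$\theta \leftarrow \theta - \alpha \sum_{e=1}^{N} \nabla_\theta L_e(\theta).$$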
Neural network model
The function create_model() implements a simple neural network that takes states as input and outputs action probabilities.
Code sample:
def create_model(self):
    # one hidden ReLU layer followed by a softmax over the actions
    self.model = Sequential([Dense(self.h_dim, activation="relu", input_shape=(self.s_dim,)),
                             Dense(self.a_dim, activation="softmax")])
    self.optimizer = keras.optimizers.Adam(learning_rate=self.lr)
    self.model.summary()
Steps 1 and 2
Below is the code for observing a state and calculating an action.
# with a state input state_current, calculate the action probability distribution action_pdf
action_pdf = np.array(agent.model(state_current.reshape(1, 4)))
action_pdf /= action_pdf.sum()  # renormalize against float32 rounding, as in the full code below
# with action_pdf, randomly sample an action index (0 or 1)
a = np.random.choice(len(action_pdf[0]), p=action_pdf[0])
# use env.step(a) to get the next observed state, the reward and the done flag
state_obs, reward, done, info = env.step(a)
Steps 3, 4 and 5
The function calc_grad() takes the historical state, action and reward data as input, defines a loss function to minimize, and obtains the gradients of this loss with respect to the model variables.
def calc_grad(self, state_input, action_holder, reward_holder):
    with tf.GradientTape(persistent=True) as tape:
        self.output = self.model(state_input)
        # pick, for each time step, the probability of the action that was actually taken
        indexes = tf.range(0, tf.shape(self.output)[0]) * tf.shape(self.output)[1] + action_holder
        self.outputs = tf.gather(tf.reshape(self.output, [-1]), indexes)
        # negative log-probability weighted by the discounted reward
        self.loss = -tf.reduce_mean(K.log(self.outputs) * reward_holder)
    self.gradients = tape.gradient(self.loss, self.model.variables)
Step 6
The function update_gradient() takes the gradients accumulated over multiple played rounds as input, then updates the trainable variables of self.model.
def update_gradient(self, gradient_holders):
    self.optimizer.apply_gradients(grads_and_vars=zip(gradient_holders, self.model.variables))
Entire Code
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "2"
import tensorflow as tf
if tf.__version__.startswith("1."):
    raise RuntimeError("Error!! You are using tensorflow-v1")
import numpy as np
import gym
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import tensorflow.keras.backend as K

# a class of Policy Gradient Neural Network
class PolicyGradientNet:
    ## learn_rate: learning rate
    ## state_dim: dimension of state space
    ## action_dim: dimension of action space
    ## hidden_dim: perceptron number in hidden layer
    ## discount_ratio: discount factor for future rewards
    def __init__(self, learn_rate, state_dim, action_dim, hidden_dim, discount_ratio=0.99):
        self.lr = learn_rate
        self.model = None
        self.gradients = []
        self.optimizer = None
        self.gamma = discount_ratio  # discount ratio for future rewards
        self.h_dim = hidden_dim
        self.a_dim = action_dim
        self.s_dim = state_dim

    def create_model(self):
        # one hidden ReLU layer followed by a softmax over the actions
        self.model = Sequential([Dense(self.h_dim, activation="relu", input_shape=(self.s_dim,)),
                                 Dense(self.a_dim, activation="softmax")])
        self.optimizer = keras.optimizers.Adam(learning_rate=self.lr)
        self.model.summary()

    def calc_grad(self, state_input, action_holder, reward_holder):
        with tf.GradientTape(persistent=True) as tape:
            self.output = self.model(state_input)
            # pick, for each time step, the probability of the action that was actually taken
            indexes = tf.range(0, tf.shape(self.output)[0]) * tf.shape(self.output)[1] + action_holder
            self.outputs = tf.gather(tf.reshape(self.output, [-1]), indexes)
            # negative log-probability weighted by the discounted reward
            self.loss = -tf.reduce_mean(K.log(self.outputs) * reward_holder)
        self.gradients = tape.gradient(self.loss, self.model.variables)

    def update_gradient(self, gradient_holders):
        self.optimizer.apply_gradients(grads_and_vars=zip(gradient_holders, self.model.variables))

    def get_gradients(self):
        return self.gradients

    def get_variable(self):
        return self.model.variables

    def calc_rewards(self, rewards):
        # discounted cumulative reward, computed backwards through the episode
        discounted_r = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, rewards.size)):
            running_add = running_add * self.gamma + rewards[t]
            discounted_r[t] = running_add
        return discounted_r

    def save_model(self):
        self.model.save_weights('cartpole_chkpt/weights.chkpt')

    def load_model(self):
        self.model.load_weights('cartpole_chkpt/weights.chkpt')


ENV_SEED = 1024  ## Reproducibility of the game
NP_SEED = 1024   ## Reproducibility of numpy random
env = gym.make('CartPole-v0')
env = env.unwrapped  # use unwrapped version, otherwise episodes will terminate after 200 steps
env.seed(ENV_SEED)
np.random.seed(NP_SEED)
### The Discrete space allows a fixed range of non-negative numbers, so in this case valid actions are either 0 or 1.
print(env.action_space)
### The Box space represents an n-dimensional box, so valid observations will be an array of 4 numbers.
print(env.observation_space)
### We can also check the Box’s bounds:
print(env.observation_space.high)
print(env.observation_space.low)

update_step = 5     # number of episodes between gradient updates of the network
limit_train = 1000  # training episode limit for stopping
theta_limit = env.theta_threshold_radians  # pole angle threshold used in the heuristic reward

# a PolicyGradientNet instance
# action_dim = 2: left or right
# state_dim = 4: x-position, x-velocity, angle, angular-velocity
agent = PolicyGradientNet(learn_rate=0.01, action_dim=2, state_dim=4, hidden_dim=8)
agent.create_model()

# total reward
total_reward = []
gradient_buffer = agent.model.variables
for index, grad in enumerate(gradient_buffer):
    gradient_buffer[index] = 0

i = 0  # episode counter
max_step = 0
while i < limit_train:
    step = 0
    state_current = env.reset()
    episode_reward = 0
    history_data = []
    while True:
        env.render()  ## refreshing of visual result
        step += 1
        #a = np.random.choice([0, 1], p=[0.5, 0.5])
        action_pdf = agent.model(state_current.reshape(1, 4))
        action_pdf = np.array(action_pdf)
        action_pdf /= action_pdf.sum()
        # randomly sample an action index according to the policy output
        a = np.random.choice(len(action_pdf[0]), p=action_pdf[0])
        ## env.step() shall return: observation(object), reward(float), done(boolean), info(dict)
        ## check more info at https://gym.openai.com/docs/
        state_obs, reward, done, info = env.step(a)
        x, x_prime, theta, theta_prime = state_obs
        # my heuristic for reward
        rwd = 0.2 * np.exp(-1 * abs(x_prime))          # for limiting x-velocity
        rwd += 0.5 * (1.0 - abs(theta) / theta_limit)  # for limiting angle about vertical axis
        history_data.append([state_current, a, rwd, state_obs])
        episode_reward += reward  # accumulate the raw environment reward for logging
        state_current = state_obs
        if done:  # done being True indicates the episode has terminated.
            history_data = np.array(history_data, dtype=object)
            history_data[:, 2] = agent.calc_rewards(history_data[:, 2])
            feed_data = {
                "state_input": np.vstack(history_data[:, 0]),
                "action_holder": history_data[:, 1].astype(np.int32),
                "reward_holder": history_data[:, 2].astype(np.float32)
            }
            # Calculating gradients
            agent.calc_grad(feed_data["state_input"], feed_data["action_holder"], feed_data["reward_holder"])
            grads = agent.get_gradients()
            for idx, grad in enumerate(grads):
                gradient_buffer[idx] += grad
            if i % update_step == 0 and i != 0:
                # Apply the accumulated gradients for updating the model
                agent.update_gradient(gradient_buffer)
                for index, grad in enumerate(gradient_buffer):
                    gradient_buffer[index] = grad * 0
            total_reward.append(episode_reward)
            if max_step < step:
                max_step = step
                #print("Step: ", step)
            break
    if i % 50 == 0:
        print("Max step is {} until episode {} ".format(max_step, i))
        print("Average reward for episodes {} - {} : {}".format(i, i + 50, np.mean(total_reward[-50:])))
    i += 1
Below is a short training video showing an intermediate result (the final max_step is more than 10K):