Cart Pole
Reinforcement learning using OpenAI Gym.
Reading time: 10 minutes
This post introduces reinforcement learning, a family of AI algorithms built around the ongoing cycle of an agent taking an action in an environment and receiving a reward. OpenAI, a research organisation co-founded by Elon Musk (SpaceX, Tesla) and Sam Altman (Y Combinator), has developed Gym, a reinforcement learning platform where users can develop and test reinforcement learning algorithms in different environments. These environments include playing Pac-Man, racing a car around a track and using a robot to lift a block into the air. The environment we will be using in this post involves keeping a pole, attached to a sliding cart, upright.
In this post, you'll learn how to:
- load and run an OpenAI environment;
- train a reinforcement learning model; and
- test an agent in an environment.
By the end, you'll learn how to keep this pole upright:
Import Packages
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
In addition to the usual packages, we also need to install and import OpenAI Gym. Run pip install gym in the command prompt to install OpenAI Gym (rendering the environment may also require pyglet, which the original setup installed with apt-get install -y python-pyglet), then import the Python package using:
import gym
Cart Pole Environment
The OpenAI Gym website describes the Cart Pole environment as:
A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.
The two actions we can take are to push the cart to the left (0) or to push it to the right (1).
Each observation of the environment returns 4 values:
- cart position (-4.8, 4.8)
- cart velocity (-$\infty$, $\infty$)
- pole angle (-24$^{\circ}$, 24$^{\circ}$)
- pole velocity at tip (-$\infty$, $\infty$)
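As a quick sanity check, you can print the action and observation spaces directly. The following is a minimal sketch, assuming gym has been imported as above:
env = gym.make('CartPole-v0')
print(env.action_space)            # Discrete(2): 0 = push left, 1 = push right
print(env.observation_space)       # Box(4,): the four values listed above
print(env.observation_space.high)  # upper bound of each observation value
env.close()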
Data Collection
Let's collect pairs of actions and observations over 20,000 episodes:
memory = []
try:
    env = gym.make('CartPole-v0')
    # env = gym.wrappers.Monitor(env, './vid', video_callable=lambda episode_id: True)
    for episode in range(1, 20000):
        print(f'Episode {episode}')
        observation = env.reset()
        observations = [observation]
        for t in range(1, 200):
            previous_observation = observations[-1]
            env.render()
            action = env.action_space.sample()  # take a random action
            observation, reward, done, info = env.step(action)
            observations.append(observation)
            memory.append([episode, t, *previous_observation, reward, action])
            if done:
                break
finally:
    env.close()
Now, let's store them in a Pandas DataFrame:
df_memory = pd.DataFrame(memory, columns=[
    'episode',
    't',
    'cart_position',
    'cart_velocity',
    'pole_angle',
    'pole_velocity_at_tip',
    'reward',
    'action',
]).set_index(['episode', 't'])
df_memory.to_csv('memory.csv')
Here is what 10 of those episodes look like:
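You can also get a quick feel for the recorded data yourself, for example by looking at episode lengths and a single trajectory. This is a small sketch using the DataFrame built above:
# episode length = number of time steps the pole stayed up under random actions
episode_lengths = df_memory.groupby('episode').size()
print(episode_lengths.describe())

# all time steps of a single episode, e.g. episode 1
print(df_memory.loc[1])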
Create and Train Model
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
A simple classifier with Dropout:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.dropout = nn.Dropout(0.2)
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        # return raw logits: CrossEntropyLoss applies log-softmax internally
        return self.fc4(x)
net = Net().to(device)
Using CrossEntropyLoss as the loss function and Adam as the optimiser:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters())
Since we want to train the classifier only on actions that kept the pole balanced, we limit ourselves to episodes that lasted longer than 50 time steps (the pole stayed upright for a while) and discard the last 20 time steps of each of those episodes (during which the pole was about to lose its balance).
df_memory = pd.read_csv('memory.csv').set_index(['episode', 't'])

# only keep episodes with more than 50 time steps
df_memory_subset = (df_memory
                    .groupby('episode')
                    .filter(lambda x: len(x) > 50))

# discard the last 20 time steps of each episode
df_memory_subset = df_memory_subset[df_memory_subset
                                    .groupby('episode')
                                    .cumcount(ascending=False) >= 20]
Extracting the observation attributes as X and the action as y:
X = df_memory_subset[['cart_position',
'cart_velocity',
'pole_angle',
'pole_velocity_at_tip']].values
y = df_memory_subset.action.values
We can now train the model:
net.train()  # set network to training phase

scaler = StandardScaler()
inputs = torch.from_numpy(scaler.fit_transform(X)).float().to(device)
labels = torch.from_numpy(y).long().to(device)

epochs = 5000

# for each pass of the training dataset
for epoch in range(epochs):
    train_loss, train_correct, train_total = 0, 0, 0

    optimizer.zero_grad()  # zero the parameter gradients
    outputs = net(inputs)  # forward pass
    loss = criterion(outputs, labels)  # compare output with ground truth
    loss.backward()  # backpropagation
    optimizer.step()  # update network weights

    # record statistics
    _, preds = torch.max(outputs.data, 1)
    train_loss += loss.item()
    train_correct += (preds == labels).sum().item()
    train_total += len(labels)

    # print statistics every 100 epochs
    if (epoch + 1) % 100 == 0:
        print(f'Epoch {epoch + 1}, ' +
              f'Train Loss: {train_loss:.5f}, ' +
              f'Train Accuracy: {(train_correct/train_total):.5f}')
Note that we have used the StandardScaler from Scikit-learn to scale the observation attributes to have a mean of 0 and a variance of 1.
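If you want to check what the scaler learned, its fitted statistics are available as attributes:
print(scaler.mean_)   # per-feature means estimated from X
print(scaler.scale_)  # per-feature standard deviations estimated from X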
Test Model
net.eval()  # set network to evaluation phase

with torch.no_grad():
    try:
        env = gym.make('CartPole-v0')
        # env = gym.wrappers.Monitor(env, './vid', video_callable=lambda episode_id: True)
        for episode in range(1, 10):
            print(f'Episode {episode}')
            observation = env.reset()
            for t in range(1, 200):
                env.render()
                input = scaler.transform(observation.reshape(1, -1))
                scaled_input = torch.from_numpy(input).float().to(device)
                outputs = net(scaled_input)  # forward pass
                _, preds = torch.max(outputs.data, 1)  # make prediction
                action = preds.item()  # turn prediction into an action
                observation, reward, done, info = env.step(action)
                if done:
                    break
    finally:
        env.close()
This is a video of one of the episodes:
The pole balances on the cart!
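Beyond watching a few episodes, you can get a rough measure of performance by averaging the episode length over several runs. Here is a minimal sketch of one way to do this, assuming the net, scaler and device defined above (CartPole-v0 caps episodes at 200 time steps):
def evaluate(net, scaler, episodes=20):
    net.eval()
    env = gym.make('CartPole-v0')
    lengths = []
    try:
        with torch.no_grad():
            for _ in range(episodes):
                observation = env.reset()
                for t in range(1, 201):
                    scaled = scaler.transform(observation.reshape(1, -1))
                    inputs = torch.from_numpy(scaled).float().to(device)
                    action = net(inputs).argmax(dim=1).item()  # greedy action
                    observation, reward, done, info = env.step(action)
                    if done:
                        break
                lengths.append(t)
    finally:
        env.close()
    return sum(lengths) / len(lengths)

print(f'Average episode length: {evaluate(net, scaler):.1f}')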
Summary and Next Steps
In this post, you've learnt how to:
- load and run an OpenAI environment;
- train a reinforcement learning model; and
- test an agent in an environment.
You can try your hand at other control theory problems offered in OpenAI Gym, such as swinging a pendulum or driving a car up a big hill. In a later post, we'll use Deep Q-Learning to learn directly from a video of the environment instead of relying on the observations provided by the environment.
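For reference, those environments can be loaded by name, as in the sketch below (environment IDs may differ slightly between Gym versions):
env = gym.make('Pendulum-v0')     # swing a pendulum upright and keep it balanced
env = gym.make('MountainCar-v0')  # drive an under-powered car up a big hill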
Acknowledgements
This post was inspired by:
Build your First AI game bot using OpenAI Gym, Keras, TensorFlow in Python