El entorno WindyGridWorld consiste en un agente que se mueve en una cuadrícula 7x10 (alto x ancho). En cada paso, el agente tiene 4 opciones de acción o movimiento: ARRIBA, ABAJO, DERECHA, IZQUIERDA. El agente siempre sale de la misma casilla [3, 0] y el juego termina cuando el agente llega a la casilla de llegada [3, 7].
El entorno se corresponde con el ejemplo 'Cuadrícula con viento' explicado en la sección 3.1.2. el módulo "Métodos de Diferencia Temporal". El problema radica en que hay un viento que empuja al agente hacia arriba en la parte central de la cuadrícula. Esto provoca que, aunque se ejecute una acción estándar, en la región central los estados resultantes se desplazan hacia arriba por un viento cuya fuerza varía entre columnas.
El código para implementar este entorno, que se encuentra disponible en el fichero adjunto windy_gridworld_env.py
, ha sido adaptado del siguiente enlace:
Vamos a empezar cargando el entorno y ver qué características tiene, ejecutando un episodio de prueba.
El siguiente código carga los paquetes necesarios para el ejemplo, crea el entorno mediante la instanciación de un objeto de la clase WindyGridworldEnv
(importada del archivo adjunto windy_gridworld_env.py
) e imprime por pantalla la dimensión del espacio de acciones (0=arriba, 1=derecha, 2=abajo y 3=izquierda), el espacio de observaciones (una tupla que indica la posición del agente en la cuadrícula) y el rango de la variable de recompensa (cuyo valor es -1 para cualquier acción y que por tanto va de menos infinito a más infinito).
!pip install gym
import gym
import numpy as np
import windy_gridworld_env as wge
from collections import defaultdict
import sys
env=wge.WindyGridworldEnv()
print("Action space is {
} ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {
} ".format(env.reward_range))
# Inicializamos el entorno
obs = env.reset()
t, total_reward, done = 0, 0, False
print("Obs inicial: {
} ".format(obs))
switch_action = {
0: "U",
1: "R",
2: "D",
3: "L",
}
while not done:
# Elegir una acción aleatoria (ésta es la implementación del agente)
action = env.action_space.sample()
# Ejecutar la acción y esperar la respuesta del entorno
new_obs, reward, done, info = env.step(action)
# Imprimir time-step
print("Action: {
} -> Obs: {} and reward: {}".format(switch_action[action], new_obs, reward))
# Actualizar variables
obs = new_obs
total_reward += reward
t += 1
print("Episode finished after {} timesteps and reward was {
} ".format(t, total_reward))
env.close()
El entorno WindyGridWorld tiene varios parámetros que pueden ser modificados:
Modificar el codigo de WindyGridWorld (fichero adjunto windy_gridworld_env.py
) para que represente las propiedades de la cuadrícula descritas a continuación.
Guardar el entorno modificado en el archivo windy_gridworld_env_v2.py
, en la misma carpeta que el original.
import gym
import numpy as np
import windy_gridworld_env_v2 as wge
env=wge.WindyGridworldEnv()
print("Action space is {
} ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {
} ".format(env.reward_range))
print("Obs inicial: {
} ".format(obs))
A continuación, implementar un agente que siempre realice la misma acción: ir hacia la derecha y modificar el código para que sólo realice 10 time-steps.
Mostrar la trayectoria seguida por el agente. No es necesario graficarla, tan sólo mostrar las coordenadas de las casillas visitadas en orden.
# Environment reset
obs = env.reset()
t, total_reward, done = 0, 0, False
print("Obs inicial: {
} ".format(obs))
switch_action = {
0: "U",
1: "R",
2: "D",
3: "L"
}
def print_mapa(Movimientos, height, width):
print("")
print("")
print("Mapa:")
print("")
contador=0
for fila in range(0,height):
print('--------------------------------------------------------------\n', end="")
resultado= []
for columna in range(0,width):
#print(np.where(Q[fila,columna] == max(Q[fila,columna])))
coordenada = (fila, columna)
if coordenada in Movimientos.keys():
if Movimientos[coordenada]== 0:
resultado.append(' | U' )
if Movimientos[coordenada]== 1:
resultado.append(' | R' )
if Movimientos[coordenada]== 2:
resultado.append(' | D' )
if Movimientos[coordenada]== 3:
resultado.append(' | L')
if Movimientos[coordenada]== 4:
resultado.append(' | G')
else:
resultado.append(' | ')
print(''.join(resultado)+' |\n',end="")
print('--------------------------------------------------------------\n', end="")
# Escribir el código aquí
# Inicializamos el entorno
obs = env.reset()
t, total_reward, done = 0, 0, False
print("Obs inicial: {
} ".format(obs))
switch_action = {
0: "U",
1: "R",
2: "D",
3: "L",
}
Movimientos = defaultdict(lambda: np.zeros(env.action_space.n))
while not done and t<10:
print('Acción número ' + str(t+1))
# Elegir una acción aleatoria (ésta es la implementación del agente)
action = 1
# Ejecutar la acción y esperar la respuesta del entorno
new_obs, reward, done, info = env.step(action)
# Imprimir time-step
print("Action: {
} -> Obs: {} and reward: {}".format(switch_action[action], new_obs, reward))
Movimientos[obs]= action
# Actualizar variables
obs = new_obs
total_reward += reward
t += 1
print("Episode finished after {} timesteps and reward was {
} ".format(t, total_reward))
env.close()
print("A continucación muestro el movimiento del agente para validar la implementación del ejercio.");
print_mapa(Movimientos, 15, 15)
Dado que el entorno es determinista, es factible encontrar una política óptima (que puede no ser única) que consiga el mayor retorno (y por tanto la trayectoria más corta).
El objetivo de este apartado es realizar una estimación de la política óptima mediante los métodos de Montecarlo, en concreto estudiaremos el algoritmo On-policy first-visit MC control (para políticas $\epsilon$-soft).
Implementar el Algoritmo 3 explicado en el módulo "Métodos de Montecarlo" utilizando los siguientes parámetros:
def make_epsilon_greedy_policy(Q, epsilon, nA):
"""
Creates an epsilon-greedy policy based on a given Q-function and epsilon.
Args:
Q: A dictionary that maps from state -> action-values.
Each value is a numpy array of length nA (see below)
epsilon: The probability to select a random action . float between 0 and 1.
nA: Number of actions in the environment.
Returns:
A function that takes the observation as an argument and returns
the probabilities for each action in the form of a numpy array of length nA.
"""
def policy_fn(observation):
A = np.ones(nA, dtype=float) * epsilon / nA
best_action = np.argmax(Q[observation])
A[best_action] += (1.0 - epsilon)
return A
return policy_fn
def mc_control_epsilon_greedy(env, num_episodes, discount_factor=1.0, epsilon=0.1):
"""
Monte Carlo Control using Epsilon-Greedy policies.
Finds an optimal epsilon-greedy policy.
Args:
env: OpenAI gym environment.
num_episodes: Number of episodes to sample.
discount_factor: Gamma discount factor.
epsilon: Chance the sample a random action. Float betwen 0 and 1.
Returns:
A tuple (Q, policy).
Q is a dictionary mapping state -> action values.
policy is a function that takes an observation as an argument and returns
action probabilities
"""
# Keeps track of sum and count of returns for each state
# to calculate an average. We could use an array to save all
# returns (like in the book) but that's memory inefficient.
returns_sum = defaultdict(float)
returns_count = defaultdict(float)
# The final action-value function.
# A nested dictionary that maps state -> (action -> action-value).
Q = defaultdict(lambda: np.zeros(env.action_space.n))
# The policy we're following
policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)
for i_episode in range(1, num_episodes + 1):
# Print out which episode we're on, useful for debugging.
if i_episode % 100 == 0:
print("\rEpisode {}/{
}.".format(i_episode, num_episodes), end="")
sys.stdout.flush()
# Generate an episode.
# An episode is an array of (state, action, reward) tuples
episode = []
state = env.reset()
#for t in range(1000000): #si es hasta que done se hace eterno
for t in range(1000): #considero que este es el mejor limite
#while True:
probs = policy(state)
action = np.random.choice(np.arange(len(probs)), p=probs)
next_state, reward, done, _ = env.step(action)
episode.append((state, action, reward))
if done:
break
state = next_state
# Find all (state, action) pairs we've visited in this episode
# We convert each state to a tuple so that we can use it as a dict key
sa_in_episode = set([(tuple(x[0]), x[1]) for x in episode])
for state, action in sa_in_episode:
sa_pair = (state, action)
# Find the first occurance of the (state, action) pair in the episode
first_occurence_idx = next(i for i,x in enumerate(episode)
if x[0] == state and x[1] == action)
# Sum up all rewards since the first occurance
G = sum([x[2]*(discount_factor**i) for i,x in enumerate(episode[first_occurence_idx:])])
# Calculate average return for this state over all sampled episodes
returns_sum[sa_pair] += G
returns_count[sa_pair] += 1.0
Q[state][action] = returns_sum[sa_pair] / returns_count[sa_pair]
# The policy is improved implicitly by changing the Q dictionary
return Q, policy
# Environment reset
obs = env.reset()
t, total_reward, done = 0, 0, False
print("Obs inicial: {
} ".format(obs))
Q, policy = mc_control_epsilon_greedy(env, num_episodes=250000, epsilon=0.1,discount_factor=1.0)
# Environment reset
obs = env.reset()
t, total_reward, done = 0, 0, False
state = env.reset()
episode = []
print("Ruta óptima")
while True:
probs = policy(state)
action = np.argmax(probs)
next_state, reward, done, info = env.step(action)
print("Action: {
} -> Obs: {} and reward: {}".format(action, next_state, reward))
episode.append((state, action, reward))
if done:
break
state = next_state
Implementar una función que imprima por pantalla la política óptima encontrada para cada celda.
Se muestra a continuación una imagen de ejemplo para la cuadrícula del entorno modificado de 15x15.
import sys
def print_all(policy, height, width):
obs = env.reset()
t, total_reward, done = 0, 0, False
state = env.reset()
episode = []
Movimientos = defaultdict(lambda: np.zeros(env.action_space.n))
print("Movimientos:")
print("")
for fila in range(0,15):
for columna in range(0,15):
state = (fila, columna)
probs = policy(state)
action = np.argmax(probs)
next_state, reward, done, info = env.step(action)
Movimientos[state]= action
#print(state+ '\n' , end="")
state = next_state
# Escribir el código aquí
#print(Movimientos.keys()
print_mapa(Movimientos, height, width)
def print_policy(policy, height, width):
obs = env.reset()
t, total_reward, done = 0, 0, False
state = env.reset()
episode = []
Movimientos = defaultdict(lambda: np.zeros(env.action_space.n))
print("Movimientos:")
print("")
reward_final = 0;
movimientos_realizados = 0;
while True:
probs = policy(state)
action = np.argmax(probs)
next_state, reward, done, info = env.step(action)
print("Action: {
} -> Obs: {} and reward: {} \n".format(action, next_state, reward), end="")
sys.stdout.flush()
episode.append((state, action, reward))
Movimientos[state]= action
#print(state+ '\n' , end="")
reward_final=reward_final+reward
movimientos_realizados = movimientos_realizados +1
if movimientos_realizados>50 and state in Movimientos.keys():
sys.stdout.flush()
print('Posible bucle en la Solución- Considero que la solución no va a ser valida.')
print('Para evitar bucles infinitos en las soluciones con un nivel bajo de episodios añado esta condición')
break;
if movimientos_realizados>12 and state == next_state:
sys.stdout.flush()
print('Bucle en la solucion- Con este metodo no se ha encontrado una solucion valida.')
break;
'''
probs = np.delete(probs,np.argmax(probs))
action = np.argmax(probs)
next_state, reward, done, info = env.step(action)
print("Action: {} -> Obs: {} and reward: {} \n".format(action, next_state, reward), end="")
sys.stdout.flush()
episode.append((state, action, reward))
Movimientos[state]= action
'''
else:
state = next_state
sys.stdout.flush()
if done:
print('La recompensa final es: ' + str(reward_final) + " ; además se han realizado "+ str(movimientos_realizados) + " Movimientos")
Movimientos[state]= 4
break
# Escribir el código aquí
#print(Movimientos.keys())
print_mapa(Movimientos, height, width)
def print_mapa(Movimientos, height, width):
print("")
print("")
print("Mapa:")
print("")
contador=0
for fila in range(0,height):
print('--------------------------------------------------------------\n', end="")
resultado= []
for columna in range(0,width):
#print(np.where(Q[fila,columna] == max(Q[fila,columna])))
coordenada = (fila, columna)
if coordenada in Movimientos.keys():
if Movimientos[coordenada]== 0:
resultado.append(' | U' )
if Movimientos[coordenada]== 1:
resultado.append(' | R' )
if Movimientos[coordenada]== 2:
resultado.append(' | D' )
if Movimientos[coordenada]== 3:
resultado.append(' | L')
if Movimientos[coordenada]== 4:
resultado.append(' | G')
else:
resultado.append(' | ')
print(''.join(resultado)+' |\n',end="")
print('--------------------------------------------------------------\n', end="")
print_policy(policy,15,15)