Soluciones tabulares

En esta proyecto nos centraremos en la definición de un entorno e implementaremos los diferentes métodos para buscar una solución óptima del problema.

0. El entorno WindyGridWorld

El entorno WindyGridWorld consiste en un agente que se mueve en una cuadrícula 7x10 (alto x ancho). En cada paso, el agente tiene 4 opciones de acción o movimiento: ARRIBA, ABAJO, DERECHA, IZQUIERDA. El agente siempre sale de la misma casilla [3, 0] y el juego termina cuando el agente llega a la casilla de llegada [3, 7].

El entorno se corresponde con el ejemplo 'Cuadrícula con viento' explicado en la sección 3.1.2. el módulo "Métodos de Diferencia Temporal". El problema radica en que hay un viento que empuja al agente hacia arriba en la parte central de la cuadrícula. Esto provoca que, aunque se ejecute una acción estándar, en la región central los estados resultantes se desplazan hacia arriba por un viento cuya fuerza varía entre columnas.

El código para implementar este entorno, que se encuentra disponible en el fichero adjunto windy_gridworld_env.py, ha sido adaptado del siguiente enlace:

Vamos a empezar cargando el entorno y ver qué características tiene, ejecutando un episodio de prueba.

0.1. Carga de datos

El siguiente código carga los paquetes necesarios para el ejemplo, crea el entorno mediante la instanciación de un objeto de la clase WindyGridworldEnv (importada del archivo adjunto windy_gridworld_env.py) e imprime por pantalla la dimensión del espacio de acciones (0=arriba, 1=derecha, 2=abajo y 3=izquierda), el espacio de observaciones (una tupla que indica la posición del agente en la cuadrícula) y el rango de la variable de recompensa (cuyo valor es -1 para cualquier acción y que por tanto va de menos infinito a más infinito).

In [1]:
!pip install gym
DEPRECATION: Python 3.5 reached the end of its life on September 13th, 2020. Please upgrade your Python as Python 3.5 is no longer maintained. pip 21.0 will drop support for Python 3.5 in January 2021. pip 21.0 will remove support for this functionality.
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: gym in /usr/local/lib/python3.5/dist-packages (0.17.3)
Requirement already satisfied: pyglet<=1.5.0,>=1.4.0 in /usr/local/lib/python3.5/dist-packages (from gym) (1.5.0)
Requirement already satisfied: numpy>=1.10.4 in /usr/local/lib/python3.5/dist-packages (from gym) (1.14.3)
Requirement already satisfied: cloudpickle<1.7.0,>=1.2.0 in /usr/local/lib/python3.5/dist-packages (from gym) (1.6.0)
Requirement already satisfied: scipy in /home/al118345/.local/lib/python3.5/site-packages (from gym) (1.4.1)
Requirement already satisfied: future in /usr/local/lib/python3.5/dist-packages (from pyglet<=1.5.0,>=1.4.0->gym) (0.18.2)
In [2]:
import gym
import numpy as np
import windy_gridworld_env as wge
from collections import defaultdict
import sys

env=wge.WindyGridworldEnv()
print("Action space is {
                } ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {
                } ".format(env.reward_range))
Action space is Discrete(4)
Observation space is Tuple(Discrete(7), Discrete(10))
Reward range is (-inf, inf)

0.2. Ejecución de un episodio

A continuación, realizaremos la ejecución de un episodio del entorno WindyGridWorld utilizando un agente que selecciona las acciones de forma aleatoria.

In [3]:
# Inicializamos el entorno
obs = env.reset()
t, total_reward, done = 0, 0, False

print("Obs inicial: {
                } ".format(obs))

switch_action = {
        0: "U",
        1: "R",
        2: "D",
        3: "L",
    }

while not done:

    # Elegir una acción aleatoria (ésta es la implementación del agente)
    action = env.action_space.sample()

    # Ejecutar la acción y esperar la respuesta del entorno
    new_obs, reward, done, info = env.step(action)

    # Imprimir time-step
    print("Action: {
                } -> Obs: {} and reward: {}".format(switch_action[action], new_obs, reward))

    # Actualizar variables
    obs = new_obs
    total_reward += reward
    t += 1

print("Episode finished after {} timesteps and reward was {
                } ".format(t, total_reward))
env.close()
Obs inicial: (3, 0)
Action: D -> Obs: (4, 0) and reward: -1
Action: L -> Obs: (4, 0) and reward: -1
Action: L -> Obs: (4, 0) and reward: -1
Action: L -> Obs: (4, 0) and reward: -1
[...]salto algonos acciones[...]
Action: D -> Obs: (4, 9) and reward: -1
Action: L -> Obs: (4, 8) and reward: -1
Action: L -> Obs: (3, 7) and reward: -1
Episode finished after 16728 timesteps and reward was -16728

1. Modificación del entorno

El entorno WindyGridWorld tiene varios parámetros que pueden ser modificados:

  • La dimensión de la cuadrícula.
  • La posición y fuerza del viento.
  • La posición de las casillas de salida y de llegada.
Ejercicio 1.1

Modificar el codigo de WindyGridWorld (fichero adjunto windy_gridworld_env.py) para que represente las propiedades de la cuadrícula descritas a continuación.

  • Cuadrícula 15x15
  • Fuerza del viento = [0, 0, 0, 1, 1, 1, 2, 2, 2, 1, 1, 1, 0, 0, 0]
  • Casilla de inicio [8,0]
  • Casilla final [8,7]

Guardar el entorno modificado en el archivo windy_gridworld_env_v2.py, en la misma carpeta que el original.

In [56]:
import gym
import numpy as np
import windy_gridworld_env_v2 as wge

env=wge.WindyGridworldEnv()
print("Action space is {
                } ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {
                } ".format(env.reward_range))
print("Obs inicial: {
                } ".format(obs))
Action space is Discrete(4)
Observation space is Tuple(Discrete(15), Discrete(15))
Reward range is (-inf, inf)
Obs inicial: (8, 0)
Ejercicio 1.2

A continuación, implementar un agente que siempre realice la misma acción: ir hacia la derecha y modificar el código para que sólo realice 10 time-steps.

Mostrar la trayectoria seguida por el agente. No es necesario graficarla, tan sólo mostrar las coordenadas de las casillas visitadas en orden.

In [57]:
# Environment reset
obs = env.reset()
t, total_reward, done = 0, 0, False

print("Obs inicial: {
                } ".format(obs))

switch_action = {
        0: "U",
        1: "R",
        2: "D",
        3: "L"
    }
Obs inicial: (8, 0)
In [6]:
def print_mapa(Movimientos, height, width):
    print("")
    print("")
    print("Mapa:")
    print("")
    contador=0
    for fila in range(0,height):
        print('--------------------------------------------------------------\n', end="")
        resultado= []
        for columna in range(0,width):
                    #print(np.where(Q[fila,columna] == max(Q[fila,columna])))
                    coordenada = (fila, columna)

                    if coordenada in Movimientos.keys():
                        if Movimientos[coordenada]== 0:
                            resultado.append(' | U' )
                        if Movimientos[coordenada]== 1:
                            resultado.append(' | R' )
                        if Movimientos[coordenada]== 2:
                            resultado.append(' | D' )
                        if Movimientos[coordenada]== 3:
                            resultado.append(' | L')
                        if Movimientos[coordenada]== 4:
                            resultado.append(' | G')

                    else:
                        resultado.append(' |  ')

        print(''.join(resultado)+' |\n',end="")
    print('--------------------------------------------------------------\n', end="")

In [7]:
# Escribir el código aquí
# Inicializamos el entorno
obs = env.reset()
t, total_reward, done = 0, 0, False

print("Obs inicial: {
                } ".format(obs))

switch_action = {
        0: "U",
        1: "R",
        2: "D",
        3: "L",
    }

Movimientos = defaultdict(lambda: np.zeros(env.action_space.n))

while not done and t<10:
    print('Acción número ' + str(t+1))
    # Elegir una acción aleatoria (ésta es la implementación del agente)
    action = 1

    # Ejecutar la acción y esperar la respuesta del entorno
    new_obs, reward, done, info = env.step(action)

    # Imprimir time-step
    print("Action: {
                } -> Obs: {} and reward: {}".format(switch_action[action], new_obs, reward))
    Movimientos[obs]= action

    # Actualizar variables
    obs = new_obs
    total_reward += reward
    t += 1


print("Episode finished after {} timesteps and reward was {
                } ".format(t, total_reward))
env.close()


print("A continucación muestro el movimiento del agente para validar la implementación del ejercio.");
print_mapa(Movimientos, 15, 15)
Obs inicial: (8, 0)
Acción número 1
Action: R -> Obs: (8, 1) and reward: -1
Acción número 2
Action: R -> Obs: (8, 2) and reward: -1
Acción número 3
Action: R -> Obs: (8, 3) and reward: -1
Acción número 4
Action: R -> Obs: (7, 4) and reward: -1
Acción número 5
Action: R -> Obs: (6, 5) and reward: -1
Acción número 6
Action: R -> Obs: (5, 6) and reward: -1
Acción número 7
Action: R -> Obs: (3, 7) and reward: -1
Acción número 8
Action: R -> Obs: (1, 8) and reward: -1
Acción número 9
Action: R -> Obs: (0, 9) and reward: -1
Acción número 10
Action: R -> Obs: (0, 10) and reward: -1
Episode finished after 10 timesteps and reward was -10
A continucación muestro el movimiento del agente para validar la implementación del ejercio.


Mapa:

--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   | R |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   | R |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   | R |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   | R |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   | R |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   | R |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 | R | R | R | R |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------
 |   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
--------------------------------------------------------------

2. Métodos de Montecarlo

Dado que el entorno es determinista, es factible encontrar una política óptima (que puede no ser única) que consiga el mayor retorno (y por tanto la trayectoria más corta).

El objetivo de este apartado es realizar una estimación de la política óptima mediante los métodos de Montecarlo, en concreto estudiaremos el algoritmo On-policy first-visit MC control (para políticas $\epsilon$-soft).

Ejercicio 2.1

Implementar el Algoritmo 3 explicado en el módulo "Métodos de Montecarlo" utilizando los siguientes parámetros:

  • Número de episodios = 250000
  • Epsilon = 0.1
  • Factor de descuento = 1
In [27]:
def make_epsilon_greedy_policy(Q, epsilon, nA):
    """
    Creates an epsilon-greedy policy based on a given Q-function and epsilon.
    
    Args:
        Q: A dictionary that maps from state -> action-values.
            Each value is a numpy array of length nA (see below)
        epsilon: The probability to select a random action . float between 0 and 1.
        nA: Number of actions in the environment.
    
    Returns:
        A function that takes the observation as an argument and returns
        the probabilities for each action in the form of a numpy array of length nA.
    
    """
    def policy_fn(observation):
        A = np.ones(nA, dtype=float) * epsilon / nA
        best_action = np.argmax(Q[observation])
        A[best_action] += (1.0 - epsilon)
        return A
    return policy_fn

def mc_control_epsilon_greedy(env, num_episodes, discount_factor=1.0, epsilon=0.1):
    """
    Monte Carlo Control using Epsilon-Greedy policies.
    Finds an optimal epsilon-greedy policy.
    
    Args:
        env: OpenAI gym environment.
        num_episodes: Number of episodes to sample.
        discount_factor: Gamma discount factor.
        epsilon: Chance the sample a random action. Float betwen 0 and 1.
    
    Returns:
        A tuple (Q, policy).
        Q is a dictionary mapping state -> action values.
        policy is a function that takes an observation as an argument and returns
        action probabilities
    """

    # Keeps track of sum and count of returns for each state
    # to calculate an average. We could use an array to save all
    # returns (like in the book) but that's memory inefficient.
    returns_sum = defaultdict(float)
    returns_count = defaultdict(float)

    # The final action-value function.
    # A nested dictionary that maps state -> (action -> action-value).
    Q = defaultdict(lambda: np.zeros(env.action_space.n))

    # The policy we're following
    policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)

    for i_episode in range(1, num_episodes + 1):
        # Print out which episode we're on, useful for debugging.
        if i_episode % 100 == 0:
            print("\rEpisode {}/{
                }.".format(i_episode, num_episodes), end="")
            sys.stdout.flush()

        # Generate an episode.
        # An episode is an array of (state, action, reward) tuples
        episode = []
        state = env.reset()
        #for t in range(1000000): #si es hasta que done se hace eterno
        for t in range(1000): #considero que este  es el mejor limite
        #while True:
            probs = policy(state)
            action = np.random.choice(np.arange(len(probs)), p=probs)
            next_state, reward, done, _ = env.step(action)
            episode.append((state, action, reward))
            if done:
                break
            state = next_state

        # Find all (state, action) pairs we've visited in this episode
        # We convert each state to a tuple so that we can use it as a dict key
        sa_in_episode = set([(tuple(x[0]), x[1]) for x in episode])
        for state, action in sa_in_episode:
            sa_pair = (state, action)
            # Find the first occurance of the (state, action) pair in the episode
            first_occurence_idx = next(i for i,x in enumerate(episode)
                                       if x[0] == state and x[1] == action)
            # Sum up all rewards since the first occurance
            G = sum([x[2]*(discount_factor**i) for i,x in enumerate(episode[first_occurence_idx:])])
            # Calculate average return for this state over all sampled episodes
            returns_sum[sa_pair] += G
            returns_count[sa_pair] += 1.0
            Q[state][action] = returns_sum[sa_pair] / returns_count[sa_pair]

        # The policy is improved implicitly by changing the Q dictionary

    return Q, policy
In [9]:
# Environment reset
obs = env.reset()
t, total_reward, done = 0, 0, False

print("Obs inicial: {
                } ".format(obs))

Q, policy = mc_control_epsilon_greedy(env, num_episodes=250000, epsilon=0.1,discount_factor=1.0)
Obs inicial: (8, 0)
Episode 250000/250000.
In [10]:
# Environment reset
obs = env.reset()
t, total_reward, done = 0, 0, False
state = env.reset()
episode = []
print("Ruta óptima")
while True:
            probs = policy(state)
            action = np.argmax(probs)
            next_state, reward, done, info = env.step(action)
            print("Action: {
                } -> Obs: {} and reward: {}".format(action, next_state, reward))
            episode.append((state, action, reward))
            if done:
                break
            state = next_state
Ruta óptima
Action: 2 -> Obs: (9, 0) and reward: -1
Action: 1 -> Obs: (9, 1) and reward: -1
Action: 2 -> Obs: (10, 1) and reward: -1
Action: 2 -> Obs: (11, 1) and reward: -1
Action: 2 -> Obs: (12, 1) and reward: -1
Action: 2 -> Obs: (13, 1) and reward: -1
Action: 1 -> Obs: (13, 2) and reward: -1
Action: 1 -> Obs: (13, 3) and reward: -1
Action: 1 -> Obs: (12, 4) and reward: -1
Action: 1 -> Obs: (11, 5) and reward: -1
Action: 1 -> Obs: (10, 6) and reward: -1
Action: 1 -> Obs: (8, 7) and reward: -1
Ejercicio 2.2

Implementar una función que imprima por pantalla la política óptima encontrada para cada celda.

Se muestra a continuación una imagen de ejemplo para la cuadrícula del entorno modificado de 15x15.

In [20]:
import sys


def print_all(policy, height, width):
    obs = env.reset()
    t, total_reward, done = 0, 0, False
    state = env.reset()
    episode = []
    Movimientos = defaultdict(lambda: np.zeros(env.action_space.n))
    print("Movimientos:")
    print("")
    for fila in range(0,15):
        for columna in range(0,15):
            state = (fila, columna)
            probs = policy(state)
            action = np.argmax(probs)

            next_state, reward, done, info = env.step(action)


            Movimientos[state]= action
            #print(state+ '\n' , end="")

            state = next_state
            # Escribir el código aquí

            #print(Movimientos.keys()

    print_mapa(Movimientos, height, width)



def print_policy(policy, height, width):
    obs = env.reset()
    t, total_reward, done = 0, 0, False
    state = env.reset()
    episode = []
    Movimientos = defaultdict(lambda: np.zeros(env.action_space.n))
    print("Movimientos:")
    print("")
    reward_final = 0;
    movimientos_realizados = 0;
    while True:
            probs = policy(state)
            action = np.argmax(probs)
            next_state, reward, done, info = env.step(action)

            print("Action: {
                } -> Obs: {} and reward: {} \n".format(action, next_state, reward), end="")
            sys.stdout.flush()

            episode.append((state, action, reward))

            Movimientos[state]= action
            #print(state+ '\n' , end="")
            reward_final=reward_final+reward
            movimientos_realizados = movimientos_realizados +1
            if movimientos_realizados>50 and state in Movimientos.keys():
                sys.stdout.flush()
                print('Posible bucle en la Solución- Considero que la solución no va a ser valida.')
                print('Para evitar bucles infinitos en las soluciones con un nivel bajo de episodios añado esta condición')
                break;


            if movimientos_realizados>12 and  state == next_state:
                sys.stdout.flush()
                print('Bucle en la solucion-  Con este metodo no se ha encontrado una solucion valida.')
                break;
                '''
                probs = np.delete(probs,np.argmax(probs))
                action = np.argmax(probs)
                next_state, reward, done, info = env.step(action)
                print("Action: {} -> Obs: {} and reward: {} \n".format(action, next_state, reward), end="")
                sys.stdout.flush()
                episode.append((state, action, reward)) 
                Movimientos[state]= action
                '''
            else:
                state = next_state
                sys.stdout.flush()
            if done:
                print('La recompensa final es: ' + str(reward_final) + " ; además se han realizado "+ str(movimientos_realizados) + " Movimientos")
                Movimientos[state]= 4
                break




            # Escribir el código aquí

            #print(Movimientos.keys())

    print_mapa(Movimientos, height, width)


def print_mapa(Movimientos, height, width):
    print("")
    print("")
    print("Mapa:")
    print("")
    contador=0
    for fila in range(0,height):
        print('--------------------------------------------------------------\n', end="")
        resultado= []
        for columna in range(0,width):
                    #print(np.where(Q[fila,columna] == max(Q[fila,columna])))
                    coordenada = (fila, columna)

                    if coordenada in Movimientos.keys():
                        if Movimientos[coordenada]== 0:
                            resultado.append(' | U' )
                        if Movimientos[coordenada]== 1:
                            resultado.append(' | R' )
                        if Movimientos[coordenada]== 2:
                            resultado.append(' | D' )
                        if Movimientos[coordenada]== 3:
                            resultado.append(' | L')
                        if Movimientos[coordenada]== 4:
                            resultado.append(' | G')

                    else:
                        resultado.append(' |  ')

        print(''.join(resultado)+' |\n',end="")
    print('--------------------------------------------------------------\n', end="")

In [12]:
print_policy(policy,15,15)
<