# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[Her21] Tue Herlau. Sequential decision making. (See 02465_Notes.pdf), 2021.
"""
# from line_profiler_pycharm import profile
import inspect
import itertools
import os
import sys
from collections import OrderedDict, namedtuple
import numpy as np
from tqdm import tqdm
from irlc.utils.common import load_time_series, log_time_series
from irlc.utils.irlc_plot import existing_runs
import shutil
# Main agent class. See (Her21, Subsection 4.4.3) for additional details.
from gym import Env
class Agent:
    """
    Main agent class. See (Her21, Subsection 4.4.3) for additional details.

    :Example:

    >>> print("Hello World")
    Hello World

    .. math::
        :nowrap:

        \\begin{eqnarray}
        y & = & ax^2 + bx + c \\\\
        f(x) & = & x^2 + 2xy + y^2
        \\end{eqnarray}

    """
    def __init__(self, env: Env):
        """
        Instantiate the Agent class.

        Args:
            env: An OpenAI Gym environment instance.
        """
        self.env = env
    def pi(self, s, k=None):
        """ Evaluate the Agent's policy at time step `k` in state `s`.

        The details will differ depending on whether the agent interacts in a discrete-time or continuous-time setting.

        - For discrete applications (dynamic programming/search and reinforcement learning), k is discrete, k=0, 1, 2, ...
        - For control applications, k is continuous and denotes the simulation time t, i.e. the policy should be called as

          > agent.pi(x, t)

        :param s: Current state
        :param k: Current time index.
        :return: action
        """
        return self.env.action_space.sample()
    def train(self, s, a, r, sp, done=False):
        """
        Called at each step of the simulation after the action a = pi(s,k) has been taken and the environment has transitioned to sp.
        Allows the agent to learn from experience.

        :param s: Current state x_k
        :param a: Action taken
        :param r: Reward obtained by taking action a_k in state x_k
        :param sp: The state the environment transitioned to :math:`{\\bf x}_{k+1}`
        :param done: Whether the environment terminated when transitioning to sp
        :return: None
        """
        pass
    def __str__(self):
        """ Optional: A unique name for this agent. Used for labels when plotting, but can be kept like this. """
        return super().__str__()
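
    # Note: the train() function below calls agent.extra_stats() when logging. The hook is not shown in this
    # excerpt, so the following is a minimal sketch of the assumed default (no extra log fields).
    def extra_stats(self):
        """ Optional: Can be used to record extra information from the Agent while training (returned as a dict). """
        return {}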
    def hello(self, greeting: str) -> str:
        """ The canonical hello world example.

        A *longer* description with some **RST**.

        Args:
            greeting: The person to say hello to.

        Returns:
            str: The greeting
        """
        return f"{greeting} {self.name}"
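
# A minimal sketch (not part of the original module) of how Agent is typically specialized: override pi()
# to choose actions and train() to learn from the observed transition. The fixed-action policy below is
# purely illustrative.
class _ExampleConstantAgent(Agent):
    """ Illustrative agent that ignores the state and always returns the same action. """
    def __init__(self, env: Env, action=0):
        super().__init__(env)
        self.action = action  # hypothetical fixed action; assumes the action space contains it

    def pi(self, s, k=None):
        return self.action  # ignore both the state s and the time step k

    def train(self, s, a, r, sp, done=False):
        pass  # a learning agent would update its internal estimates from (s, a, r, sp, done) here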
fields = ('time', 'state', 'action', 'reward')
Trajectory = namedtuple('Trajectory', fields + ("env_info",))
def train(env, agent=None, experiment_name=None, num_episodes=1, verbose=True, reset=True, max_steps=1e10,
          max_runs=None,
          return_trajectory=False,  # Return the current trajectories as a list
          resume_stats=None,        # Resume stat collection from last save. None implies same as saveload_model
          log_interval=1,           # Only log every log_interval steps. Reduces size of log files.
          delete_old_experiments=False,
          ):
"""
Implement the main training loop, see (Her21, Subsection 4.4.4).
Simulate the interaction between agent `agent` and the environment `env`.
The function has a lot of special functionality, so it is useful to consider the common cases. An example:
>>> stats, _ = train(env, agent, num_episodes=2)
Simulate interaction for two episodes (i.e. environment terminates two times and is reset).
`stats` will be a list of length two containing information from each run
>>> stats, trajectories = train(env, agent, num_episodes=2, return_Trajectory=True)
`trajectories` will be a list of length two containing information from the two trajectories.
>>> stats, _ = train(env, agent, experiment_name='experiments/my_run', num_episodes=2)
Save `stats`, and trajectories, to a file which can easily be loaded/plotted (see course software for examples of this).
The file will be time-stamped so using several calls you can repeat the same experiment (run) many times.
>>> stats, _ = train(env, agent, experiment_name='experiments/my_run', num_episodes=2, max_runs=10)
As above, but do not perform more than 10 runs. Useful for repeated experiments.
:param env: Environment (Gym instance)
:param agent: Agent instance
:param experiment_name: Save outcome to file for easy plotting (Optional)
:param num_episodes: Number of episodes to simulate
:param verbose: Display progress bar
:param reset: Call env.reset() before simulation start.
:param max_steps: Terminate if this many steps have elapsed (for non-terminating environments)
:param max_runs: Maximum number of repeated experiments (requires `experiment_name`)
:param return_trajectory: Return trajectories list (Off by default since it might consume lots of memory)
:param resume_stats: Resume stat collection from last run (requires `experiment_name`)
:param log_interval: Log stats less frequently
:param delete_old_experiment: Delete old experiment with the same name
:return: stats, trajectories (both as lists)
"""
    from irlc import cache_write
    from irlc import cache_read
    saveload_model = False
    temporal_policy = None
    save_stats = True

    if agent is None:
        print("[train] No agent was specified. Using irlc.Agent(env) (random actions)")
        agent = Agent(env)

    if delete_old_experiments and experiment_name is not None and os.path.isdir(experiment_name):
        shutil.rmtree(experiment_name)

    if experiment_name is not None and max_runs is not None and existing_runs(experiment_name) >= max_runs:
        stats, recent = load_time_series(experiment_name=experiment_name)
        if return_trajectory:
            trajectories = cache_read(recent + "/trajectories.pkl")
        else:
            trajectories = []
        return stats, trajectories

    stats = []
    steps = 0
    ep_start = 0
    resume_stats = saveload_model if resume_stats is None else resume_stats

    recent = None
    if resume_stats:
        stats, recent = load_time_series(experiment_name=experiment_name)
        if recent is not None:
            ep_start, steps = stats[-1]['Episode'] + 1, stats[-1]['Steps']

    if temporal_policy is None:
        a = inspect.getfullargspec(agent.pi)
        temporal_policy = len(a.args) >= 3  # does the policy include a time step?

    trajectories = []
    include_metadata = len(inspect.getfullargspec(agent.train).args) >= 7
    break_outer = False
    with tqdm(total=num_episodes, disable=not verbose, file=sys.stdout) as tq:
        for i_episode in range(num_episodes):
            if break_outer:
                break
            if reset or i_episode > 0:
                s = env.reset()
            elif hasattr(env, "s"):
                s = env.s
            elif hasattr(env, 'state'):
                s = env.state
            else:
                s = env.model.s
            time = 0
            reward = []
            trajectory = Trajectory(time=[], state=[], action=[], reward=[], env_info=[])
            for _ in itertools.count():
                a = agent.pi(s, time) if temporal_policy else agent.pi(s)
                sp, r, done, metadata = env.step(a)
                if not include_metadata:
                    agent.train(s, a, r, sp, done)
                else:
                    agent.train(s, a, r, sp, done, metadata)

                if return_trajectory:
                    trajectory.time.append(np.asarray(time))
                    trajectory.state.append(s)
                    trajectory.action.append(a)
                    trajectory.reward.append(np.asarray(r))
                    trajectory.env_info.append(metadata)

                reward.append(r)
                steps += 1
                time += metadata['dt'] if 'dt' in metadata else 1

                if done or steps >= max_steps:
                    trajectory.state.append(sp)
                    trajectory.time.append(np.asarray(time))
                    break_outer = steps >= max_steps
                    break
                s = sp

            if return_trajectory:
                try:
                    trajectory = Trajectory(**{field: np.stack([np.asarray(x_) for x_ in getattr(trajectory, field)]) for field in fields}, env_info=trajectory.env_info)
                except Exception as e:
                    pass
                trajectories.append(trajectory)

            stats.append({"Episode": i_episode + ep_start,
                          "Accumulated Reward": sum(reward),
                          "Average Reward": np.mean(reward),
                          "Length": len(reward),
                          "Steps": steps, **agent.extra_stats()}) if (i_episode + 1) % log_interval == 0 else None
            tq.set_postfix(ordered_dict=OrderedDict(list(OrderedDict(stats[-1]).items())[:5])) if len(stats) > 0 else None
            tq.update()
            # print(i_episode)
    sys.stderr.flush()
    if resume_stats and save_stats and recent is not None:
        os.remove(recent + "/log.txt")

    if experiment_name is not None and save_stats:
        path = log_time_series(experiment=experiment_name, list_obs=stats)
        if return_trajectory:
            cache_write(trajectories, path + "/trajectories.pkl")
        print(f"Training completed. Logging {experiment_name}: '{', '.join(stats[0].keys())}'")

    for i, t in enumerate(trajectories):
        from collections import defaultdict
        nt = defaultdict(lambda: [])
        if "supersample" in t.env_info[0]:
            for f in fields:
                for k, ei in enumerate(t.env_info):
                    z = ei['supersample'].__getattribute__(f).T
                    if k == 0:
                        pass
                    else:
                        z = z[1:]
                    nt[f].append(z)
            # traj = Trajectory(time=np.concatenate(nt['time']), state=
            for f in fields:
                nt[f] = np.concatenate([z for z in nt[f]], axis=0)
                # nt[f] = np.concatenate([ei['supersample'].__getattribute__(f) for ei in t.env_info], axis=0)
            traj2 = Trajectory(**nt, env_info=[])
            trajectories[i] = traj2  # .__setattr__(f, nt[f])
    return stats, trajectories
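
if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): run the random default Agent for two
    # episodes on a standard gym environment. Assumes a gym installation whose environments follow
    # the (s, r, done, info) step API used above; any such environment would work.
    import gym
    _env = gym.make("CartPole-v1")
    _stats, _trajectories = train(_env, Agent(_env), num_episodes=2, return_trajectory=True, verbose=False)
    print("Accumulated reward per episode:", [s["Accumulated Reward"] for s in _stats])
    _env.close()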