Source code for irlc.ex04.continuous_time_model

# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
  [Her21] Tue Herlau. Sequential decision making. (See 02465_Notes.pdf), 2021.
  [Kel17] Matthew Kelly. An introduction to trajectory optimization: how to do your own direct collocation. SIAM Review, 59(4):849–904, 2017. (See kelly2017.pdf).
"""
import sympy as sym
import numpy as np
import time
import sys
import matplotlib.pyplot as plt
from scipy.optimize import Bounds

def sp2bounds(space): # Turn a state/action space (gym) into a bounds object.
    return Bounds(list(space.low), list(space.high))

[docs]class ContiniousTimeSymbolicModel: ''' Continious time symbolic model. See (Her21, Section 11.3) for a top-level description. This model represents the top-level description of the physical system as a differential equation > dx/dt = f(x, u, t) and a cost-function defined as an integral: > Cost = g(x(t0), x(tf), t0, tf) + int_t0^tf g(x, u, t) dt and bounds on x, u and t. In this description both x and u are vectors. The overall idea is that you write a new model by editing the def sym_f function. Add a *symbolic* expression here, and the class will automatically convert it into a numpy function, and allow e.g. the discrete model to compute derivatives. ''' action_space = None observation_space = None state_labels = None action_labels = None
[docs] def __init__(self, cost=None, simple_bounds=None): self.cost = cost t = sym.symbols("t") x = symv("x", self.state_size) u = symv("u", self.action_size) self.f = sym.lambdify( (x, u, t), self.sym_f(x, u, t)) sb = dict(t0=Bounds([0], [0]), tF=Bounds([0], [np.inf]), u=sp2bounds(self.action_space), x=sp2bounds(self.observation_space), x0=sp2bounds(self.observation_space), xF=sp2bounds(self.observation_space)) if simple_bounds is None: simple_bounds = {} self.simple_bounds_ = {**sb, **simple_bounds} if self.state_labels is None: self.state_labels = [f'x{i}' for i in range(self.state_size)] if self.action_labels is None: self.action_labels = [f'u{i}' for i in range(self.action_size)]
def reset(self): # Returns a start position for the system. raise NotImplementedError() def simulate(self, x0, u_fun, t0, tF, N_steps=1000, method='rk4'): """ Defaults to RK4 simulation of the trajectory from x0, u0, t0 to tf, see (Her21, Algorithm 18) Method can be either 'rk4' or 'euler' u_fun has to be a function which returns a list/tuple with same dimension as action_space x0 is initial position; it too must be a list of size state_space """ u_fun = ensure_policy(u_fun) tt = np.linspace(t0, tF, N_steps+1) # Time grid t_k = tt[k] between t0 and tF. xs = [ np.asarray(x0) ] u = [ u_fun(x0, t0 )] for k in range(N_steps): Delta = tt[k+1] - tt[k] tn = tt[k] xn = xs[k] un = u[k] unp = u_fun(xn, tn + Delta) if method == 'rk4': """ Implement the RK4 method here. This is a rather important question, so please get in touch with me if you are stuck. The algorithm you implement is: (Her21, Algorithm 18). Remember to use breakpoints and the debugger console. """ # TODO: 5 lines missing. raise NotImplementedError("Insert your solution and remove this error.") elif method == 'euler': xnp = xn + Delta * np.asarray(self.f(xn, un, tn)) else: raise Exception("Bad integration method", method) xs.append(xnp) u.append(unp) xs = np.stack(xs, axis=0) u = np.stack(u, axis=0) return xs, u, tt def sym_f(self, x, u, t=None): raise NotImplementedError("Implement a function which return the environment dynamics f(x,u,t) as a sympy exression") @property def state_size(self): return np.prod(self.observation_space.shape) @property def action_size(self): return np.prod(self.action_space.low.shape) def render(self, x, mode="human"): raise NotImplementedError() """ Below are less important helper-functions etc. """ def animate_rollout(self, x0, u_fun, t0, tF, N_steps = 1000, fps=10): if sys.gettrace() is not None: print("Not animating stuff in debugger as it crashes.") return y, _, tt = self.simulate(x0, u_fun, t0, tF, N_steps=N_steps) secs = tF-t0 frames = int( np.ceil( secs * fps ) ) I = np.round( np.linspace(0, N_steps-1, frames)).astype(int) y = y[I,:] for i in range(frames): self.render(x=y[i] ) time.sleep(1/fps) def sym_cf(self, t0, tF, x0, xF): """ Compute Mayer term in cost function """ return self.cost.sym_cf(t0=t0, tF=tF, x0=x0, xF=xF) def sym_c(self, x, u, t): ''' Compute Lagrange term in cost function ''' return self.cost.sym_c(x=x, u=u, t=t) def sym_h(self, x, u, t): ''' Dynamical path constraint of the form: (See (Kel17, Eq.(1.3))) h(x, u, t) <= 0. These are very rarely used by the methods we consider (so far only in brachiostochrone yafcport with direct methods). ''' return [] def simple_bounds(self): ''' Simple inequality constraints (i.e. z_lb <= z <= z_ub) Returned as a dict with keys representing the variables they constrain. For instance: >>> sb = env.simple_bounds() >>> b = sb['x0'] b will now be a scipy Bounds object and constraints can be found as: >>> b.lb <= x0 <= b.ub returned as a dict; see implementations. ''' return self.simple_bounds_ def set_simple_bounds(self, bounds): self.simple_bounds_ = {**self.simple_bounds_, **bounds} def sym_g(self, t0,tF,x0,xF): ''' Boundary constraints g(t0,tF,x0,xF) <= 0. Note: We will not use this function for the course. ''' return [] def close(self): pass def guess(self): bnds = self.simple_bounds() def mfin(z): return [z_ if np.isfinite(z_) else 0 for z_ in z] xL = mfin(bnds['x0'].lb) xU = mfin(bnds['xF'].ub) tF = 10 if not np.isfinite(bnds['tF'].ub[0]) else bnds['tF'].ub[0] gs = {'t0': 0, 'tF': tF, 'x': [xL, xU], 'u': [mfin(bnds['u'].lb), mfin(bnds['u'].ub)]} return gs
def symv(s, n): """ Returns a vector of symbolic functions. For instance if s='x' and n=3 then it will return [x0,x1,x2] where x0,..,x2 are symbolic variables. """ return sym.symbols(" ".join(["%s%i," % (s, i) for i in range(n)])) def ensure_policy(u): """ Ensure u corresponds to a policy function with input arguments u(x, t) """ if callable(u): return u else: return lambda x, t: np.asarray(u).reshape((-1,)) def plot_trajectory(x_res, tt, lt='k-', ax=None, labels=None, legend=None): M = x_res.shape[1] if labels is None: labels = [f"x_{i}" for i in range(M)] if ax is None: if M == 2: a = 234 if M == 3: r = 1 c = 3 else: r = 2 if M > 1 else 1 c = (M + 1) // 2 H = 2*r if r > 1 else 3 W = 6*c # if M == 2: # W = 12 f, ax = plt.subplots(r,c, figsize=(W,H)) if M == 1: ax = np.asarray([ax]) print(M,r,c) for i in range(M): if len(ax) <= i: print("issue!") a = ax.flat[i] a.plot(tt, x_res[:, i], lt, label=legend) a.set_xlabel("Time/seconds") a.set_ylabel(labels[i]) # a.set_title(labels[i]) a.grid(True) if legend is not None and i == 0: a.legend() # if i == M: plt.tight_layout() return ax def make_space_above(axes, topmargin=1.0): """ increase figure size to make topmargin (in inches) space for titles, without changing the axes sizes""" fig = axes.flatten()[0].figure s = fig.subplotpars w, h = fig.get_size_inches() figh = h - (1-s.top)*h + topmargin fig.subplots_adjust(bottom=s.bottom*h/figh, top=1-topmargin/figh) fig.set_figheight(figh)