From 00606182b5d7f540589d738ce5fcbc97cfc7a6d9 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Mon, 24 Jan 2022 21:09:04 -0800 Subject: [PATCH 1/9] fix handling of endpoint in discrete optimal --- control/optimal.py | 15 +++++----- control/tests/optimal_test.py | 52 +++++++++++++++++++++-------------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/control/optimal.py b/control/optimal.py index dd09532c5..860f67582 100644 --- a/control/optimal.py +++ b/control/optimal.py @@ -58,6 +58,7 @@ class OptimalControlProblem(): extension of the time axis. log : bool, optional If `True`, turn on logging messages (using Python logging module). + Use ``logging.basicConfig`` to enable logging output (e.g., to a file). kwargs : dict, optional Additional parameters (passed to :func:`scipy.optimal.minimize`). @@ -95,7 +96,7 @@ class OptimalControlProblem(): trajectory generated by the proposed input. It does this by calling a user-defined function for the integral_cost given the current states and inputs at each point along the trajectory and then adding the value of a - user-defined terminal cost at the final pint in the trajectory. + user-defined terminal cost at the final point in the trajectory. The `_constraint_function` method evaluates the constraint functions along the trajectory generated by the proposed input. As in the case of the @@ -269,7 +270,6 @@ def _cost_function(self, coeffs): + str(states)) # Trajectory cost - # TODO: vectorize if ct.isctime(self.system): # Evaluate the costs costs = [self.integral_cost(states[:, i], inputs[:, i]) for @@ -279,6 +279,7 @@ def _cost_function(self, coeffs): dt = np.diff(self.timepts) # Integrate the cost + # TODO: vectorize cost = 0 for i in range(self.timepts.size-1): # Approximate the integral using trapezoidal rule @@ -288,8 +289,8 @@ def _cost_function(self, coeffs): # Sum the integral cost over the time (second) indices # cost += self.integral_cost(states[:,i], inputs[:,i]) cost = sum(map( - self.integral_cost, np.transpose(states), - np.transpose(inputs))) + self.integral_cost, np.transpose(states[:, :-1]), + np.transpose(inputs[:, :-1]))) # Terminal cost if self.terminal_cost is not None: @@ -661,8 +662,8 @@ def _output(t, x, u, params={}): return ct.NonlinearIOSystem( _update, _output, dt=dt, inputs=self.system.nstates, outputs=self.system.ninputs, - states=self.system.ninputs * - (self.timepts.size if self.basis is None else self.basis.N)) + states=self.system.ninputs * \ + (self.timepts.size if self.basis is None else self.basis.N)) # Compute the optimal trajectory from the current state def compute_trajectory( @@ -755,7 +756,7 @@ def compute_mpc(self, x, squeeze=None): """ res = self.compute_trajectory(x, squeeze=squeeze) - return inputs[:, 0] if res.success else None + return res.inputs[:, 0] # Optimal control result diff --git a/control/tests/optimal_test.py b/control/tests/optimal_test.py index 528313e9d..10484795b 100644 --- a/control/tests/optimal_test.py +++ b/control/tests/optimal_test.py @@ -39,7 +39,8 @@ def test_finite_horizon_simple(): # Retrieve the full open-loop predictions res = opt.solve_ocp( - sys, time, x0, cost, constraints, squeeze=True) + sys, time, x0, cost, constraints, squeeze=True, + terminal_cost=cost) # include to match MPT3 formulation t, u_openloop = res.time, res.inputs np.testing.assert_almost_equal( u_openloop, [-1, -1, 0.1393, 0.3361, -5.204e-16], decimal=4) @@ -57,9 +58,7 @@ def test_finite_horizon_simple(): # # The next unit test is intended to confirm that a finite horizon # optimal control problem with terminal cost set to LQR "cost to go" -# gives the same answer as LQR. Unfortunately, it requires a discrete -# time LQR function which is not yet availbale => for now this just -# tests the interface a bit. +# gives the same answer as LQR. # @slycotonly def test_discrete_lqr(): @@ -76,35 +75,43 @@ def test_discrete_lqr(): # Include weights on states/inputs Q = np.eye(2) R = 1 - K, S, E = ct.lqr(A, B, Q, R) # note: *continuous* time LQR + K, S, E = ct.dlqr(A, B, Q, R) # Compute the integral and terminal cost integral_cost = opt.quadratic_cost(sys, Q, R) terminal_cost = opt.quadratic_cost(sys, S, None) - # Formulate finite horizon MPC problem + # Solve the LQR problem + lqr_sys = ct.ss2io(ct.ss(A - B @ K, B, C, D, 1)) + + # Generate a simulation of the LQR controller time = np.arange(0, 5, 1) x0 = np.array([1, 1]) + _, _, lqr_x = ct.input_output_response( + lqr_sys, time, 0, x0, return_x=True) + + # Use LQR input as initial guess to avoid convergence/precision issues + lqr_u = -K @ lqr_x[0:time.size] + + # Formulate the optimal control problem and compute optimal trajectory optctrl = opt.OptimalControlProblem( - sys, time, integral_cost, terminal_cost=terminal_cost) + sys, time, integral_cost, terminal_cost=terminal_cost, + initial_guess=lqr_u) res1 = optctrl.compute_trajectory(x0, return_states=True) - with pytest.xfail("discrete LQR not implemented"): - # Result should match LQR - K, S, E = ct.dlqr(A, B, Q, R) - lqr_sys = ct.ss2io(ct.ss(A - B @ K, B, C, D, 1)) - _, _, lqr_x = ct.input_output_response( - lqr_sys, time, 0, x0, return_x=True) - np.testing.assert_almost_equal(res1.states, lqr_x) + # Compare to make sure results are the same + np.testing.assert_almost_equal(res1.inputs, lqr_u[0]) + np.testing.assert_almost_equal(res1.states, lqr_x) # Add state and input constraints trajectory_constraints = [ - (sp.optimize.LinearConstraint, np.eye(3), [-10, -10, -1], [10, 10, 1]), + (sp.optimize.LinearConstraint, np.eye(3), [-5, -5, -.5], [5, 5, 0.5]), ] # Re-solve res2 = opt.solve_ocp( - sys, time, x0, integral_cost, constraints, terminal_cost=terminal_cost) + sys, time, x0, integral_cost, trajectory_constraints, + terminal_cost=terminal_cost, initial_guess=lqr_u) # Make sure we got a different solution assert np.any(np.abs(res1.inputs - res2.inputs) > 0.1) @@ -205,7 +212,9 @@ def test_constraint_specification(constraint_list): # Create a model predictive controller system time = np.arange(0, 5, 1) - optctrl = opt.OptimalControlProblem(sys, time, cost, constraints) + optctrl = opt.OptimalControlProblem( + sys, time, cost, constraints, + terminal_cost=cost) # include to match MPT3 formulation # Compute optimal control and compare against MPT3 solution x0 = [4, 0] @@ -223,7 +232,7 @@ def test_constraint_specification(constraint_list): ([[1, 0], [0, 1]], np.eye(2), np.eye(2), 0, 1), id = "discrete, dt=1"), pytest.param( - (np.zeros((2,2)), np.eye(2), np.eye(2), 0), + (np.zeros((2, 2)), np.eye(2), np.eye(2), 0), id = "continuous"), ]) def test_terminal_constraints(sys_args): @@ -274,8 +283,11 @@ def test_terminal_constraints(sys_args): # Re-run using a basis function and see if we get the same answer res = opt.solve_ocp(sys, time, x0, cost, terminal_constraints=final_point, - basis=flat.BezierFamily(4, Tf)) - np.testing.assert_almost_equal(res.inputs, u1, decimal=2) + basis=flat.BezierFamily(8, Tf)) + + # Final point doesn't affect cost => don't need to test + np.testing.assert_almost_equal( + res.inputs[:, :-1], u1[:, :-1], decimal=2) # Impose some cost on the state, which should change the path Q = np.eye(2) From 395dbbba75d1e9b862a57a1d942039840a9515e0 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Mon, 24 Jan 2022 21:12:47 -0800 Subject: [PATCH 2/9] check for unused keywords in OptimalControlProblem --- control/optimal.py | 4 ++++ control/tests/optimal_test.py | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/control/optimal.py b/control/optimal.py index 860f67582..1f17db0d6 100644 --- a/control/optimal.py +++ b/control/optimal.py @@ -134,6 +134,10 @@ def __init__( self.minimize_kwargs['options'] = kwargs.pop('minimize_options', {}) self.minimize_kwargs.update(kwargs.pop('minimize_kwargs', {})) + if len(kwargs) > 0: + raise ValueError( + f'unrecognized keyword(s): {list(kwargs.keys())}') + # Process trajectory constraints if isinstance(trajectory_constraints, tuple): self.trajectory_constraints = [trajectory_constraints] diff --git a/control/tests/optimal_test.py b/control/tests/optimal_test.py index 10484795b..680911259 100644 --- a/control/tests/optimal_test.py +++ b/control/tests/optimal_test.py @@ -436,6 +436,11 @@ def test_ocp_argument_errors(): res = opt.solve_ocp( sys, time, x0, cost, constraints, initial_guess=np.zeros((4,1,1))) + # Unrecognized arguments + with pytest.raises(ValueError, match="unrecognized keyword"): + res = opt.solve_ocp( + sys, time, x0, cost, constraints, terminal_constraint=None) + def test_optimal_basis_simple(): sys = ct.ss2io(ct.ss([[1, 1], [0, 1]], [[1], [0.5]], np.eye(2), 0, 1)) From de4c75cdc938b65ee1af3a85051410f8643a559f Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Tue, 25 Jan 2022 07:14:25 -0800 Subject: [PATCH 3/9] generate iosys warning if solve_ivp does not succeed --- control/iosys.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/control/iosys.py b/control/iosys.py index c8e921c90..46124e669 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -1870,6 +1870,11 @@ def ivp_rhs(t, x): ivp_rhs, (T0, Tf), X0, t_eval=T, vectorized=False, **solve_ivp_kwargs) + if not soln.success or soln.status != 0: + # Something went wrong + warn("sp.integrate.solve_ivp failed") + print("Return bunch:", soln) + # Compute the output associated with the state (and use sys.out to # figure out the number of outputs just in case it wasn't specified) u = U[0] if len(U.shape) == 1 else U[:, 0] @@ -1886,7 +1891,7 @@ def ivp_rhs(t, x): "equally spaced.") # Make sure the sample time matches the given time - if (sys.dt is not True): + if sys.dt is not True: # Make sure that the time increment is a multiple of sampling time # TODO: add back functionality for undersampling From 93a5fae4aa3e000479a90be6d99e3471ca720e7f Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Tue, 25 Jan 2022 07:15:57 -0800 Subject: [PATCH 4/9] remove one step delay in create_mpc_iosystem, discrete time only --- control/optimal.py | 72 +++++++++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/control/optimal.py b/control/optimal.py index 1f17db0d6..f1c352263 100644 --- a/control/optimal.py +++ b/control/optimal.py @@ -639,35 +639,9 @@ def _print_statistics(self, reset=True): if reset: self._reset_statistics(self.log) - # Create an input/output system implementing an MPC controller - def _create_mpc_iosystem(self, dt=True): - """Create an I/O system implementing an MPC controller""" - def _update(t, x, u, params={}): - coeffs = x.reshape((self.system.ninputs, -1)) - if self.basis: - # Keep the coeffecients unchanged - # TODO: could compute input vector, shift, and re-project (?) - self.initial_guess = coeffs - else: - # Shift the basis elements by one time step - self.initial_guess = np.hstack( - [coeffs[:, 1:], coeffs[:, -1:]]).reshape(-1) - res = self.compute_trajectory(u, print_summary=False) - return res.inputs.reshape(-1) - - def _output(t, x, u, params={}): - if self.basis: - # TODO: compute inputs from basis elements - raise NotImplementedError("basis elements not implemented") - else: - inputs = x.reshape((self.system.ninputs, -1)) - return inputs[:, 0] - - return ct.NonlinearIOSystem( - _update, _output, dt=dt, - inputs=self.system.nstates, outputs=self.system.ninputs, - states=self.system.ninputs * \ - (self.timepts.size if self.basis is None else self.basis.N)) + # + # Optimal control computations + # # Compute the optimal trajectory from the current state def compute_trajectory( @@ -762,6 +736,41 @@ def compute_mpc(self, x, squeeze=None): res = self.compute_trajectory(x, squeeze=squeeze) return res.inputs[:, 0] + # Create an input/output system implementing an MPC controller + def create_mpc_iosystem(self): + """Create an I/O system implementing an MPC controller""" + # Check to make sure we are in discrete time + if self.system.dt == 0: + raise ControlNotImplemented( + "MPC for continuous time systems not implemented") + + def _update(t, x, u, params={}): + coeffs = x.reshape((self.system.ninputs, -1)) + if self.basis: + # Keep the coeffecients unchanged + # TODO: could compute input vector, shift, and re-project (?) + self.initial_guess = coeffs + else: + # Shift the basis elements by one time step + self.initial_guess = np.hstack( + [coeffs[:, 1:], coeffs[:, -1:]]).reshape(-1) + res = self.compute_trajectory(u, print_summary=False) + + # New state is the new input vector + return res.inputs.reshape(-1) + + def _output(t, x, u, params={}): + # Start with initial guess and recompute based on input state (u) + self.initial_guess = x + res = self.compute_trajectory(u, print_summary=False) + return res.inputs[:, 0] + + return ct.NonlinearIOSystem( + _update, _output, dt=self.system.dt, + inputs=self.system.nstates, outputs=self.system.ninputs, + states=self.system.ninputs * \ + (self.timepts.size if self.basis is None else self.basis.N)) + # Optimal control result class OptimalControlResult(sp.optimize.OptimizeResult): @@ -952,7 +961,7 @@ def solve_ocp( # Create a model predictive controller for an optimal control problem def create_mpc_iosystem( sys, horizon, cost, constraints=[], terminal_cost=None, - terminal_constraints=[], dt=True, log=False, **kwargs): + terminal_constraints=[], log=False, **kwargs): """Create a model predictive I/O control system This function creates an input/output system that implements a model @@ -1001,7 +1010,6 @@ def create_mpc_iosystem( :func:`OptimalControlProblem` for more information. """ - # Set up the optimal control problem ocp = OptimalControlProblem( sys, horizon, cost, trajectory_constraints=constraints, @@ -1009,7 +1017,7 @@ def create_mpc_iosystem( log=log, **kwargs) # Return an I/O system implementing the model predictive controller - return ocp._create_mpc_iosystem(dt=dt) + return ocp.create_mpc_iosystem() # From 7396f76db2dd5828339e43b1f34e3eddd620aff1 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 4 Feb 2022 21:27:37 -0800 Subject: [PATCH 5/9] add params to flat systems + updated keywords --- control/flatsys/flatsys.py | 48 +++++++++++++++++++++++--------------- control/flatsys/linflat.py | 4 ++-- control/flatsys/systraj.py | 6 +++-- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index 9ea40f2fb..581d4d998 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -108,7 +108,7 @@ class FlatSystem(NonlinearIOSystem): ----- The class must implement two functions: - zflag = flatsys.foward(x, u) + zflag = flatsys.foward(x, u, params) This function computes the flag (derivatives) of the flat output. The inputs to this function are the state 'x' and inputs 'u' (both 1D arrays). The output should be a 2D array with the first @@ -116,7 +116,7 @@ class FlatSystem(NonlinearIOSystem): dimension of the length required to represent the full system dynamics (typically the number of states) - x, u = flatsys.reverse(zflag) + x, u = flatsys.reverse(zflag, params) This function system state and inputs give the the flag (derivatives) of the flat output. The input to this function is an 2D array whose first dimension is equal to the number of system inputs and whose @@ -244,8 +244,8 @@ def _basis_flag_matrix(sys, basis, flag, t, params={}): # Solve a point to point trajectory generation problem for a flat system def point_to_point( - sys, timepts, x0=0, u0=0, xf=0, uf=0, T0=0, basis=None, cost=None, - constraints=None, initial_guess=None, minimize_kwargs={}, **kwargs): + sys, timepts, x0=0, u0=0, xf=0, uf=0, T0=0, cost=None, basis=None, + trajectory_constraints=None, initial_guess=None, params=None, **kwargs): """Compute trajectory between an initial and final conditions. Compute a feasible trajectory for a differentially flat system between an @@ -284,7 +284,7 @@ def point_to_point( Function that returns the integral cost given the current state and input. Called as `cost(x, u)`. - constraints : list of tuples, optional + trajectory_constraints : list of tuples, optional List of constraints that should hold at each point in the time vector. Each element of the list should consist of a tuple with first element given by :class:`scipy.optimize.LinearConstraint` or @@ -337,8 +337,15 @@ def point_to_point( T0 = timepts[0] if len(timepts) > 1 else T0 # Process keyword arguments + if trajectory_constraints is None: + # Backwards compatibility + trajectory_constraints = kwargs.pop('constraints', None) + + minimize_kwargs = {} minimize_kwargs['method'] = kwargs.pop('minimize_method', None) minimize_kwargs['options'] = kwargs.pop('minimize_options', {}) + minimize_kwargs.update(kwargs.pop('minimize_kwargs', {})) + if kwargs: raise TypeError("unrecognized keywords: ", str(kwargs)) @@ -353,11 +360,14 @@ def point_to_point( # Make sure we have enough basis functions to solve the problem if basis.N * sys.ninputs < 2 * (sys.nstates + sys.ninputs): raise ValueError("basis set is too small") - elif (cost is not None or constraints is not None) and \ + elif (cost is not None or trajectory_constraints is not None) and \ basis.N * sys.ninputs == 2 * (sys.nstates + sys.ninputs): warnings.warn("minimal basis specified; optimization not possible") cost = None - constraints = None + trajectory_constraints = None + + # Figure out the parameters to use, if any + params = sys.params if params is None else params # # Map the initial and final conditions to flat output conditions @@ -366,8 +376,8 @@ def point_to_point( # and then evaluate this at the initial and final condition. # - zflag_T0 = sys.forward(x0, u0) - zflag_Tf = sys.forward(xf, uf) + zflag_T0 = sys.forward(x0, u0, params) + zflag_Tf = sys.forward(xf, uf, params) # # Compute the matrix constraints for initial and final conditions @@ -400,7 +410,7 @@ def point_to_point( # Start by solving the least squares problem alpha, residuals, rank, s = np.linalg.lstsq(M, Z, rcond=None) - if cost is not None or constraints is not None: + if cost is not None or trajectory_constraints is not None: # Search over the null space to minimize cost/satisfy constraints N = sp.linalg.null_space(M) @@ -418,7 +428,7 @@ def traj_cost(null_coeffs): zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) # Find states and inputs at the time points - x, u = sys.reverse(zflag) + x, u = sys.reverse(zflag, params) # Evaluate the cost at this time point costval += cost(x, u) @@ -429,13 +439,13 @@ def traj_cost(null_coeffs): traj_cost = lambda coeffs: coeffs @ coeffs # Process the constraints we were given - traj_constraints = constraints - if constraints is None: + traj_constraints = trajectory_constraints + if traj_constraints is None: traj_constraints = [] - elif isinstance(constraints, tuple): + elif isinstance(traj_constraints, tuple): # TODO: Check to make sure this is really a constraint - traj_constraints = [constraints] - elif not isinstance(constraints, list): + traj_constraints = [traj_constraints] + elif not isinstance(traj_constraints, list): raise TypeError("trajectory constraints must be a list") # Process constraints @@ -456,7 +466,7 @@ def traj_const(null_coeffs): zflag = (M_t @ coeffs).reshape(sys.ninputs, -1) # Find states and inputs at the time points - states, inputs = sys.reverse(zflag) + states, inputs = sys.reverse(zflag, params) # Evaluate the constraint function along the trajectory for type, fun, lb, ub in traj_constraints: @@ -507,8 +517,8 @@ def traj_const(null_coeffs): # Transform the trajectory from flat outputs to states and inputs # - # Createa trajectory object to store the resul - systraj = SystemTrajectory(sys, basis) + # Create a trajectory object to store the result + systraj = SystemTrajectory(sys, basis, params=params) # Store the flag lengths and coefficients # TODO: make this more pythonic diff --git a/control/flatsys/linflat.py b/control/flatsys/linflat.py index 931446ca8..94523cc0b 100644 --- a/control/flatsys/linflat.py +++ b/control/flatsys/linflat.py @@ -113,7 +113,7 @@ def __init__(self, linsys, inputs=None, outputs=None, states=None, self.Cf = Cfz @ Tr # Compute the flat flag from the state (and input) - def forward(self, x, u): + def forward(self, x, u, params): """Compute the flat flag given the states and input. See :func:`control.flatsys.FlatSystem.forward` for more info. @@ -130,7 +130,7 @@ def forward(self, x, u): return zflag # Compute state and input from flat flag - def reverse(self, zflag): + def reverse(self, zflag, params): """Compute the states and input given the flat flag. See :func:`control.flatsys.FlatSystem.reverse` for more info. diff --git a/control/flatsys/systraj.py b/control/flatsys/systraj.py index c6ffb0867..5e390a7b5 100644 --- a/control/flatsys/systraj.py +++ b/control/flatsys/systraj.py @@ -62,7 +62,7 @@ class SystemTrajectory: """ - def __init__(self, sys, basis, coeffs=[], flaglen=[]): + def __init__(self, sys, basis, coeffs=[], flaglen=[], params=None): """Initilize a system trajectory object.""" self.nstates = sys.nstates self.ninputs = sys.ninputs @@ -70,6 +70,7 @@ def __init__(self, sys, basis, coeffs=[], flaglen=[]): self.basis = basis self.coeffs = list(coeffs) self.flaglen = list(flaglen) + self.params = sys.params if params is None else params # Evaluate the trajectory over a list of time points def eval(self, tlist): @@ -112,6 +113,7 @@ def eval(self, tlist): # Now copy the states and inputs # TODO: revisit order of list arguments - xd[:,tind], ud[:,tind] = self.system.reverse(zflag) + xd[:,tind], ud[:,tind] = \ + self.system.reverse(zflag, self.params) return xd, ud From 994fedc637304ca8f2379480feacc2ca483c68e6 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Mon, 28 Feb 2022 21:32:03 -0800 Subject: [PATCH 6/9] fix discrete time X0, U processing --- control/iosys.py | 3 ++- control/tests/iosys_test.py | 27 ++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 46124e669..c3d8ebefb 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -1820,6 +1820,7 @@ def input_output_response( legal_shapes = [(sys.ninputs, n_steps)] U = _check_convert_array(U, legal_shapes, 'Parameter ``U``: ', squeeze=False) + U = U.reshape(-1, n_steps) # Check to make sure this is not a static function nstates = _find_size(sys.nstates, X0) @@ -1908,7 +1909,7 @@ def ivp_rhs(t, x): # Compute the solution soln = sp.optimize.OptimizeResult() soln.t = T # Store the time vector directly - x = [float(x0) for x0 in X0] # State vector (store as floats) + x = X0 # Initilize state soln.y = [] # Solution, following scipy convention y = [] # System output for i in range(len(T)): diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 5fd83e946..d9a781936 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -731,6 +731,32 @@ def test_discrete(self, tsys): np.testing.assert_allclose(ios_t, lin_t,atol=0.002,rtol=0.) np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + def test_discrete_iosys(self, tsys): + """Create a discrete time system from scratch""" + linsys = ct.StateSpace( + [[-1, 1], [0, -2]], [[0], [1]], [[1, 0]], [[0]], True) + + # Create nonlinear version of the same system + def nlsys_update(t, x, u, params): + A, B = params['A'], params['B'] + return A @ x + B @ u + def nlsys_output(t, x, u, params): + C = params['C'] + return C @ x + nlsys = ct.NonlinearIOSystem( + nlsys_update, nlsys_output, inputs=1, outputs=1, states=2, dt=True) + + # Set up parameters for simulation + T, U, X0 = tsys.T, tsys.U, tsys.X0 + + # Simulate and compare to LTI output + ios_t, ios_y = ios.input_output_response( + nlsys, T, U, X0, + params={'A': linsys.A, 'B': linsys.B, 'C': linsys.C}) + lin_t, lin_y = ct.forced_response(linsys, T, U, X0) + np.testing.assert_allclose(ios_t, lin_t,atol=0.002,rtol=0.) + np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + def test_find_eqpts(self, tsys): """Test find_eqpt function""" # Simple equilibrium point with no inputs @@ -1526,7 +1552,6 @@ def secord_update(t, x, u, params={}): """Second order system dynamics""" omega0 = params.get('omega0', 1.) zeta = params.get('zeta', 0.5) - u = np.array(u, ndmin=1) return np.array([ x[1], -2 * zeta * omega0 * x[1] - omega0*omega0 * x[0] + u[0] From d01a52cdce111a80020e113731f2c513613569b3 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Mar 2022 07:57:33 -0800 Subject: [PATCH 7/9] fix optimal unit tests (matrix case) + PEP8 --- control/tests/iosys_test.py | 2 +- control/tests/optimal_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index d9a781936..7a7cc0f43 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -745,7 +745,7 @@ def nlsys_output(t, x, u, params): return C @ x nlsys = ct.NonlinearIOSystem( nlsys_update, nlsys_output, inputs=1, outputs=1, states=2, dt=True) - + # Set up parameters for simulation T, U, X0 = tsys.T, tsys.U, tsys.X0 diff --git a/control/tests/optimal_test.py b/control/tests/optimal_test.py index 680911259..124cbc6dd 100644 --- a/control/tests/optimal_test.py +++ b/control/tests/optimal_test.py @@ -91,7 +91,7 @@ def test_discrete_lqr(): lqr_sys, time, 0, x0, return_x=True) # Use LQR input as initial guess to avoid convergence/precision issues - lqr_u = -K @ lqr_x[0:time.size] + lqr_u = np.array(-K @ lqr_x[0:time.size]) # convert from matrix # Formulate the optimal control problem and compute optimal trajectory optctrl = opt.OptimalControlProblem( From be660ceaf796f11bfb961ff494577393a8baa73c Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Mar 2022 09:16:24 -0800 Subject: [PATCH 8/9] default outfcn() bug fix + unit tests for flatsys --- control/flatsys/bezier.py | 2 +- control/flatsys/flatsys.py | 2 +- control/flatsys/poly.py | 2 +- control/tests/flatsys_test.py | 33 ++++++++++++++++++++++++++++++++- examples/kincar-flatsys.py | 2 +- 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/control/flatsys/bezier.py b/control/flatsys/bezier.py index 45a28995f..7e41c546e 100644 --- a/control/flatsys/bezier.py +++ b/control/flatsys/bezier.py @@ -55,7 +55,7 @@ class BezierFamily(BasisFamily): """ def __init__(self, N, T=1): """Create a polynomial basis of order N.""" - self.N = N # save number of basis functions + super(BezierFamily, self).__init__(N) self.T = T # save end of time interval # Compute the kth derivative of the ith basis function at time t diff --git a/control/flatsys/flatsys.py b/control/flatsys/flatsys.py index 581d4d998..2f20aa1e9 100644 --- a/control/flatsys/flatsys.py +++ b/control/flatsys/flatsys.py @@ -216,7 +216,7 @@ def _flat_updfcn(self, t, x, u, params={}): def _flat_outfcn(self, t, x, u, params={}): # Return the flat output zflag = self.forward(x, u, params) - return np.array(zflag[:][0]) + return np.array([zflag[i][0] for i in range(len(zflag))]) # Utility function to compute flag matrix given a basis diff --git a/control/flatsys/poly.py b/control/flatsys/poly.py index 2d9f62455..08dcfb1c9 100644 --- a/control/flatsys/poly.py +++ b/control/flatsys/poly.py @@ -52,7 +52,7 @@ class PolyFamily(BasisFamily): """ def __init__(self, N): """Create a polynomial basis of order N.""" - self.N = N # save number of basis functions + super(PolyFamily, self).__init__(N) # Compute the kth derivative of the ith basis function at time t def eval_deriv(self, i, k, t): diff --git a/control/tests/flatsys_test.py b/control/tests/flatsys_test.py index 6f4ef7cef..8b182a17a 100644 --- a/control/tests/flatsys_test.py +++ b/control/tests/flatsys_test.py @@ -113,8 +113,10 @@ def test_kinematic_car(self, vehicle_flat, poly): np.testing.assert_array_almost_equal(uf, u[:, 1]) # Simulate the system and make sure we stay close to desired traj - T = np.linspace(0, Tf, 500) + T = np.linspace(0, Tf, 100) xd, ud = traj.eval(T) + resp = ct.input_output_response(vehicle_flat, T, ud, x0) + np.testing.assert_array_almost_equal(resp.states, xd, decimal=2) # For SciPy 1.0+, integrate equations and compare to desired if StrictVersion(sp.__version__) >= "1.0": @@ -122,6 +124,35 @@ def test_kinematic_car(self, vehicle_flat, poly): vehicle_flat, T, ud, x0, return_x=True) np.testing.assert_allclose(x, xd, atol=0.01, rtol=0.01) + def test_flat_default_output(self, vehicle_flat): + # Construct a flat system with the default outputs + flatsys = fs.FlatSystem( + vehicle_flat.forward, vehicle_flat.reverse, vehicle_flat.updfcn, + inputs=vehicle_flat.ninputs, outputs=vehicle_flat.ninputs, + states=vehicle_flat.nstates) + + # Define the endpoints of the trajectory + x0 = [0., -2., 0.]; u0 = [10., 0.] + xf = [100., 2., 0.]; uf = [10., 0.] + Tf = 10 + + # Find trajectory between initial and final conditions + poly = fs.PolyFamily(6) + traj1 = fs.point_to_point(vehicle_flat, Tf, x0, u0, xf, uf, basis=poly) + traj2 = fs.point_to_point(flatsys, Tf, x0, u0, xf, uf, basis=poly) + + # Verify that the trajectory computation is correct + T = np.linspace(0, Tf, 10) + x1, u1 = traj1.eval(T) + x2, u2 = traj2.eval(T) + np.testing.assert_array_almost_equal(x1, x2) + np.testing.assert_array_almost_equal(u1, u2) + + # Run a simulation and verify that the outputs are correct + resp1 = ct.input_output_response(vehicle_flat, T, u1, x0) + resp2 = ct.input_output_response(flatsys, T, u1, x0) + np.testing.assert_array_almost_equal(resp1.outputs[0:2], resp2.outputs) + def test_flat_cost_constr(self): # Double integrator system sys = ct.ss([[0, 1], [0, 0]], [[0], [1]], [[1, 0]], 0) diff --git a/examples/kincar-flatsys.py b/examples/kincar-flatsys.py index ca2a946ed..967bdb310 100644 --- a/examples/kincar-flatsys.py +++ b/examples/kincar-flatsys.py @@ -109,7 +109,7 @@ def plot_results(t, x, ud): # Create differentially flat input/output system vehicle_flat = fs.FlatSystem( vehicle_flat_forward, vehicle_flat_reverse, vehicle_update, - inputs=('v', 'delta'), outputs=('x', 'y', 'theta'), + inputs=('v', 'delta'), outputs=('x', 'y'), states=('x', 'y', 'theta')) # Define the endpoints of the trajectory From f807b338bc770502bf997024f1f43b6575b9241f Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Mar 2022 10:16:03 -0800 Subject: [PATCH 9/9] updated optimal constraint handling + unit tests --- control/optimal.py | 33 ++++++++++++++---- control/tests/optimal_test.py | 65 +++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/control/optimal.py b/control/optimal.py index f1c352263..493b6bc3d 100644 --- a/control/optimal.py +++ b/control/optimal.py @@ -146,6 +146,11 @@ def __init__( else: self.trajectory_constraints = trajectory_constraints + # Make sure that we recognize all of the constraint types + for ctype, fun, lb, ub in self.trajectory_constraints: + if not ctype in [opt.LinearConstraint, opt.NonlinearConstraint]: + raise TypeError(f"unknown constraint type {ctype}") + # Process terminal constraints if isinstance(terminal_constraints, tuple): self.terminal_constraints = [terminal_constraints] @@ -154,6 +159,11 @@ def __init__( else: self.terminal_constraints = terminal_constraints + # Make sure that we recognize all of the constraint types + for ctype, fun, lb, ub in self.terminal_constraints: + if not ctype in [opt.LinearConstraint, opt.NonlinearConstraint]: + raise TypeError(f"unknown constraint type {ctype}") + # # Compute and store constraints # @@ -401,7 +411,8 @@ def _constraint_function(self, coeffs): value.append(fun @ np.hstack([states[:, i], inputs[:, i]])) elif ctype == opt.NonlinearConstraint: value.append(fun(states[:, i], inputs[:, i])) - else: + else: # pragma: no cover + # Checked above => we should never get here raise TypeError(f"unknown constraint type {ctype}") # Evaluate the terminal constraint functions @@ -413,7 +424,8 @@ def _constraint_function(self, coeffs): value.append(fun @ np.hstack([states[:, i], inputs[:, i]])) elif ctype == opt.NonlinearConstraint: value.append(fun(states[:, i], inputs[:, i])) - else: + else: # pragma: no cover + # Checked above => we should never get here raise TypeError(f"unknown constraint type {ctype}") # Update statistics @@ -485,7 +497,8 @@ def _eqconst_function(self, coeffs): value.append(fun @ np.hstack([states[:, i], inputs[:, i]])) elif ctype == opt.NonlinearConstraint: value.append(fun(states[:, i], inputs[:, i])) - else: + else: # pragma: no cover + # Checked above => we should never get here raise TypeError(f"unknown constraint type {ctype}") # Evaluate the terminal constraint functions @@ -497,7 +510,8 @@ def _eqconst_function(self, coeffs): value.append(fun @ np.hstack([states[:, i], inputs[:, i]])) elif ctype == opt.NonlinearConstraint: value.append(fun(states[:, i], inputs[:, i])) - else: + else: # pragma: no cover + # Checked above => we should never get here raise TypeError("unknown constraint type {ctype}") # Update statistics @@ -844,7 +858,7 @@ def __init__( # Compute the input for a nonlinear, (constrained) optimal control problem def solve_ocp( - sys, horizon, X0, cost, constraints=[], terminal_cost=None, + sys, horizon, X0, cost, trajectory_constraints=[], terminal_cost=None, terminal_constraints=[], initial_guess=None, basis=None, squeeze=None, transpose=None, return_states=False, log=False, **kwargs): @@ -865,7 +879,7 @@ def solve_ocp( Function that returns the integral cost given the current state and input. Called as `cost(x, u)`. - constraints : list of tuples, optional + trajectory_constraints : list of tuples, optional List of constraints that should hold at each point in the time vector. Each element of the list should consist of a tuple with first element given by :meth:`scipy.optimize.LinearConstraint` or @@ -943,13 +957,18 @@ def solve_ocp( :func:`OptimalControlProblem` for more information. """ + # Process keyword arguments + if trajectory_constraints is None: + # Backwards compatibility + trajectory_constraints = kwargs.pop('constraints', None) + # Allow 'return_x` as a synonym for 'return_states' return_states = ct.config._get_param( 'optimal', 'return_x', kwargs, return_states, pop=True) # Set up the optimal control problem ocp = OptimalControlProblem( - sys, horizon, cost, trajectory_constraints=constraints, + sys, horizon, cost, trajectory_constraints=trajectory_constraints, terminal_cost=terminal_cost, terminal_constraints=terminal_constraints, initial_guess=initial_guess, basis=basis, log=log, **kwargs) diff --git a/control/tests/optimal_test.py b/control/tests/optimal_test.py index 124cbc6dd..f059c4fc6 100644 --- a/control/tests/optimal_test.py +++ b/control/tests/optimal_test.py @@ -441,6 +441,17 @@ def test_ocp_argument_errors(): res = opt.solve_ocp( sys, time, x0, cost, constraints, terminal_constraint=None) + # Unrecognized trajectory constraint type + constraints = [(None, np.eye(3), [0, 0, 0], [0, 0, 0])] + with pytest.raises(TypeError, match="unknown constraint type"): + res = opt.solve_ocp( + sys, time, x0, cost, trajectory_constraints=constraints) + + # Unrecognized terminal constraint type + with pytest.raises(TypeError, match="unknown constraint type"): + res = opt.solve_ocp( + sys, time, x0, cost, terminal_constraints=constraints) + def test_optimal_basis_simple(): sys = ct.ss2io(ct.ss([[1, 1], [0, 1]], [[1], [0.5]], np.eye(2), 0, 1)) @@ -484,3 +495,57 @@ def test_optimal_basis_simple(): basis=flat.BezierFamily(4, Tf), return_x=True, log=True) assert res3.success np.testing.assert_almost_equal(res3.inputs, res1.inputs, decimal=3) + + +def test_equality_constraints(): + """Test out the ability to handle equality constraints""" + # Create the system (double integrator, continuous time) + sys = ct.ss2io(ct.ss(np.zeros((2, 2)), np.eye(2), np.eye(2), 0)) + + # Shortest path to a point is a line + Q = np.zeros((2, 2)) + R = np.eye(2) + cost = opt.quadratic_cost(sys, Q, R) + + # Set up the terminal constraint to be the origin + final_point = [opt.state_range_constraint(sys, [0, 0], [0, 0])] + + # Create the optimal control problem + time = np.arange(0, 3, 1) + optctrl = opt.OptimalControlProblem( + sys, time, cost, terminal_constraints=final_point) + + # Find a path to the origin + x0 = np.array([4, 3]) + res = optctrl.compute_trajectory(x0, squeeze=True, return_x=True) + t, u1, x1 = res.time, res.inputs, res.states + + # Bug prior to SciPy 1.6 will result in incorrect results + if NumpyVersion(sp.__version__) < '1.6.0': + pytest.xfail("SciPy 1.6 or higher required") + + np.testing.assert_almost_equal(x1[:,-1], 0, decimal=4) + + # Set up terminal constraints as a nonlinear constraint + def final_point_eval(x, u): + return x + final_point = [ + (sp.optimize.NonlinearConstraint, final_point_eval, [0, 0], [0, 0])] + + optctrl = opt.OptimalControlProblem( + sys, time, cost, terminal_constraints=final_point) + + # Find a path to the origin + x0 = np.array([4, 3]) + res = optctrl.compute_trajectory(x0, squeeze=True, return_x=True) + t, u2, x2 = res.time, res.inputs, res.states + np.testing.assert_almost_equal(x2[:,-1], 0, decimal=4) + np.testing.assert_almost_equal(u1, u2) + np.testing.assert_almost_equal(x1, x2) + + # Try passing and unknown constraint type + final_point = [(None, final_point_eval, [0, 0], [0, 0])] + with pytest.raises(TypeError, match="unknown constraint type"): + optctrl = opt.OptimalControlProblem( + sys, time, cost, terminal_constraints=final_point) + res = optctrl.compute_trajectory(x0, squeeze=True, return_x=True)