Skip to content

Small fixes based on Caltech CDS 112 course #849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 24, 2023
Merged
6 changes: 6 additions & 0 deletions control/flatsys/flatsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ def point_to_point(
warnings.warn("basis too small; solution may not exist")

if cost is not None or trajectory_constraints is not None:
# Make sure that we have enough timepoints to evaluate
if timepts.size < 3:
raise ControlArgument(
"There must be at least three time points if trajectory"
" cost or constraints are specified")

# Search over the null space to minimize cost/satisfy constraints
N = sp.linalg.null_space(M)

Expand Down
86 changes: 65 additions & 21 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,12 @@ def __init__(self, linsys, **kwargs):
StateSpace.__init__(
self, linsys, remove_useless_states=False, init_namedio=False)

# When sampling a LinearIO system, return a LinearIOSystem
def sample(self, *args, **kwargs):
return LinearIOSystem(StateSpace.sample(self, *args, **kwargs))

sample.__doc__ = StateSpace.sample.__doc__

# The following text needs to be replicated from StateSpace in order for
# this entry to show up properly in sphinx doccumentation (not sure why,
# but it was the only way to get it to work).
Expand Down Expand Up @@ -1373,13 +1379,11 @@ def unused_signals(self):
-------

unused_inputs : dict

A mapping from tuple of indices (isys, isig) to string
'{sys}.{sig}', for all unused subsystem inputs.

unused_outputs : dict

A mapping from tuple of indices (isys, isig) to string
A mapping from tuple of indices (osys, osig) to string
'{sys}.{sig}', for all unused subsystem outputs.

"""
Expand Down Expand Up @@ -1433,10 +1437,13 @@ def _find_outputs_by_basename(self, basename):
for sig, isig in sys.output_index.items()
if sig == (basename)}

def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None):
def check_unused_signals(
self, ignore_inputs=None, ignore_outputs=None, warning=True):
"""Check for unused subsystem inputs and outputs

If any unused inputs or outputs are found, emit a warning.
Check to see if there are any unused signals and return a list of
unused input and output signal descriptions. If `warning` is True
and any unused inputs or outputs are found, emit a warning.

Parameters
----------
Expand All @@ -1454,6 +1461,16 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None):
If the 'sig' form is used, all subsystem outputs with that
name are considered ignored.

Returns
-------
dropped_inputs: list of tuples
A list of the dropped input signals, with each element of the
list in the form of (isys, isig).

dropped_outputs: list of tuples
A list of the dropped output signals, with each element of the
list in the form of (osys, osig).

"""

if ignore_inputs is None:
Expand All @@ -1477,7 +1494,7 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None):
ignore_input_map[self._parse_signal(
ignore_input, 'input')[:2]] = ignore_input

# (isys, isig) -> signal-spec
# (osys, osig) -> signal-spec
ignore_output_map = {}
for ignore_output in ignore_outputs:
if isinstance(ignore_output, str) and '.' not in ignore_output:
Expand All @@ -1496,30 +1513,32 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None):
used_ignored_inputs = set(ignore_input_map) - set(unused_inputs)
used_ignored_outputs = set(ignore_output_map) - set(unused_outputs)

if dropped_inputs:
if warning and dropped_inputs:
msg = ('Unused input(s) in InterconnectedSystem: '
+ '; '.join(f'{inp}={unused_inputs[inp]}'
for inp in dropped_inputs))
warn(msg)

if dropped_outputs:
if warning and dropped_outputs:
msg = ('Unused output(s) in InterconnectedSystem: '
+ '; '.join(f'{out} : {unused_outputs[out]}'
for out in dropped_outputs))
warn(msg)

if used_ignored_inputs:
if warning and used_ignored_inputs:
msg = ('Input(s) specified as ignored is (are) used: '
+ '; '.join(f'{inp} : {ignore_input_map[inp]}'
for inp in used_ignored_inputs))
warn(msg)

if used_ignored_outputs:
if warning and used_ignored_outputs:
msg = ('Output(s) specified as ignored is (are) used: '
+ '; '.join(f'{out}={ignore_output_map[out]}'
for out in used_ignored_outputs))
warn(msg)

return dropped_inputs, dropped_outputs


class LinearICSystem(InterconnectedSystem, LinearIOSystem):

Expand Down Expand Up @@ -2580,9 +2599,10 @@ def tf2io(*args, **kwargs):


# Function to create an interconnected system
def interconnect(syslist, connections=None, inplist=None, outlist=None,
params=None, check_unused=True, ignore_inputs=None,
ignore_outputs=None, warn_duplicate=None, **kwargs):
def interconnect(
syslist, connections=None, inplist=None, outlist=None, params=None,
check_unused=True, add_unused=False, ignore_inputs=None,
ignore_outputs=None, warn_duplicate=None, **kwargs):
"""Interconnect a set of input/output systems.

This function creates a new system that is an interconnection of a set of
Expand Down Expand Up @@ -2653,8 +2673,8 @@ def interconnect(syslist, connections=None, inplist=None, outlist=None,
the system input connects to only one subsystem input, a single input
specification can be given (without the inner list).

If omitted, the input map can be specified using the
:func:`~control.InterconnectedSystem.set_input_map` method.
If omitted the `input` parameter will be used to identify the list
of input signals to the overall system.

outlist : list of output connections, optional
List of connections for how the outputs from the subsystems are mapped
Expand Down Expand Up @@ -2707,12 +2727,16 @@ def interconnect(syslist, connections=None, inplist=None, outlist=None,
System name (used for specifying signals). If unspecified, a generic
name <sys[id]> is generated with a unique integer id.

check_unused : bool
check_unused : bool, optional
If True, check for unused sub-system signals. This check is
not done if connections is False, and neither input nor output
mappings are specified.

ignore_inputs : list of input-spec
add_unused : bool, optional
If True, subsystem signals that are not connected to other components
are added as inputs and outputs of the interconnected system.

ignore_inputs : list of input-spec, optional
A list of sub-system inputs known not to be connected. This is
*only* used in checking for unused signals, and does not
disable use of the input.
Expand All @@ -2722,7 +2746,7 @@ def interconnect(syslist, connections=None, inplist=None, outlist=None,
signals from all sub-systems with that base name are
considered ignored.

ignore_outputs : list of output-spec
ignore_outputs : list of output-spec, optional
A list of sub-system outputs known not to be connected. This
is *only* used in checking for unused signals, and does not
disable use of the output.
Expand All @@ -2732,7 +2756,7 @@ def interconnect(syslist, connections=None, inplist=None, outlist=None,
outputs from all sub-systems with that base name are
considered ignored.

warn_duplicate : None, True, or False
warn_duplicate : None, True, or False, optional
Control how warnings are generated if duplicate objects or names are
detected. In `None` (default), then warnings are generated for
systems that have non-generic names. If `False`, warnings are not
Expand Down Expand Up @@ -2886,10 +2910,30 @@ def interconnect(syslist, connections=None, inplist=None, outlist=None,
outlist = new_outlist

newsys = InterconnectedSystem(
syslist, connections=connections, inplist=inplist, outlist=outlist,
inputs=inputs, outputs=outputs, states=states,
syslist, connections=connections, inplist=inplist,
outlist=outlist, inputs=inputs, outputs=outputs, states=states,
params=params, dt=dt, name=name, warn_duplicate=warn_duplicate)

# See if we should add any signals
if add_unused:
# Get all unused signals
dropped_inputs, dropped_outputs = newsys.check_unused_signals(
ignore_inputs, ignore_outputs, warning=False)

# Add on any unused signals that we aren't ignoring
for isys, isig in dropped_inputs:
inplist.append((isys, isig))
inputs.append(newsys.syslist[isys].input_labels[isig])
for osys, osig in dropped_outputs:
outlist.append((osys, osig))
outputs.append(newsys.syslist[osys].output_labels[osig])

# Rebuild the system with new inputs/outputs
newsys = InterconnectedSystem(
syslist, connections=connections, inplist=inplist,
outlist=outlist, inputs=inputs, outputs=outputs, states=states,
params=params, dt=dt, name=name, warn_duplicate=warn_duplicate)

# check for implicitly dropped signals
if check_unused:
newsys.check_unused_signals(ignore_inputs, ignore_outputs)
Expand Down
64 changes: 35 additions & 29 deletions control/optimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from . import config
from .exception import ControlNotImplemented
from .timeresp import TimeResponseData

# Define module default parameter values
_optimal_trajectory_methods = {'shooting', 'collocation'}
Expand Down Expand Up @@ -105,14 +104,14 @@ class OptimalControlProblem():

Notes
-----
To describe an optimal control problem we need an input/output system, a
time horizon, a cost function, and (optionally) a set of constraints on
the state and/or input, either along the trajectory and at the terminal
time. This class sets up an optimization over the inputs at each point in
time, using the integral and terminal costs as well as the trajectory and
terminal constraints. The `compute_trajectory` method sets up an
optimization problem that can be solved using
:func:`scipy.optimize.minimize`.
To describe an optimal control problem we need an input/output system,
a set of time points over a a fixed horizon, a cost function, and
(optionally) a set of constraints on the state and/or input, either
along the trajectory and at the terminal time. This class sets up an
optimization over the inputs at each point in time, using the integral
and terminal costs as well as the trajectory and terminal constraints.
The `compute_trajectory` method sets up an optimization problem that
can be solved using :func:`scipy.optimize.minimize`.

The `_cost_function` method takes the information computes the cost of the
trajectory generated by the proposed input. It does this by calling a
Expand Down Expand Up @@ -140,7 +139,8 @@ class OptimalControlProblem():
def __init__(
self, sys, timepts, integral_cost, trajectory_constraints=[],
terminal_cost=None, terminal_constraints=[], initial_guess=None,
trajectory_method=None, basis=None, log=False, **kwargs):
trajectory_method=None, basis=None, log=False, kwargs_check=True,
**kwargs):
"""Set up an optimal control problem."""
# Save the basic information for use later
self.system = sys
Expand Down Expand Up @@ -183,7 +183,7 @@ def __init__(
" discrete time systems")

# Make sure there were no extraneous keywords
if kwargs:
if kwargs_check and kwargs:
raise TypeError("unrecognized keyword(s): ", str(kwargs))

self.trajectory_constraints = _process_constraints(
Expand Down Expand Up @@ -829,7 +829,7 @@ def compute_mpc(self, x, squeeze=None):
return res.inputs[:, 0]

# Create an input/output system implementing an MPC controller
def create_mpc_iosystem(self):
def create_mpc_iosystem(self, **kwargs):
"""Create an I/O system implementing an MPC controller"""
# Check to make sure we are in discrete time
if self.system.dt == 0:
Expand Down Expand Up @@ -857,11 +857,17 @@ def _output(t, x, u, params={}):
res = self.compute_trajectory(u, print_summary=False)
return res.inputs[:, 0]

# Define signal names, if they are not already given
if not kwargs.get('inputs'):
kwargs['inputs'] = self.system.state_labels
if not kwargs.get('outputs'):
kwargs['outputs'] = self.system.input_labels
if not kwargs.get('states'):
kwargs['states'] = self.system.ninputs * \
(self.timepts.size if self.basis is None else self.basis.N)

return ct.NonlinearIOSystem(
_update, _output, dt=self.system.dt,
inputs=self.system.nstates, outputs=self.system.ninputs,
states=self.system.ninputs * \
(self.timepts.size if self.basis is None else self.basis.N))
_update, _output, dt=self.system.dt, **kwargs)


# Optimal control result
Expand Down Expand Up @@ -923,7 +929,7 @@ def __init__(
print("* Final cost:", self.cost)

# Process data as a time response (with "outputs" = inputs)
response = TimeResponseData(
response = ct.TimeResponseData(
ocp.timepts, inputs, states, issiso=ocp.system.issiso(),
transpose=transpose, return_x=return_states, squeeze=squeeze)

Expand All @@ -934,7 +940,7 @@ def __init__(

# Compute the input for a nonlinear, (constrained) optimal control problem
def solve_ocp(
sys, horizon, X0, cost, trajectory_constraints=None, terminal_cost=None,
sys, timepts, X0, cost, trajectory_constraints=None, terminal_cost=None,
terminal_constraints=[], initial_guess=None, basis=None, squeeze=None,
transpose=None, return_states=True, print_summary=True, log=False,
**kwargs):
Expand All @@ -946,7 +952,7 @@ def solve_ocp(
sys : InputOutputSystem
I/O system for which the optimal input will be computed.

horizon : 1D array_like
timepts : 1D array_like
List of times at which the optimal input should be computed.

X0: array-like or number, optional
Expand Down Expand Up @@ -984,9 +990,9 @@ def solve_ocp(

initial_guess : 1D or 2D array_like
Initial inputs to use as a guess for the optimal input. The inputs
should either be a 2D vector of shape (ninputs, horizon) or a 1D
input of shape (ninputs,) that will be broadcast by extension of the
time axis.
should either be a 2D vector of shape (ninputs, len(timepts)) or a
1D input of shape (ninputs,) that will be broadcast by extension of
the time axis.

log : bool, optional
If `True`, turn on logging messages (using Python logging module).
Expand Down Expand Up @@ -1063,7 +1069,7 @@ def solve_ocp(

# Set up the optimal control problem
ocp = OptimalControlProblem(
sys, horizon, cost, trajectory_constraints=trajectory_constraints,
sys, timepts, cost, trajectory_constraints=trajectory_constraints,
terminal_cost=terminal_cost, terminal_constraints=terminal_constraints,
initial_guess=initial_guess, basis=basis, log=log, **kwargs)

Expand All @@ -1075,12 +1081,12 @@ def solve_ocp(

# Create a model predictive controller for an optimal control problem
def create_mpc_iosystem(
sys, horizon, cost, constraints=[], terminal_cost=None,
sys, timepts, cost, constraints=[], terminal_cost=None,
terminal_constraints=[], log=False, **kwargs):
"""Create a model predictive I/O control system

This function creates an input/output system that implements a model
predictive control for a system given the time horizon, cost function and
predictive control for a system given the time points, cost function and
constraints that define the finite-horizon optimization that should be
carried out at each state.

Expand All @@ -1089,7 +1095,7 @@ def create_mpc_iosystem(
sys : InputOutputSystem
I/O system for which the optimal input will be computed.

horizon : 1D array_like
timepts : 1D array_like
List of times at which the optimal input should be computed.

cost : callable
Expand Down Expand Up @@ -1127,12 +1133,12 @@ def create_mpc_iosystem(
"""
# Set up the optimal control problem
ocp = OptimalControlProblem(
sys, horizon, cost, trajectory_constraints=constraints,
sys, timepts, cost, trajectory_constraints=constraints,
terminal_cost=terminal_cost, terminal_constraints=terminal_constraints,
log=log, **kwargs)
log=log, kwargs_check=False, **kwargs)

# Return an I/O system implementing the model predictive controller
return ocp.create_mpc_iosystem()
return ocp.create_mpc_iosystem(**kwargs)


#
Expand Down
Loading