Skip to content

Fix interconnect type conversion bug for StateSpace systems #788

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class for a set of subclasses that are used to implement specific
# Allow ndarray * InputOutputSystem to give IOSystem._rmul_() priority
__array_priority__ = 12 # override ndarray, matrix, SS types

def __init__(self, params={}, **kwargs):
def __init__(self, params=None, **kwargs):
"""Create an input/output system.

The InputOutputSystem constructor is used to create an input/output
Expand All @@ -148,7 +148,7 @@ def __init__(self, params={}, **kwargs):
states=states, name=name, dt=dt)

# default parameters
self.params = params.copy()
self.params = {} if params is None else params.copy()

def __mul__(sys2, sys1):
"""Multiply two input/output systems (series interconnection)"""
Expand Down Expand Up @@ -357,7 +357,7 @@ def _update_params(self, params, warning=False):
if warning:
warn("Parameters passed to InputOutputSystem ignored.")

def _rhs(self, t, x, u, params={}):
def _rhs(self, t, x, u):
"""Evaluate right hand side of a differential or difference equation.

Private function used to compute the right hand side of an
Expand All @@ -369,23 +369,24 @@ def _rhs(self, t, x, u, params={}):
NotImplemented("Evaluation not implemented for system of type ",
type(self))

def dynamics(self, t, x, u):
def dynamics(self, t, x, u, params=None):
"""Compute the dynamics of a differential or difference equation.

Given time `t`, input `u` and state `x`, returns the value of the
right hand side of the dynamical system. If the system is continuous,
returns the time derivative

dx/dt = f(t, x, u)
dx/dt = f(t, x, u[, params])

where `f` is the system's (possibly nonlinear) dynamics function.
If the system is discrete-time, returns the next value of `x`:

x[t+dt] = f(t, x[t], u[t])
x[t+dt] = f(t, x[t], u[t][, params])

Where `t` is a scalar.
where `t` is a scalar.

The inputs `x` and `u` must be of the correct length.
The inputs `x` and `u` must be of the correct length. The `params`
argument is an optional dictionary of parameter values.

Parameters
----------
Expand All @@ -395,14 +396,17 @@ def dynamics(self, t, x, u):
current state
u : array_like
input
params : dict (optional)
system parameter values

Returns
-------
dx/dt or x[t+dt] : ndarray
"""
self._update_params(params)
return self._rhs(t, x, u)

def _out(self, t, x, u, params={}):
def _out(self, t, x, u):
"""Evaluate the output of a system at a given state, input, and time

Private function used to compute the output of of an input/output
Expand All @@ -414,13 +418,13 @@ def _out(self, t, x, u, params={}):
# If no output function was defined in subclass, return state
return x

def output(self, t, x, u):
def output(self, t, x, u, params=None):
"""Compute the output of the system

Given time `t`, input `u` and state `x`, returns the output of the
system:

y = g(t, x, u)
y = g(t, x, u[, params])

The inputs `x` and `u` must be of the correct length.

Expand All @@ -432,14 +436,17 @@ def output(self, t, x, u):
current state
u : array_like
input
params : dict (optional)
system parameter values

Returns
-------
y : ndarray
"""
self._update_params(params)
return self._out(t, x, u)

def feedback(self, other=1, sign=-1, params={}):
def feedback(self, other=1, sign=-1, params=None):
"""Feedback interconnection between two input/output systems

Parameters
Expand Down Expand Up @@ -507,7 +514,7 @@ def feedback(self, other=1, sign=-1, params={}):
# Return the newly created system
return newsys

def linearize(self, x0, u0, t=0, params={}, eps=1e-6,
def linearize(self, x0, u0, t=0, params=None, eps=1e-6,
name=None, copy=False, **kwargs):
"""Linearize an input/output system at a given state and input.

Expand Down Expand Up @@ -651,7 +658,7 @@ def __init__(self, linsys, **kwargs):
# Note: don't use super() to override StateSpace MRO
InputOutputSystem.__init__(
self, inputs=inputs, outputs=outputs, states=states,
params={}, dt=dt, name=name)
params=None, dt=dt, name=name)

# Initalize additional state space variables
StateSpace.__init__(
Expand All @@ -668,7 +675,7 @@ def __init__(self, linsys, **kwargs):
#: number of states, use :attr:`nstates`.
states = property(StateSpace._get_states, StateSpace._set_states)

def _update_params(self, params={}, warning=True):
def _update_params(self, params=None, warning=True):
# Parameters not supported; issue a warning
if params and warning:
warn("Parameters passed to LinearIOSystems are ignored.")
Expand Down Expand Up @@ -756,7 +763,7 @@ class NonlinearIOSystem(InputOutputSystem):
defaults.

"""
def __init__(self, updfcn, outfcn=None, params={}, **kwargs):
def __init__(self, updfcn, outfcn=None, params=None, **kwargs):
"""Create a nonlinear I/O system given update and output functions."""
# Process keyword arguments
name, inputs, outputs, states, dt = _process_namedio_keywords(
Expand Down Expand Up @@ -791,7 +798,7 @@ def __init__(self, updfcn, outfcn=None, params={}, **kwargs):
"(and nstates not known).")

# Initialize current parameters to default parameters
self._current_params = params.copy()
self._current_params = {} if params is None else params.copy()

def __str__(self):
return f"{InputOutputSystem.__str__(self)}\n\n" + \
Expand Down Expand Up @@ -838,7 +845,8 @@ def __call__(sys, u, params=None, squeeze=None):
def _update_params(self, params, warning=False):
# Update the current parameter values
self._current_params = self.params.copy()
self._current_params.update(params)
if params:
self._current_params.update(params)

def _rhs(self, t, x, u):
xdot = self.updfcn(t, x, u, self._current_params) \
Expand All @@ -862,20 +870,22 @@ class InterconnectedSystem(InputOutputSystem):
See :func:`~control.interconnect` for a list of parameters.

"""
def __init__(self, syslist, connections=[], inplist=[], outlist=[],
params={}, warn_duplicate=None, **kwargs):
def __init__(self, syslist, connections=None, inplist=None, outlist=None,
params=None, warn_duplicate=None, **kwargs):
"""Create an I/O system from a list of systems + connection info."""
# Convert input and output names to lists if they aren't already
if not isinstance(inplist, (list, tuple)):
if inplist is not None and not isinstance(inplist, (list, tuple)):
inplist = [inplist]
if not isinstance(outlist, (list, tuple)):
if outlist is not None and not isinstance(outlist, (list, tuple)):
outlist = [outlist]

# Check if dt argument was given; if not, pull from systems
dt = kwargs.pop('dt', None)

# Process keyword arguments (except dt)
defaults = {'inputs': len(inplist), 'outputs': len(outlist)}
defaults = {
'inputs': len(inplist or []),
'outputs': len(outlist or [])}
name, inputs, outputs, states, _ = _process_namedio_keywords(
kwargs, defaults, end=True)

Expand All @@ -894,6 +904,12 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[],

# Go through the system list and keep track of counts, offsets
for sysidx, sys in enumerate(syslist):
# If we were passed a SS or TF system, convert to LinearIOSystem
if isinstance(sys, (StateSpace, TransferFunction)) and \
not isinstance(sys, LinearIOSystem):
sys = LinearIOSystem(sys)
syslist[sysidx] = sys

# Make sure time bases are consistent
dt = common_timebase(dt, sys.dt)

Expand Down Expand Up @@ -969,7 +985,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[],

# Convert the list of interconnections to a connection map (matrix)
self.connect_map = np.zeros((ninputs, noutputs))
for connection in connections:
for connection in connections or []:
input_index = self._parse_input_spec(connection[0])
for output_spec in connection[1:]:
output_index, gain = self._parse_output_spec(output_spec)
Expand All @@ -980,7 +996,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[],

# Convert the input list to a matrix: maps system to subsystems
self.input_map = np.zeros((ninputs, self.ninputs))
for index, inpspec in enumerate(inplist):
for index, inpspec in enumerate(inplist or []):
if isinstance(inpspec, (int, str, tuple)):
inpspec = [inpspec]
if not isinstance(inpspec, list):
Expand All @@ -995,7 +1011,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[],

# Convert the output list to a matrix: maps subsystems to system
self.output_map = np.zeros((self.noutputs, noutputs + ninputs))
for index, outspec in enumerate(outlist):
for index, outspec in enumerate(outlist or []):
if isinstance(outspec, (int, str, tuple)):
outspec = [outspec]
if not isinstance(outspec, list):
Expand All @@ -1009,13 +1025,14 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[],
self.output_map[index, ylist_index] += gain

# Save the parameters for the system
self.params = params.copy()
self.params = {} if params is None else params.copy()

def _update_params(self, params, warning=False):
for sys in self.syslist:
local = sys.params.copy() # start with system parameters
local.update(self.params) # update with global params
local.update(params) # update with locally passed parameters
if params:
local.update(params) # update with locally passed parameters
sys._update_params(local, warning=warning)

def _rhs(self, t, x, u):
Expand Down Expand Up @@ -1565,7 +1582,7 @@ def __init__(self, io_sys, ss_sys=None):


def input_output_response(
sys, T, U=0., X0=0, params={},
sys, T, U=0., X0=0, params=None,
transpose=False, return_x=False, squeeze=None,
solve_ivp_kwargs={}, t_eval='T', **kwargs):
"""Compute the output response of a system to a given input.
Expand Down Expand Up @@ -1781,7 +1798,7 @@ def input_output_response(

# Update the parameter values
sys._update_params(params)

#
# Define a function to evaluate the input at an arbitrary time
#
Expand Down Expand Up @@ -1900,7 +1917,7 @@ def ivp_rhs(t, x):
transpose=transpose, return_x=return_x, squeeze=squeeze)


def find_eqpt(sys, x0, u0=[], y0=None, t=0, params={},
def find_eqpt(sys, x0, u0=None, y0=None, t=0, params=None,
iu=None, iy=None, ix=None, idx=None, dx0=None,
return_y=False, return_result=False):
"""Find the equilibrium point for an input/output system.
Expand Down Expand Up @@ -2151,7 +2168,7 @@ def rootfun(z):


# Linearize an input/output system
def linearize(sys, xeq, ueq=[], t=0, params={}, **kw):
def linearize(sys, xeq, ueq=None, t=0, params=None, **kw):
"""Linearize an input/output system at a given state and input.

This function computes the linearization of an input/output system at a
Expand Down Expand Up @@ -2242,7 +2259,7 @@ def ss(*args, **kwargs):
Convert a linear system into space system form. Always creates a
new system, even if sys is already a state space system.

``ss(updfcn, outfucn)``
``ss(updfcn, outfcn)``
Create a nonlinear input/output system with update function ``updfcn``
and output function ``outfcn``. See :class:`NonlinearIOSystem` for
more information.
Expand Down Expand Up @@ -2523,9 +2540,9 @@ def tf2io(*args, **kwargs):


# Function to create an interconnected system
def interconnect(syslist, connections=None, inplist=[], outlist=[], params={},
check_unused=True, ignore_inputs=None, ignore_outputs=None,
warn_duplicate=None, **kwargs):
def interconnect(syslist, connections=None, inplist=None, outlist=None,
params=None, check_unused=True, ignore_inputs=None,
ignore_outputs=None, warn_duplicate=None, **kwargs):
"""Interconnect a set of input/output systems.

This function creates a new system that is an interconnection of a set of
Expand Down Expand Up @@ -2767,10 +2784,10 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[], params={},
connections = []

# If inplist/outlist is not present, try using inputs/outputs instead
if not inplist and inputs is not None:
inplist = list(inputs)
if not outlist and outputs is not None:
outlist = list(outputs)
if inplist is None:
inplist = list(inputs or [])
if outlist is None:
outlist = list(outputs or [])

# Process input list
if not isinstance(inplist, (list, tuple)):
Expand Down
10 changes: 8 additions & 2 deletions control/statesp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ def dcgain(self, warn_infinite=False):
"""
return self._dcgain(warn_infinite)

def dynamics(self, t, x, u=None):
def dynamics(self, t, x, u=None, params=None):
"""Compute the dynamics of the system

Given input `u` and state `x`, returns the dynamics of the state-space
Expand Down Expand Up @@ -1423,6 +1423,9 @@ def dynamics(self, t, x, u=None):
dx/dt or x[t+dt] : ndarray

"""
if params is not None:
warn("params keyword ignored for StateSpace object")

x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if np.size(x) != self.nstates:
raise ValueError("len(x) must be equal to number of states")
Expand All @@ -1435,7 +1438,7 @@ def dynamics(self, t, x, u=None):
return (self.A @ x).reshape((-1,)) \
+ (self.B @ u).reshape((-1,)) # return as row vector

def output(self, t, x, u=None):
def output(self, t, x, u=None, params=None):
"""Compute the output of the system

Given input `u` and state `x`, returns the output `y` of the
Expand Down Expand Up @@ -1465,6 +1468,9 @@ def output(self, t, x, u=None):
-------
y : ndarray
"""
if params is not None:
warn("params keyword ignored for StateSpace object")

x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if np.size(x) != self.nstates:
raise ValueError("len(x) must be equal to number of states")
Expand Down
12 changes: 12 additions & 0 deletions control/tests/statesp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1158,3 +1158,15 @@ def test_linfnorm_ct_mimo(self, ct_siso):
gpeak, fpeak = linfnorm(sys)
np.testing.assert_allclose(gpeak, refgpeak)
np.testing.assert_allclose(fpeak, reffpeak)


# Make sure that using params for StateSpace objects generates a warning
def test_params_warning():
sys = StateSpace(-1, 1, 1, 0)

with pytest.warns(UserWarning, match="params keyword ignored"):
sys.dynamics(0, [0], [0], {'k': 5})

with pytest.warns(UserWarning, match="params keyword ignored"):
sys.output(0, [0], [0], {'k': 5})

Loading