Skip to content
66 changes: 33 additions & 33 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ class for a set of subclasses that are used to implement specific
The :class:`~control.InputOuputSystem` class (and its subclasses) makes
use of two special methods for implementing much of the work of the class:

* _rhs(t, x, u): compute the right hand side of the differential or
difference equation for the system. This must be specified by the
* _rhs(t, x, u, params={}): compute the right hand side of the differential
or difference equation for the system. This must be specified by the
subclass for the system.

* _out(t, x, u): compute the output for the current state of the system.
The default is to return the entire system state.
* _out(t, x, u, params={}): compute the output for the current state of the
system. The default is to return the entire system state.

"""

Expand Down Expand Up @@ -149,6 +149,7 @@ def __init__(self, params={}, **kwargs):

# default parameters
self.params = params.copy()
self._current_params = self.params.copy()

def __mul__(sys2, sys1):
"""Multiply two input/output systems (series interconnection)"""
Expand Down Expand Up @@ -369,7 +370,7 @@ def _rhs(self, t, x, u, params={}):
NotImplemented("Evaluation not implemented for system of type ",
type(self))

def dynamics(self, t, x, u):
def dynamics(self, t, x, u, params={}):
"""Compute the dynamics of a differential or difference equation.

Given time `t`, input `u` and state `x`, returns the value of the
Expand All @@ -395,12 +396,15 @@ def dynamics(self, t, x, u):
current state
u : array_like
input
params : dict (optional)
system parameters


Returns
-------
dx/dt or x[t+dt] : ndarray
"""
return self._rhs(t, x, u)
return self._rhs(t, x, u, params=params)

def _out(self, t, x, u, params={}):
"""Evaluate the output of a system at a given state, input, and time
Expand All @@ -414,7 +418,7 @@ def _out(self, t, x, u, params={}):
# If no output function was defined in subclass, return state
return x

def output(self, t, x, u):
def output(self, t, x, u, params={}):
"""Compute the output of the system

Given time `t`, input `u` and state `x`, returns the output of the
Expand All @@ -432,12 +436,14 @@ def output(self, t, x, u):
current state
u : array_like
input
params : dict (optional)
system parameters

Returns
-------
y : ndarray
"""
return self._out(t, x, u)
return self._out(t, x, u, params=params)

def feedback(self, other=1, sign=-1, params={}):
"""Feedback interconnection between two input/output systems
Expand Down Expand Up @@ -673,17 +679,9 @@ def _update_params(self, params={}, warning=True):
if params and warning:
warn("Parameters passed to LinearIOSystems are ignored.")

def _rhs(self, t, x, u):
# Convert input to column vector and then change output to 1D array
xdot = self.A @ np.reshape(x, (-1, 1)) \
+ self.B @ np.reshape(u, (-1, 1))
return np.array(xdot).reshape((-1,))

def _out(self, t, x, u):
# Convert input to column vector and then change output to 1D array
y = self.C @ np.reshape(x, (-1, 1)) \
+ self.D @ np.reshape(u, (-1, 1))
return np.array(y).reshape((-1,))
# inherit methods as necessary
_rhs = StateSpace._rhs
_out = StateSpace._out

def __repr__(self):
# Need to define so that I/O system gets used instead of StateSpace
Expand Down Expand Up @@ -799,7 +797,7 @@ def __str__(self):
f"Output: {self.outfcn}"

# Return the value of a static nonlinear system
def __call__(sys, u, params=None, squeeze=None):
def __call__(sys, u, params={}, squeeze=None):
"""Evaluate a (static) nonlinearity at a given input value

If a nonlinear I/O system has no internal state, then evaluating the
Expand All @@ -825,12 +823,8 @@ def __call__(sys, u, params=None, squeeze=None):
"function evaluation is only supported for static "
"input/output systems")

# If we received any parameters, update them before calling _out()
if params is not None:
sys._update_params(params)

# Evaluate the function on the argument
out = sys._out(0, np.array((0,)), np.asarray(u))
out = sys._out(0, np.array((0,)), np.asarray(u), params)
_, out = _process_time_response(
None, out, issiso=sys.issiso(), squeeze=squeeze)
return out
Expand All @@ -840,13 +834,17 @@ def _update_params(self, params, warning=False):
self._current_params = self.params.copy()
self._current_params.update(params)

def _rhs(self, t, x, u):
xdot = self.updfcn(t, x, u, self._current_params) \
def _rhs(self, t, x, u, params={}):
current_params = self._current_params.copy()
current_params.update(params)
xdot = self.updfcn(t, x, u, current_params) \
if self.updfcn is not None else []
return np.array(xdot).reshape((-1,))

def _out(self, t, x, u):
y = self.outfcn(t, x, u, self._current_params) \
def _out(self, t, x, u, params={}):
current_params = self._current_params.copy()
current_params.update(params)
y = self.outfcn(t, x, u, current_params) \
if self.outfcn is not None else x
return np.array(y).reshape((-1,))

Expand Down Expand Up @@ -1018,7 +1016,7 @@ def _update_params(self, params, warning=False):
local.update(params) # update with locally passed parameters
sys._update_params(local, warning=warning)

def _rhs(self, t, x, u):
def _rhs(self, t, x, u, params={}):
# Make sure state and input are vectors
x = np.array(x, ndmin=1)
u = np.array(u, ndmin=1)
Expand All @@ -1031,18 +1029,20 @@ def _rhs(self, t, x, u):
state_index, input_index = 0, 0 # Start at the beginning
for sys in self.syslist:
# Update the right hand side for this subsystem
sys_params = sys._current_params.copy()
sys_params.update(params)
if sys.nstates != 0:
xdot[state_index:state_index + sys.nstates] = sys._rhs(
t, x[state_index:state_index + sys.nstates],
ulist[input_index:input_index + sys.ninputs])
ulist[input_index:input_index + sys.ninputs], sys_params)

# Update the state and input index counters
state_index += sys.nstates
input_index += sys.ninputs

return xdot

def _out(self, t, x, u):
def _out(self, t, x, u, params={}):
# Make sure state and input are vectors
x = np.array(x, ndmin=1)
u = np.array(u, ndmin=1)
Expand Down Expand Up @@ -2838,7 +2838,7 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[], params={},
newsys.check_unused_signals(ignore_inputs, ignore_outputs)

# If all subsystems are linear systems, maintain linear structure
if all([isinstance(sys, LinearIOSystem) for sys in syslist]):
if all([isinstance(sys, (LinearIOSystem, StateSpace)) for sys in syslist]):
return LinearICSystem(newsys, None)

return newsys
Expand Down
66 changes: 46 additions & 20 deletions control/statesp.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,8 @@ def __init__(self, *args, init_namedio=True, **kwargs):
defaults = args[0] if len(args) == 1 else \
{'inputs': D.shape[1], 'outputs': D.shape[0],
'states': A.shape[0]}
static = (A.size == 0)
name, inputs, outputs, states, dt = _process_namedio_keywords(
kwargs, defaults, static=static, end=True)
kwargs, defaults, static=self._isstatic(), end=True)

# Initialize LTI (NamedIOSystem) object
super().__init__(
Expand Down Expand Up @@ -386,6 +385,9 @@ def __init__(self, *args, init_namedio=True, **kwargs):
# Check for states that don't do anything, and remove them
if remove_useless_states:
self._remove_useless_states()
# params for compatibility with LinearIOSystems
self.params = {}
self._current_params = self.params.copy()

#
# Class attributes
Expand Down Expand Up @@ -1388,8 +1390,22 @@ def dcgain(self, warn_infinite=False):
scalar is returned.
"""
return self._dcgain(warn_infinite)

def _rhs(self, t, x, u=None, params={}):
"""Compute the right hand side of the differential or difference
equation for the system. Please :meth:`dynamics` for a more user-
friendly interface. """

def dynamics(self, t, x, u=None):
x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if u is None: # received t and x, but ignore t
output = self.A @ x
else: # received t, x, and u, ignore t
u = np.reshape(u, (-1, 1)) # force to column in case matrix
output = self.A @ x + self.B @ u

return output.reshape((-1,)) # return as row vector

def dynamics(self, t, x, u=None, params={}):
"""Compute the dynamics of the system

Given input `u` and state `x`, returns the dynamics of the state-space
Expand Down Expand Up @@ -1417,25 +1433,35 @@ def dynamics(self, t, x, u=None):
current state
u : array_like (optional)
input, zero if omitted
params : dict (optional)
included for compatibility but ignored for :class:`StateSpace` systems

Returns
-------
dx/dt or x[t+dt] : ndarray

"""
x = np.reshape(x, (-1, 1)) # force to a column in case matrix
# chechs
if np.size(x) != self.nstates:
raise ValueError("len(x) must be equal to number of states")
if u is not None:
if np.size(u) != self.ninputs:
raise ValueError("len(u) must be equal to number of inputs")
return self._rhs(t, x, u, params)

def _out(self, t, x, u=None, params={}):
"""Compute the output of the system system. Please :meth:`dynamics`
for a more user-friendly interface. """

x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if u is None:
return (self.A @ x).reshape((-1,)) # return as row vector
y = self.C @ x
else: # received t, x, and u, ignore t
u = np.reshape(u, (-1, 1)) # force to column in case matrix
if np.size(u) != self.ninputs:
raise ValueError("len(u) must be equal to number of inputs")
return (self.A @ x).reshape((-1,)) \
+ (self.B @ u).reshape((-1,)) # return as row vector
u = np.reshape(u, (-1, 1)) # force to a column in case matrix
y = self.C @ x + self.D @ u
return y.reshape((-1,)) # return as row vector

def output(self, t, x, u=None):
def output(self, t, x, u=None, params={}):
"""Compute the output of the system

Given input `u` and state `x`, returns the output `y` of the
Expand Down Expand Up @@ -1465,19 +1491,19 @@ def output(self, t, x, u=None):
-------
y : ndarray
"""
x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if np.size(x) != self.nstates:
raise ValueError("len(x) must be equal to number of states")

if u is None:
return (self.C @ x).reshape((-1,)) # return as row vector
else: # received t, x, and u, ignore t
u = np.reshape(u, (-1, 1)) # force to a column in case matrix
if u is not None:
if np.size(u) != self.ninputs:
raise ValueError("len(u) must be equal to number of inputs")
return (self.C @ x).reshape((-1,)) \
+ (self.D @ u).reshape((-1,)) # return as row vector

return self._out(t, x, u, params)

def _update_params(self, params={}, warning=False):
# Parameters not supported; issue a warning
if params and warning:
warn("Parameters passed to StateSpace system are ignored.")


def _isstatic(self):
"""True if and only if the system has no dynamics, that is,
if A and B are zero. """
Expand Down
13 changes: 13 additions & 0 deletions control/tests/iosys_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class TSys:
[[-1, 1], [0, -2]], [[0, 1], [1, 0]],
[[1, 0], [0, 1]], np.zeros((2, 2)))

# Create a static gain linear system
T.staticgain = ct.StateSpace(0, 0, 0, 1)

# Create simulation parameters
T.T = np.linspace(0, 10, 100)
T.U = np.sin(T.T)
Expand All @@ -66,6 +69,16 @@ def test_linear_iosys(self, tsys):
np.testing.assert_array_almost_equal(lti_t, ios_t)
np.testing.assert_allclose(lti_y, ios_y, atol=0.002, rtol=0.)

# Make sure that a combination of a LinearIOSystem and a StateSpace
# system results in a LinearIOSystem
assert isinstance(linsys*iosys, ios.LinearIOSystem)

# Make sure that a static linear system has dt=None
# and otherwise dt is as specified
assert ios.LinearIOSystem(tsys.staticgain).dt is None
assert ios.LinearIOSystem(tsys.staticgain, dt=.1).dt == .1


def test_tf2io(self, tsys):
# Create a transfer function from the state space system
linsys = tsys.siso_linsys
Expand Down