Skip to content

fix to insure that when a StateSpace and a LinearIOSystem are combined, the result is a LinearIOSystem #785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
66 changes: 33 additions & 33 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ class for a set of subclasses that are used to implement specific
The :class:`~control.InputOuputSystem` class (and its subclasses) makes
use of two special methods for implementing much of the work of the class:

* _rhs(t, x, u): compute the right hand side of the differential or
difference equation for the system. This must be specified by the
* _rhs(t, x, u, params={}): compute the right hand side of the differential
or difference equation for the system. This must be specified by the
subclass for the system.

* _out(t, x, u): compute the output for the current state of the system.
The default is to return the entire system state.
* _out(t, x, u, params={}): compute the output for the current state of the
system. The default is to return the entire system state.

"""

Expand Down Expand Up @@ -149,6 +149,7 @@ def __init__(self, params={}, **kwargs):

# default parameters
self.params = params.copy()
self._current_params = self.params.copy()

def __mul__(sys2, sys1):
"""Multiply two input/output systems (series interconnection)"""
Expand Down Expand Up @@ -369,7 +370,7 @@ def _rhs(self, t, x, u, params={}):
NotImplemented("Evaluation not implemented for system of type ",
type(self))

def dynamics(self, t, x, u):
def dynamics(self, t, x, u, params={}):
"""Compute the dynamics of a differential or difference equation.

Given time `t`, input `u` and state `x`, returns the value of the
Expand All @@ -395,12 +396,15 @@ def dynamics(self, t, x, u):
current state
u : array_like
input
params : dict (optional)
system parameters


Returns
-------
dx/dt or x[t+dt] : ndarray
"""
return self._rhs(t, x, u)
return self._rhs(t, x, u, params=params)

def _out(self, t, x, u, params={}):
"""Evaluate the output of a system at a given state, input, and time
Expand All @@ -414,7 +418,7 @@ def _out(self, t, x, u, params={}):
# If no output function was defined in subclass, return state
return x

def output(self, t, x, u):
def output(self, t, x, u, params={}):
"""Compute the output of the system

Given time `t`, input `u` and state `x`, returns the output of the
Expand All @@ -432,12 +436,14 @@ def output(self, t, x, u):
current state
u : array_like
input
params : dict (optional)
system parameters

Returns
-------
y : ndarray
"""
return self._out(t, x, u)
return self._out(t, x, u, params=params)

def feedback(self, other=1, sign=-1, params={}):
"""Feedback interconnection between two input/output systems
Expand Down Expand Up @@ -673,17 +679,9 @@ def _update_params(self, params={}, warning=True):
if params and warning:
warn("Parameters passed to LinearIOSystems are ignored.")

def _rhs(self, t, x, u):
# Convert input to column vector and then change output to 1D array
xdot = self.A @ np.reshape(x, (-1, 1)) \
+ self.B @ np.reshape(u, (-1, 1))
return np.array(xdot).reshape((-1,))

def _out(self, t, x, u):
# Convert input to column vector and then change output to 1D array
y = self.C @ np.reshape(x, (-1, 1)) \
+ self.D @ np.reshape(u, (-1, 1))
return np.array(y).reshape((-1,))
# inherit methods as necessary
_rhs = StateSpace._rhs
_out = StateSpace._out

def __repr__(self):
# Need to define so that I/O system gets used instead of StateSpace
Expand Down Expand Up @@ -799,7 +797,7 @@ def __str__(self):
f"Output: {self.outfcn}"

# Return the value of a static nonlinear system
def __call__(sys, u, params=None, squeeze=None):
def __call__(sys, u, params={}, squeeze=None):
"""Evaluate a (static) nonlinearity at a given input value

If a nonlinear I/O system has no internal state, then evaluating the
Expand All @@ -825,12 +823,8 @@ def __call__(sys, u, params=None, squeeze=None):
"function evaluation is only supported for static "
"input/output systems")

# If we received any parameters, update them before calling _out()
if params is not None:
sys._update_params(params)

# Evaluate the function on the argument
out = sys._out(0, np.array((0,)), np.asarray(u))
out = sys._out(0, np.array((0,)), np.asarray(u), params)
_, out = _process_time_response(
None, out, issiso=sys.issiso(), squeeze=squeeze)
return out
Expand All @@ -840,13 +834,17 @@ def _update_params(self, params, warning=False):
self._current_params = self.params.copy()
self._current_params.update(params)

def _rhs(self, t, x, u):
xdot = self.updfcn(t, x, u, self._current_params) \
def _rhs(self, t, x, u, params={}):
current_params = self._current_params.copy()
current_params.update(params)
xdot = self.updfcn(t, x, u, current_params) \
if self.updfcn is not None else []
return np.array(xdot).reshape((-1,))

def _out(self, t, x, u):
y = self.outfcn(t, x, u, self._current_params) \
def _out(self, t, x, u, params={}):
current_params = self._current_params.copy()
current_params.update(params)
y = self.outfcn(t, x, u, current_params) \
if self.outfcn is not None else x
return np.array(y).reshape((-1,))

Expand Down Expand Up @@ -1018,7 +1016,7 @@ def _update_params(self, params, warning=False):
local.update(params) # update with locally passed parameters
sys._update_params(local, warning=warning)

def _rhs(self, t, x, u):
def _rhs(self, t, x, u, params={}):
# Make sure state and input are vectors
x = np.array(x, ndmin=1)
u = np.array(u, ndmin=1)
Expand All @@ -1031,18 +1029,20 @@ def _rhs(self, t, x, u):
state_index, input_index = 0, 0 # Start at the beginning
for sys in self.syslist:
# Update the right hand side for this subsystem
sys_params = sys._current_params.copy()
sys_params.update(params)
if sys.nstates != 0:
xdot[state_index:state_index + sys.nstates] = sys._rhs(
t, x[state_index:state_index + sys.nstates],
ulist[input_index:input_index + sys.ninputs])
ulist[input_index:input_index + sys.ninputs], sys_params)

# Update the state and input index counters
state_index += sys.nstates
input_index += sys.ninputs

return xdot

def _out(self, t, x, u):
def _out(self, t, x, u, params={}):
# Make sure state and input are vectors
x = np.array(x, ndmin=1)
u = np.array(u, ndmin=1)
Expand Down Expand Up @@ -2838,7 +2838,7 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[], params={},
newsys.check_unused_signals(ignore_inputs, ignore_outputs)

# If all subsystems are linear systems, maintain linear structure
if all([isinstance(sys, LinearIOSystem) for sys in syslist]):
if all([isinstance(sys, (LinearIOSystem, StateSpace)) for sys in syslist]):
return LinearICSystem(newsys, None)

return newsys
Expand Down
66 changes: 46 additions & 20 deletions control/statesp.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,8 @@ def __init__(self, *args, init_namedio=True, **kwargs):
defaults = args[0] if len(args) == 1 else \
{'inputs': D.shape[1], 'outputs': D.shape[0],
'states': A.shape[0]}
static = (A.size == 0)
name, inputs, outputs, states, dt = _process_namedio_keywords(
kwargs, defaults, static=static, end=True)
kwargs, defaults, static=self._isstatic(), end=True)

# Initialize LTI (NamedIOSystem) object
super().__init__(
Expand Down Expand Up @@ -386,6 +385,9 @@ def __init__(self, *args, init_namedio=True, **kwargs):
# Check for states that don't do anything, and remove them
if remove_useless_states:
self._remove_useless_states()
# params for compatibility with LinearIOSystems
self.params = {}
self._current_params = self.params.copy()

#
# Class attributes
Expand Down Expand Up @@ -1388,8 +1390,22 @@ def dcgain(self, warn_infinite=False):
scalar is returned.
"""
return self._dcgain(warn_infinite)

def _rhs(self, t, x, u=None, params={}):
"""Compute the right hand side of the differential or difference
equation for the system. Please :meth:`dynamics` for a more user-
friendly interface. """

def dynamics(self, t, x, u=None):
x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if u is None: # received t and x, but ignore t
output = self.A @ x
else: # received t, x, and u, ignore t
u = np.reshape(u, (-1, 1)) # force to column in case matrix
output = self.A @ x + self.B @ u

return output.reshape((-1,)) # return as row vector

def dynamics(self, t, x, u=None, params={}):
"""Compute the dynamics of the system

Given input `u` and state `x`, returns the dynamics of the state-space
Expand Down Expand Up @@ -1417,25 +1433,35 @@ def dynamics(self, t, x, u=None):
current state
u : array_like (optional)
input, zero if omitted
params : dict (optional)
included for compatibility but ignored for :class:`StateSpace` systems

Returns
-------
dx/dt or x[t+dt] : ndarray

"""
x = np.reshape(x, (-1, 1)) # force to a column in case matrix
# chechs
if np.size(x) != self.nstates:
raise ValueError("len(x) must be equal to number of states")
if u is not None:
if np.size(u) != self.ninputs:
raise ValueError("len(u) must be equal to number of inputs")
return self._rhs(t, x, u, params)

def _out(self, t, x, u=None, params={}):
"""Compute the output of the system system. Please :meth:`dynamics`
for a more user-friendly interface. """

x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if u is None:
return (self.A @ x).reshape((-1,)) # return as row vector
y = self.C @ x
else: # received t, x, and u, ignore t
u = np.reshape(u, (-1, 1)) # force to column in case matrix
if np.size(u) != self.ninputs:
raise ValueError("len(u) must be equal to number of inputs")
return (self.A @ x).reshape((-1,)) \
+ (self.B @ u).reshape((-1,)) # return as row vector
u = np.reshape(u, (-1, 1)) # force to a column in case matrix
y = self.C @ x + self.D @ u
return y.reshape((-1,)) # return as row vector

def output(self, t, x, u=None):
def output(self, t, x, u=None, params={}):
"""Compute the output of the system

Given input `u` and state `x`, returns the output `y` of the
Expand Down Expand Up @@ -1465,19 +1491,19 @@ def output(self, t, x, u=None):
-------
y : ndarray
"""
x = np.reshape(x, (-1, 1)) # force to a column in case matrix
if np.size(x) != self.nstates:
raise ValueError("len(x) must be equal to number of states")

if u is None:
return (self.C @ x).reshape((-1,)) # return as row vector
else: # received t, x, and u, ignore t
u = np.reshape(u, (-1, 1)) # force to a column in case matrix
if u is not None:
if np.size(u) != self.ninputs:
raise ValueError("len(u) must be equal to number of inputs")
return (self.C @ x).reshape((-1,)) \
+ (self.D @ u).reshape((-1,)) # return as row vector

return self._out(t, x, u, params)

def _update_params(self, params={}, warning=False):
# Parameters not supported; issue a warning
if params and warning:
warn("Parameters passed to StateSpace system are ignored.")


def _isstatic(self):
"""True if and only if the system has no dynamics, that is,
if A and B are zero. """
Expand Down
13 changes: 13 additions & 0 deletions control/tests/iosys_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class TSys:
[[-1, 1], [0, -2]], [[0, 1], [1, 0]],
[[1, 0], [0, 1]], np.zeros((2, 2)))

# Create a static gain linear system
T.staticgain = ct.StateSpace(0, 0, 0, 1)

# Create simulation parameters
T.T = np.linspace(0, 10, 100)
T.U = np.sin(T.T)
Expand All @@ -66,6 +69,16 @@ def test_linear_iosys(self, tsys):
np.testing.assert_array_almost_equal(lti_t, ios_t)
np.testing.assert_allclose(lti_y, ios_y, atol=0.002, rtol=0.)

# Make sure that a combination of a LinearIOSystem and a StateSpace
# system results in a LinearIOSystem
assert isinstance(linsys*iosys, ios.LinearIOSystem)

# Make sure that a static linear system has dt=None
# and otherwise dt is as specified
assert ios.LinearIOSystem(tsys.staticgain).dt is None
assert ios.LinearIOSystem(tsys.staticgain, dt=.1).dt == .1


def test_tf2io(self, tsys):
# Create a transfer function from the state space system
linsys = tsys.siso_linsys
Expand Down