From 45c3abdb534e8421c2582a7a1da8ecd6f10e1cbd Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Nov 2022 15:45:10 -0800 Subject: [PATCH 1/5] add conversions for LTI systems in interconnect() --- control/iosys.py | 8 ++++- control/tests/type_conversion_test.py | 42 +++++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 19f527c22..b88bbb84d 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -894,6 +894,12 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Go through the system list and keep track of counts, offsets for sysidx, sys in enumerate(syslist): + # If we were passed a SS or TF system, convert to LinearIOSystem + if isinstance(sys, (StateSpace, TransferFunction)) and \ + not isinstance(sys, LinearIOSystem): + sys = LinearIOSystem(sys) + syslist[sysidx] = sys + # Make sure time bases are consistent dt = common_timebase(dt, sys.dt) @@ -1781,7 +1787,7 @@ def input_output_response( # Update the parameter values sys._update_params(params) - + # # Define a function to evaluate the input at an arbitrary time # diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py index cdf302015..02290d27b 100644 --- a/control/tests/type_conversion_test.py +++ b/control/tests/type_conversion_test.py @@ -59,7 +59,7 @@ def sys_dict(): rtype_list = ['ss', 'tf', 'frd', 'lio', 'ios', 'arr', 'flt'] conversion_table = [ # op left ss tf frd lio ios arr flt - ('add', 'ss', ['ss', 'ss', 'frd', 'ss', 'ios', 'ss', 'ss' ]), + ('add', 'ss', ['ss', 'ss', 'frd', 'lio', 'ios', 'ss', 'ss' ]), ('add', 'tf', ['tf', 'tf', 'frd', 'lio', 'ios', 'tf', 'tf' ]), ('add', 'frd', ['frd', 'frd', 'frd', 'frd', 'E', 'frd', 'frd']), ('add', 'lio', ['lio', 'lio', 'xrd', 'lio', 'ios', 'lio', 'lio']), @@ -68,7 +68,7 @@ def sys_dict(): ('add', 'flt', ['ss', 'tf', 'frd', 'lio', 'ios', 'arr', 'flt']), # op left ss tf frd lio ios arr flt - ('sub', 'ss', ['ss', 'ss', 'frd', 'ss', 'ios', 'ss', 'ss' ]), + ('sub', 'ss', ['ss', 'ss', 'frd', 'lio', 'ios', 'ss', 'ss' ]), ('sub', 'tf', ['tf', 'tf', 'frd', 'lio', 'ios', 'tf', 'tf' ]), ('sub', 'frd', ['frd', 'frd', 'frd', 'frd', 'E', 'frd', 'frd']), ('sub', 'lio', ['lio', 'lio', 'xrd', 'lio', 'ios', 'lio', 'lio']), @@ -77,7 +77,7 @@ def sys_dict(): ('sub', 'flt', ['ss', 'tf', 'frd', 'lio', 'ios', 'arr', 'flt']), # op left ss tf frd lio ios arr flt - ('mul', 'ss', ['ss', 'ss', 'frd', 'ss', 'ios', 'ss', 'ss' ]), + ('mul', 'ss', ['ss', 'ss', 'frd', 'lio', 'ios', 'ss', 'ss' ]), ('mul', 'tf', ['tf', 'tf', 'frd', 'lio', 'ios', 'tf', 'tf' ]), ('mul', 'frd', ['frd', 'frd', 'frd', 'frd', 'E', 'frd', 'frd']), ('mul', 'lio', ['lio', 'lio', 'xrd', 'lio', 'ios', 'lio', 'lio']), @@ -191,3 +191,39 @@ def test_binary_op_type_conversions(opname, ltype, rtype, sys_dict): assert len(result.output_labels) == result.noutputs if result.nstates is not None: assert len(result.state_labels) == result.nstates + +@pytest.mark.parametrize( + "typelist, connections, inplist, outlist, expected", [ + (['lio', 'lio'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'lio'), + (['lio', 'ss'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'lio'), + (['ss', 'lio'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'lio'), + (['ss', 'ss'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'lio'), + (['lio', 'tf'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'lio'), + (['lio', 'frd'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'E'), + (['ios', 'ios'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'ios'), + (['lio', 'ios'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'ios'), + (['ss', 'ios'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'ios'), + (['tf', 'ios'], [[(1, 0), (0, 0)]], [[(0, 0)]], [[(1, 0)]], 'ios'), + (['lio', 'ss', 'tf'], + [[(1, 0), (0, 0)], [(2, 0), (1, 0)]], [[(0, 0)]], [[(2, 0)]], 'lio'), + (['ios', 'ss', 'tf'], + [[(1, 0), (0, 0)], [(2, 0), (1, 0)]], [[(0, 0)]], [[(2, 0)]], 'ios'), + ]) +def test_interconnect( + typelist, connections, inplist, outlist, expected, sys_dict): + # Create the system list + syslist = [sys_dict[_type] for _type in typelist] + + # Make copies of any duplicates + for sysidx, sys in enumerate(syslist): + if sys == syslist[0]: + syslist[sysidx] = sys.copy() + + # Make sure we get the right result + if expected == 'E' or expected[0] == 'x': + # Exception expected + with pytest.raises(TypeError): + result = ct.interconnect(syslist, connections, inplist, outlist) + else: + result = ct.interconnect(syslist, connections, inplist, outlist) + assert isinstance(result, type_dict[expected]) From 43ce56bd03a26ec52312535e4dacb0fc6579f778 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Nov 2022 16:36:17 -0800 Subject: [PATCH 2/5] fix params processing in I/O system + unit tests --- control/iosys.py | 12 +++++++----- control/tests/type_conversion_test.py | 12 +++++++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index b88bbb84d..8123b9050 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -357,7 +357,7 @@ def _update_params(self, params, warning=False): if warning: warn("Parameters passed to InputOutputSystem ignored.") - def _rhs(self, t, x, u, params={}): + def _rhs(self, t, x, u): """Evaluate right hand side of a differential or difference equation. Private function used to compute the right hand side of an @@ -369,7 +369,7 @@ def _rhs(self, t, x, u, params={}): NotImplemented("Evaluation not implemented for system of type ", type(self)) - def dynamics(self, t, x, u): + def dynamics(self, t, x, u, params={}): """Compute the dynamics of a differential or difference equation. Given time `t`, input `u` and state `x`, returns the value of the @@ -400,9 +400,10 @@ def dynamics(self, t, x, u): ------- dx/dt or x[t+dt] : ndarray """ + self._update_params(params) return self._rhs(t, x, u) - def _out(self, t, x, u, params={}): + def _out(self, t, x, u): """Evaluate the output of a system at a given state, input, and time Private function used to compute the output of of an input/output @@ -414,7 +415,7 @@ def _out(self, t, x, u, params={}): # If no output function was defined in subclass, return state return x - def output(self, t, x, u): + def output(self, t, x, u, params={}): """Compute the output of the system Given time `t`, input `u` and state `x`, returns the output of the @@ -437,6 +438,7 @@ def output(self, t, x, u): ------- y : ndarray """ + self._update_params(params) return self._out(t, x, u) def feedback(self, other=1, sign=-1, params={}): @@ -2248,7 +2250,7 @@ def ss(*args, **kwargs): Convert a linear system into space system form. Always creates a new system, even if sys is already a state space system. - ``ss(updfcn, outfucn)`` + ``ss(updfcn, outfcn)`` Create a nonlinear input/output system with update function ``updfcn`` and output function ``outfcn``. See :class:`NonlinearIOSystem` for more information. diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py index 02290d27b..7163f7097 100644 --- a/control/tests/type_conversion_test.py +++ b/control/tests/type_conversion_test.py @@ -19,7 +19,9 @@ def sys_dict(): sdict['frd'] = ct.frd([10+0j, 9 + 1j, 8 + 2j, 7 + 3j], [1, 2, 3, 4]) sdict['lio'] = ct.LinearIOSystem(ct.ss([[-1]], [[5]], [[5]], [[0]])) sdict['ios'] = ct.NonlinearIOSystem( - sdict['lio']._rhs, sdict['lio']._out, inputs=1, outputs=1, states=1) + lambda t, x, u, params: sdict['lio']._rhs(t, x, u), + lambda t, x, u, params: sdict['lio']._out(t, x, u), + inputs=1, outputs=1, states=1) sdict['arr'] = np.array([[2.0]]) sdict['flt'] = 3. return sdict @@ -226,4 +228,12 @@ def test_interconnect( result = ct.interconnect(syslist, connections, inplist, outlist) else: result = ct.interconnect(syslist, connections, inplist, outlist) + + # Make sure the type is correct assert isinstance(result, type_dict[expected]) + + # Make sure we can evaluate the dynamics + np.testing.assert_equal( + result.dynamics( + 0, np.zeros(result.nstates), np.zeros(result.ninputs)), + np.zeros(result.nstates)) From 1a78cb8eb11f53caa444e0833391ffe5f561bcdb Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Nov 2022 18:48:09 -0800 Subject: [PATCH 3/5] update params docstrings in iosys --- control/iosys.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 8123b9050..e3f37b8b6 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -376,16 +376,17 @@ def dynamics(self, t, x, u, params={}): right hand side of the dynamical system. If the system is continuous, returns the time derivative - dx/dt = f(t, x, u) + dx/dt = f(t, x, u, params) where `f` is the system's (possibly nonlinear) dynamics function. If the system is discrete-time, returns the next value of `x`: - x[t+dt] = f(t, x[t], u[t]) + x[t+dt] = f(t, x[t], u[t], params) - Where `t` is a scalar. + where `t` is a scalar. - The inputs `x` and `u` must be of the correct length. + The inputs `x` and `u` must be of the correct length. The `params` + argument is an optional dictionary of parameter values. Parameters ---------- @@ -395,6 +396,8 @@ def dynamics(self, t, x, u, params={}): current state u : array_like input + params : dict (optional) + system parameter values Returns ------- @@ -421,7 +424,7 @@ def output(self, t, x, u, params={}): Given time `t`, input `u` and state `x`, returns the output of the system: - y = g(t, x, u) + y = g(t, x, u[, params]) The inputs `x` and `u` must be of the correct length. @@ -433,6 +436,8 @@ def output(self, t, x, u, params={}): current state u : array_like input + params : dict (optional) + system parameter values Returns ------- From 878cf98479f6240bcfa1d29f81ca26454642581f Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Nov 2022 23:12:44 -0800 Subject: [PATCH 4/5] replace mutables as arguments with None, per @roryyorke suggestion --- control/iosys.py | 66 +++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index e3f37b8b6..58eb47db8 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -126,7 +126,7 @@ class for a set of subclasses that are used to implement specific # Allow ndarray * InputOutputSystem to give IOSystem._rmul_() priority __array_priority__ = 12 # override ndarray, matrix, SS types - def __init__(self, params={}, **kwargs): + def __init__(self, params=None, **kwargs): """Create an input/output system. The InputOutputSystem constructor is used to create an input/output @@ -148,7 +148,7 @@ def __init__(self, params={}, **kwargs): states=states, name=name, dt=dt) # default parameters - self.params = params.copy() + self.params = {} if params is None else params.copy() def __mul__(sys2, sys1): """Multiply two input/output systems (series interconnection)""" @@ -369,7 +369,7 @@ def _rhs(self, t, x, u): NotImplemented("Evaluation not implemented for system of type ", type(self)) - def dynamics(self, t, x, u, params={}): + def dynamics(self, t, x, u, params=None): """Compute the dynamics of a differential or difference equation. Given time `t`, input `u` and state `x`, returns the value of the @@ -418,7 +418,7 @@ def _out(self, t, x, u): # If no output function was defined in subclass, return state return x - def output(self, t, x, u, params={}): + def output(self, t, x, u, params=None): """Compute the output of the system Given time `t`, input `u` and state `x`, returns the output of the @@ -446,7 +446,7 @@ def output(self, t, x, u, params={}): self._update_params(params) return self._out(t, x, u) - def feedback(self, other=1, sign=-1, params={}): + def feedback(self, other=1, sign=-1, params=None): """Feedback interconnection between two input/output systems Parameters @@ -514,7 +514,7 @@ def feedback(self, other=1, sign=-1, params={}): # Return the newly created system return newsys - def linearize(self, x0, u0, t=0, params={}, eps=1e-6, + def linearize(self, x0, u0, t=0, params=None, eps=1e-6, name=None, copy=False, **kwargs): """Linearize an input/output system at a given state and input. @@ -658,7 +658,7 @@ def __init__(self, linsys, **kwargs): # Note: don't use super() to override StateSpace MRO InputOutputSystem.__init__( self, inputs=inputs, outputs=outputs, states=states, - params={}, dt=dt, name=name) + params=None, dt=dt, name=name) # Initalize additional state space variables StateSpace.__init__( @@ -675,7 +675,7 @@ def __init__(self, linsys, **kwargs): #: number of states, use :attr:`nstates`. states = property(StateSpace._get_states, StateSpace._set_states) - def _update_params(self, params={}, warning=True): + def _update_params(self, params=None, warning=True): # Parameters not supported; issue a warning if params and warning: warn("Parameters passed to LinearIOSystems are ignored.") @@ -763,7 +763,7 @@ class NonlinearIOSystem(InputOutputSystem): defaults. """ - def __init__(self, updfcn, outfcn=None, params={}, **kwargs): + def __init__(self, updfcn, outfcn=None, params=None, **kwargs): """Create a nonlinear I/O system given update and output functions.""" # Process keyword arguments name, inputs, outputs, states, dt = _process_namedio_keywords( @@ -798,7 +798,7 @@ def __init__(self, updfcn, outfcn=None, params={}, **kwargs): "(and nstates not known).") # Initialize current parameters to default parameters - self._current_params = params.copy() + self._current_params = {} if params is None else params.copy() def __str__(self): return f"{InputOutputSystem.__str__(self)}\n\n" + \ @@ -845,7 +845,8 @@ def __call__(sys, u, params=None, squeeze=None): def _update_params(self, params, warning=False): # Update the current parameter values self._current_params = self.params.copy() - self._current_params.update(params) + if params: + self._current_params.update(params) def _rhs(self, t, x, u): xdot = self.updfcn(t, x, u, self._current_params) \ @@ -869,20 +870,22 @@ class InterconnectedSystem(InputOutputSystem): See :func:`~control.interconnect` for a list of parameters. """ - def __init__(self, syslist, connections=[], inplist=[], outlist=[], - params={}, warn_duplicate=None, **kwargs): + def __init__(self, syslist, connections=None, inplist=None, outlist=None, + params=None, warn_duplicate=None, **kwargs): """Create an I/O system from a list of systems + connection info.""" # Convert input and output names to lists if they aren't already - if not isinstance(inplist, (list, tuple)): + if inplist is not None and not isinstance(inplist, (list, tuple)): inplist = [inplist] - if not isinstance(outlist, (list, tuple)): + if outlist is not None and not isinstance(outlist, (list, tuple)): outlist = [outlist] # Check if dt argument was given; if not, pull from systems dt = kwargs.pop('dt', None) # Process keyword arguments (except dt) - defaults = {'inputs': len(inplist), 'outputs': len(outlist)} + defaults = { + 'inputs': len(inplist or []), + 'outputs': len(outlist or [])} name, inputs, outputs, states, _ = _process_namedio_keywords( kwargs, defaults, end=True) @@ -982,7 +985,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Convert the list of interconnections to a connection map (matrix) self.connect_map = np.zeros((ninputs, noutputs)) - for connection in connections: + for connection in connections or []: input_index = self._parse_input_spec(connection[0]) for output_spec in connection[1:]: output_index, gain = self._parse_output_spec(output_spec) @@ -993,7 +996,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Convert the input list to a matrix: maps system to subsystems self.input_map = np.zeros((ninputs, self.ninputs)) - for index, inpspec in enumerate(inplist): + for index, inpspec in enumerate(inplist or []): if isinstance(inpspec, (int, str, tuple)): inpspec = [inpspec] if not isinstance(inpspec, list): @@ -1008,7 +1011,7 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], # Convert the output list to a matrix: maps subsystems to system self.output_map = np.zeros((self.noutputs, noutputs + ninputs)) - for index, outspec in enumerate(outlist): + for index, outspec in enumerate(outlist or []): if isinstance(outspec, (int, str, tuple)): outspec = [outspec] if not isinstance(outspec, list): @@ -1022,13 +1025,14 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[], self.output_map[index, ylist_index] += gain # Save the parameters for the system - self.params = params.copy() + self.params = {} if params is None else params.copy() def _update_params(self, params, warning=False): for sys in self.syslist: local = sys.params.copy() # start with system parameters local.update(self.params) # update with global params - local.update(params) # update with locally passed parameters + if params: + local.update(params) # update with locally passed parameters sys._update_params(local, warning=warning) def _rhs(self, t, x, u): @@ -1578,7 +1582,7 @@ def __init__(self, io_sys, ss_sys=None): def input_output_response( - sys, T, U=0., X0=0, params={}, + sys, T, U=0., X0=0, params=None, transpose=False, return_x=False, squeeze=None, solve_ivp_kwargs={}, t_eval='T', **kwargs): """Compute the output response of a system to a given input. @@ -1913,7 +1917,7 @@ def ivp_rhs(t, x): transpose=transpose, return_x=return_x, squeeze=squeeze) -def find_eqpt(sys, x0, u0=[], y0=None, t=0, params={}, +def find_eqpt(sys, x0, u0=None, y0=None, t=0, params=None, iu=None, iy=None, ix=None, idx=None, dx0=None, return_y=False, return_result=False): """Find the equilibrium point for an input/output system. @@ -2164,7 +2168,7 @@ def rootfun(z): # Linearize an input/output system -def linearize(sys, xeq, ueq=[], t=0, params={}, **kw): +def linearize(sys, xeq, ueq=None, t=0, params=None, **kw): """Linearize an input/output system at a given state and input. This function computes the linearization of an input/output system at a @@ -2536,9 +2540,9 @@ def tf2io(*args, **kwargs): # Function to create an interconnected system -def interconnect(syslist, connections=None, inplist=[], outlist=[], params={}, - check_unused=True, ignore_inputs=None, ignore_outputs=None, - warn_duplicate=None, **kwargs): +def interconnect(syslist, connections=None, inplist=None, outlist=None, + params=None, check_unused=True, ignore_inputs=None, + ignore_outputs=None, warn_duplicate=None, **kwargs): """Interconnect a set of input/output systems. This function creates a new system that is an interconnection of a set of @@ -2780,10 +2784,10 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[], params={}, connections = [] # If inplist/outlist is not present, try using inputs/outputs instead - if not inplist and inputs is not None: - inplist = list(inputs) - if not outlist and outputs is not None: - outlist = list(outputs) + if inplist is None: + inplist = list(inputs or []) + if outlist is None: + outlist = list(outputs or []) # Process input list if not isinstance(inplist, (list, tuple)): From a9a62267923ada25f44b18772ee2a5a3f1a08157 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sat, 12 Nov 2022 23:22:36 -0800 Subject: [PATCH 5/5] update params docstrings + warnings per @sawyerbfuller suggestion --- control/iosys.py | 4 ++-- control/statesp.py | 10 ++++++++-- control/tests/statesp_test.py | 12 ++++++++++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 58eb47db8..2bb445bdd 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -376,12 +376,12 @@ def dynamics(self, t, x, u, params=None): right hand side of the dynamical system. If the system is continuous, returns the time derivative - dx/dt = f(t, x, u, params) + dx/dt = f(t, x, u[, params]) where `f` is the system's (possibly nonlinear) dynamics function. If the system is discrete-time, returns the next value of `x`: - x[t+dt] = f(t, x[t], u[t], params) + x[t+dt] = f(t, x[t], u[t][, params]) where `t` is a scalar. diff --git a/control/statesp.py b/control/statesp.py index 374b036ca..a1fd84b20 100644 --- a/control/statesp.py +++ b/control/statesp.py @@ -1389,7 +1389,7 @@ def dcgain(self, warn_infinite=False): """ return self._dcgain(warn_infinite) - def dynamics(self, t, x, u=None): + def dynamics(self, t, x, u=None, params=None): """Compute the dynamics of the system Given input `u` and state `x`, returns the dynamics of the state-space @@ -1423,6 +1423,9 @@ def dynamics(self, t, x, u=None): dx/dt or x[t+dt] : ndarray """ + if params is not None: + warn("params keyword ignored for StateSpace object") + x = np.reshape(x, (-1, 1)) # force to a column in case matrix if np.size(x) != self.nstates: raise ValueError("len(x) must be equal to number of states") @@ -1435,7 +1438,7 @@ def dynamics(self, t, x, u=None): return (self.A @ x).reshape((-1,)) \ + (self.B @ u).reshape((-1,)) # return as row vector - def output(self, t, x, u=None): + def output(self, t, x, u=None, params=None): """Compute the output of the system Given input `u` and state `x`, returns the output `y` of the @@ -1465,6 +1468,9 @@ def output(self, t, x, u=None): ------- y : ndarray """ + if params is not None: + warn("params keyword ignored for StateSpace object") + x = np.reshape(x, (-1, 1)) # force to a column in case matrix if np.size(x) != self.nstates: raise ValueError("len(x) must be equal to number of states") diff --git a/control/tests/statesp_test.py b/control/tests/statesp_test.py index 97dc84e3c..e97584fbb 100644 --- a/control/tests/statesp_test.py +++ b/control/tests/statesp_test.py @@ -1158,3 +1158,15 @@ def test_linfnorm_ct_mimo(self, ct_siso): gpeak, fpeak = linfnorm(sys) np.testing.assert_allclose(gpeak, refgpeak) np.testing.assert_allclose(fpeak, reffpeak) + + +# Make sure that using params for StateSpace objects generates a warning +def test_params_warning(): + sys = StateSpace(-1, 1, 1, 0) + + with pytest.warns(UserWarning, match="params keyword ignored"): + sys.dynamics(0, [0], [0], {'k': 5}) + + with pytest.warns(UserWarning, match="params keyword ignored"): + sys.output(0, [0], [0], {'k': 5}) +