From 4023edd8b1aa77c70af48dd68345e3accd0d347e Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 21 Nov 2021 13:37:05 -0800 Subject: [PATCH 1/5] add iosys conversions (mul, rmul, add, radd, sub, rsub) + PEP8 cleanup --- control/iosys.py | 198 ++++++++++++++++++-------- control/tests/iosys_test.py | 106 +++++++++++++- control/tests/type_conversion_test.py | 24 ++-- 3 files changed, 256 insertions(+), 72 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 28c6f2632..0e1cc06f2 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -32,6 +32,7 @@ from warnings import warn from .statesp import StateSpace, tf2ss, _convert_to_statespace +from .xferfcn import TransferFunction from .timeresp import _check_convert_array, _process_time_response, \ TimeResponseData from .lti import isctime, isdtime, common_timebase @@ -120,6 +121,9 @@ class for a set of subclasses that are used to implement specific """ + # Allow ndarray * InputOutputSystem to give IOSystem._rmul_() priority + __array_priority__ = 12 # override ndarray, matrix, SS types + _idCounter = 0 def _name_or_default(self, name=None): @@ -195,14 +199,19 @@ def __str__(self): def __mul__(sys2, sys1): """Multiply two input/output systems (series interconnection)""" + # Note: order of arguments is flipped so that self = sys2, + # corresponding to the ordering convention of sys2 * sys1 + # Convert sys1 to an I/O system if needed if isinstance(sys1, (int, float, np.number)): - # TODO: Scale the output - raise NotImplemented("Scalar multiplication not yet implemented") + sys1 = LinearIOSystem(StateSpace( + [], [], [], sys1 * np.eye(sys2.ninputs))) elif isinstance(sys1, np.ndarray): - # TODO: Post-multiply by a matrix - raise NotImplemented("Matrix multiplication not yet implemented") + sys1 = LinearIOSystem(StateSpace([], [], [], sys1)) + + elif isinstance(sys1, (StateSpace, TransferFunction)): + sys1 = LinearIOSystem(sys1) elif not isinstance(sys1, InputOutputSystem): raise TypeError("Unknown I/O system object ", sys1) @@ -239,42 +248,41 @@ def __mul__(sys2, sys1): def __rmul__(sys1, sys2): """Pre-multiply an input/output systems by a scalar/matrix""" - if isinstance(sys2, InputOutputSystem): - # Both systems are InputOutputSystems => use __mul__ - return InputOutputSystem.__mul__(sys2, sys1) - - elif isinstance(sys2, (int, float, np.number)): - # TODO: Scale the output - raise NotImplemented("Scalar multiplication not yet implemented") + # Convert sys2 to an I/O system if needed + if isinstance(sys2, (int, float, np.number)): + sys2 = LinearIOSystem(StateSpace( + [], [], [], sys2 * np.eye(sys1.noutputs))) elif isinstance(sys2, np.ndarray): - # TODO: Post-multiply by a matrix - raise NotImplemented("Matrix multiplication not yet implemented") + sys2 = LinearIOSystem(StateSpace([], [], [], sys2)) - elif isinstance(sys2, StateSpace): - # TODO: Should eventuall preserve LinearIOSystem structure - return StateSpace.__mul__(sys2, sys1) + elif isinstance(sys2, (StateSpace, TransferFunction)): + sys2 = LinearIOSystem(sys2) - else: - raise TypeError("Unknown I/O system object ", sys1) + elif not isinstance(sys2, InputOutputSystem): + raise TypeError("Unknown I/O system object ", sys2) + + return InputOutputSystem.__mul__(sys2, sys1) def __add__(sys1, sys2): """Add two input/output systems (parallel interconnection)""" - # TODO: Allow addition of scalars and matrices + # Convert sys1 to an I/O system if needed if isinstance(sys2, (int, float, np.number)): - # TODO: Scale the output - raise NotImplemented("Scalar addition not yet implemented") + sys2 = LinearIOSystem(StateSpace( + [], [], [], sys2 * np.eye(sys1.ninputs))) elif isinstance(sys2, np.ndarray): - # TODO: Post-multiply by a matrix - raise NotImplemented("Matrix addition not yet implemented") + sys2 = LinearIOSystem(StateSpace([], [], [], sys2)) + + elif isinstance(sys2, (StateSpace, TransferFunction)): + sys2 = LinearIOSystem(sys2) elif not isinstance(sys2, InputOutputSystem): raise TypeError("Unknown I/O system object ", sys2) # Make sure number of input and outputs match if sys1.ninputs != sys2.ninputs or sys1.noutputs != sys2.noutputs: - raise ValueError("Can't add systems with different numbers of " + raise ValueError("Can't add systems with incompatible numbers of " "inputs or outputs.") ninputs = sys1.ninputs noutputs = sys1.noutputs @@ -293,16 +301,87 @@ def __add__(sys1, sys2): # Return the newly created InterconnectedSystem return newsys - # TODO: add __radd__ to allow postaddition by scalars and matrices + def __radd__(sys1, sys2): + """Parallel addition of input/output system to a compatible object.""" + # Convert sys2 to an I/O system if needed + if isinstance(sys2, (int, float, np.number)): + sys2 = LinearIOSystem(StateSpace( + [], [], [], sys2 * np.eye(sys1.noutputs))) + + elif isinstance(sys2, np.ndarray): + sys2 = LinearIOSystem(StateSpace([], [], [], sys2)) + + elif isinstance(sys2, (StateSpace, TransferFunction)): + sys2 = LinearIOSystem(sys2) + + elif not isinstance(sys2, InputOutputSystem): + raise TypeError("Unknown I/O system object ", sys2) + + return InputOutputSystem.__add__(sys2, sys1) + + def __sub__(sys1, sys2): + """Subtract two input/output systems (parallel interconnection)""" + # Convert sys1 to an I/O system if needed + if isinstance(sys2, (int, float, np.number)): + sys2 = LinearIOSystem(StateSpace( + [], [], [], sys2 * np.eye(sys1.ninputs))) + + elif isinstance(sys2, np.ndarray): + sys2 = LinearIOSystem(StateSpace([], [], [], sys2)) + + elif isinstance(sys2, (StateSpace, TransferFunction)): + sys2 = LinearIOSystem(sys2) + + elif not isinstance(sys2, InputOutputSystem): + raise TypeError("Unknown I/O system object ", sys2) + + # Make sure number of input and outputs match + if sys1.ninputs != sys2.ninputs or sys1.noutputs != sys2.noutputs: + raise ValueError("Can't add systems with incompatible numbers of " + "inputs or outputs.") + ninputs = sys1.ninputs + noutputs = sys1.noutputs + + # Create a new system to handle the composition + inplist = [[(0, i), (1, i)] for i in range(ninputs)] + outlist = [[(0, i), (1, i, -1)] for i in range(noutputs)] + newsys = InterconnectedSystem( + (sys1, sys2), inplist=inplist, outlist=outlist) + + # If both systems are linear, create LinearICSystem + if isinstance(sys1, StateSpace) and isinstance(sys2, StateSpace): + ss_sys = StateSpace.__sub__(sys1, sys2) + return LinearICSystem(newsys, ss_sys) + + # Return the newly created InterconnectedSystem + return newsys + + def __rsub__(sys1, sys2): + """Parallel subtraction of I/O system to a compatible object.""" + # Convert sys2 to an I/O system if needed + if isinstance(sys2, (int, float, np.number)): + sys2 = LinearIOSystem(StateSpace( + [], [], [], sys2 * np.eye(sys1.noutputs))) + + elif isinstance(sys2, np.ndarray): + sys2 = LinearIOSystem(StateSpace([], [], [], sys2)) + + elif isinstance(sys2, (StateSpace, TransferFunction)): + sys2 = LinearIOSystem(sys2) + + elif not isinstance(sys2, InputOutputSystem): + raise TypeError("Unknown I/O system object ", sys2) + + return InputOutputSystem.__sub__(sys2, sys1) def __neg__(sys): """Negate an input/output systems (rescale)""" if sys.ninputs is None or sys.noutputs is None: raise ValueError("Can't determine number of inputs or outputs") + # Create a new system to hold the negation inplist = [(0, i) for i in range(sys.ninputs)] outlist = [(0, i, -1) for i in range(sys.noutputs)] - # Create a new system to hold the negation newsys = InterconnectedSystem( (sys,), dt=sys.dt, inplist=inplist, outlist=outlist) @@ -667,8 +746,8 @@ class LinearIOSystem(InputOutputSystem, StateSpace): Parameters ---------- - linsys : StateSpace - LTI StateSpace system to be converted + linsys : StateSpace or TransferFunction + LTI system to be converted inputs : int, list of str or None, optional Description of the system inputs. This can be given as an integer count or as a list of strings that name the individual signals. If an @@ -711,12 +790,16 @@ def __init__(self, linsys, inputs=None, outputs=None, states=None, states. The new system can be a continuous or discrete time system. """ - if not isinstance(linsys, StateSpace): + if isinstance(linsys, TransferFunction): + # Convert system to StateSpace + linsys = _convert_to_statespace(linsys) + + elif not isinstance(linsys, StateSpace): raise TypeError("Linear I/O system must be a state space object") # Look for 'input' and 'output' parameter name variants inputs = _parse_signal_parameter(inputs, 'input', kwargs) - outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True) + outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True) # Create the I/O system object super(LinearIOSystem, self).__init__( @@ -837,7 +920,7 @@ def __init__(self, updfcn, outfcn=None, inputs=None, outputs=None, """Create a nonlinear I/O system given update and output functions.""" # Look for 'input' and 'output' parameter name variants inputs = _parse_signal_parameter(inputs, 'input', kwargs) - outputs = _parse_signal_parameter(outputs, 'output', kwargs) + outputs = _parse_signal_parameter(outputs, 'output', kwargs) # Store the update and output functions self.updfcn = updfcn @@ -1399,13 +1482,12 @@ def set_output_map(self, output_map): self.output_map = output_map self.noutputs = output_map.shape[0] - def unused_signals(self): """Find unused subsystem inputs and outputs Returns ------- - + unused_inputs : dict A mapping from tuple of indices (isys, isig) to string @@ -1430,66 +1512,61 @@ def unused_signals(self): unused_sysinp = sorted(set(range(nsubsysinp)) - used_sysinp) unused_sysout = sorted(set(range(nsubsysout)) - used_sysout) - inputs = [(isys,isig, f'{sys.name}.{sig}') + inputs = [(isys, isig, f'{sys.name}.{sig}') for isys, sys in enumerate(self.syslist) for sig, isig in sys.input_index.items()] - outputs = [(isys,isig,f'{sys.name}.{sig}') + outputs = [(isys, isig, f'{sys.name}.{sig}') for isys, sys in enumerate(self.syslist) for sig, isig in sys.output_index.items()] - return ({inputs[i][:2]:inputs[i][2] - for i in unused_sysinp}, - {outputs[i][:2]:outputs[i][2] - for i in unused_sysout}) - + return ({inputs[i][:2]: inputs[i][2] for i in unused_sysinp}, + {outputs[i][:2]: outputs[i][2] for i in unused_sysout}) def _find_inputs_by_basename(self, basename): """Find all subsystem inputs matching basename Returns ------- - Mapping from (isys, isig) to '{sys}.{sig}' + Mapping from (isys, isig) to '{sys}.{sig}' """ - return {(isys, isig) : f'{sys.name}.{basename}' + return {(isys, isig): f'{sys.name}.{basename}' for isys, sys in enumerate(self.syslist) for sig, isig in sys.input_index.items() if sig == (basename)} - def _find_outputs_by_basename(self, basename): """Find all subsystem outputs matching basename Returns ------- - Mapping from (isys, isig) to '{sys}.{sig}' + Mapping from (isys, isig) to '{sys}.{sig}' """ - return {(isys, isig) : f'{sys.name}.{basename}' + return {(isys, isig): f'{sys.name}.{basename}' for isys, sys in enumerate(self.syslist) for sig, isig in sys.output_index.items() if sig == (basename)} - def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None): """Check for unused subsystem inputs and outputs If any unused inputs or outputs are found, emit a warning. - + Parameters ---------- ignore_inputs : list of input-spec Subsystem inputs known to be unused. input-spec can be any of: 'sig', 'sys.sig', (isys, isig), ('sys', isig) - + If the 'sig' form is used, all subsystem inputs with that name are considered ignored. ignore_outputs : list of output-spec Subsystem outputs known to be unused. output-spec can be any of: 'sig', 'sys.sig', (isys, isig), ('sys', isig) - + If the 'sig' form is used, all subsystem outputs with that name are considered ignored. @@ -1509,10 +1586,12 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None): if isinstance(ignore_input, str) and '.' not in ignore_input: ignore_idxs = self._find_inputs_by_basename(ignore_input) if not ignore_idxs: - raise ValueError(f"Couldn't find ignored input {ignore_input} in subsystems") + raise ValueError(f"Couldn't find ignored input " + "{ignore_input} in subsystems") ignore_input_map.update(ignore_idxs) else: - ignore_input_map[self._parse_signal(ignore_input, 'input')[:2]] = ignore_input + ignore_input_map[self._parse_signal( + ignore_input, 'input')[:2]] = ignore_input # (isys, isig) -> signal-spec ignore_output_map = {} @@ -1520,16 +1599,18 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None): if isinstance(ignore_output, str) and '.' not in ignore_output: ignore_found = self._find_outputs_by_basename(ignore_output) if not ignore_found: - raise ValueError(f"Couldn't find ignored output {ignore_output} in subsystems") + raise ValueError(f"Couldn't find ignored output " + "{ignore_output} in subsystems") ignore_output_map.update(ignore_found) else: - ignore_output_map[self._parse_signal(ignore_output, 'output')[:2]] = ignore_output + ignore_output_map[self._parse_signal( + ignore_output, 'output')[:2]] = ignore_output dropped_inputs = set(unused_inputs) - set(ignore_input_map) dropped_outputs = set(unused_outputs) - set(ignore_output_map) used_ignored_inputs = set(ignore_input_map) - set(unused_inputs) - used_ignored_outputs = set(ignore_output_map) - set(unused_outputs) + used_ignored_outputs = set(ignore_output_map) - set(unused_outputs) if dropped_inputs: msg = ('Unused input(s) in InterconnectedSystem: ' @@ -2407,7 +2488,7 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[], """ # Look for 'input' and 'output' parameter name variants inputs = _parse_signal_parameter(inputs, 'input', kwargs) - outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True) + outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True) if not check_unused and (ignore_inputs or ignore_outputs): raise ValueError('check_unused is False, but either ' @@ -2507,7 +2588,6 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[], inputs=inputs, outputs=outputs, states=states, params=params, dt=dt, name=name) - # check for implicity dropped signals if check_unused: newsys.check_unused_signals(ignore_inputs, ignore_outputs) @@ -2598,7 +2678,7 @@ def _parse_list(signals, signame='input', prefix='u'): # Look for 'input' and 'output' parameter name variants inputs = _parse_signal_parameter(inputs, 'input', kwargs) - output = _parse_signal_parameter(output, 'outputs', kwargs, end=True) + output = _parse_signal_parameter(output, 'outputs', kwargs, end=True) # Default values for inputs and output if inputs is None: @@ -2623,8 +2703,8 @@ def _parse_list(signals, signame='input', prefix='u'): ninputs = ninputs * dimension output_names = ["%s[%d]" % (name, dim) - for name in output_names - for dim in range(dimension)] + for name in output_names + for dim in range(dimension)] noutputs = noutputs * dimension elif dimension is not None: raise ValueError( diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index ba56fcea3..abea9be9d 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -1,4 +1,4 @@ -"""iosys_test.py - test input/output system oeprations +"""iosys_test.py - test input/output system operations RMM, 17 Apr 2019 @@ -595,6 +595,58 @@ def test_bdalg_functions(self, tsys): ios_t, ios_y = ios.input_output_response(iosys_feedback, T, U, X0) np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + @noscipy0 + def test_algebraic_functions(self, tsys): + """Test algebraic operations on I/O systems""" + # Set up parameters for simulation + T = tsys.T + U = [np.sin(T), np.cos(T)] + X0 = 0 + + # Set up systems to be composed + linsys1 = tsys.mimo_linsys1 + linio1 = ios.LinearIOSystem(linsys1) + linsys2 = tsys.mimo_linsys2 + linio2 = ios.LinearIOSystem(linsys2) + + # Multiplication + linsys_mul = linsys2 * linsys1 + iosys_mul = linio2 * linio1 + lin_t, lin_y = ct.forced_response(linsys_mul, T, U, X0) + ios_t, ios_y = ios.input_output_response(iosys_mul, T, U, X0) + np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + + # Make sure that systems don't commute + linsys_mul = linsys1 * linsys2 + lin_t, lin_y = ct.forced_response(linsys_mul, T, U, X0) + assert not (np.abs(lin_y - ios_y) < 1e-3).all() + + # Addition + linsys_add = linsys1 + linsys2 + iosys_add = linio1 + linio2 + lin_t, lin_y = ct.forced_response(linsys_add, T, U, X0) + ios_t, ios_y = ios.input_output_response(iosys_add, T, U, X0) + np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + + # Subtraction + linsys_sub = linsys1 - linsys2 + iosys_sub = linio1 - linio2 + lin_t, lin_y = ct.forced_response(linsys_sub, T, U, X0) + ios_t, ios_y = ios.input_output_response(iosys_sub, T, U, X0) + np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + + # Make sure that systems don't commute + linsys_sub = linsys2 - linsys1 + lin_t, lin_y = ct.forced_response(linsys_sub, T, U, X0) + assert not (np.abs(lin_y - ios_y) < 1e-3).all() + + # Negation + linsys_negate = -linsys1 + iosys_negate = -linio1 + lin_t, lin_y = ct.forced_response(linsys_negate, T, U, X0) + ios_t, ios_y = ios.input_output_response(iosys_negate, T, U, X0) + np.testing.assert_allclose(ios_y, lin_y,atol=0.002,rtol=0.) + @noscipy0 def test_nonsquare_bdalg(self, tsys): # Set up parameters for simulation @@ -1196,6 +1248,58 @@ def test_lineariosys_statespace(self, tsys): np.testing.assert_allclose(io_series.C, ss_series.C) np.testing.assert_allclose(io_series.D, ss_series.D) + @pytest.mark.parametrize( + "Pout, Pin, C, op, PCout, PCin", [ + (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__mul__, 2, 2), + (2, 2, 2, ct.LinearIOSystem.__mul__, 2, 2), + (2, 3, 2, ct.LinearIOSystem.__mul__, 2, 3), + (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__mul__, 2, 2), + (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__rmul__, 2, 2), + (2, 2, 2, ct.LinearIOSystem.__rmul__, 2, 2), + (2, 3, 2, ct.LinearIOSystem.__rmul__, 2, 3), + (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__rmul__, 2, 2), + (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__add__, 2, 2), + (2, 2, 2, ct.LinearIOSystem.__add__, 2, 2), + (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__add__, 2, 2), + (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__radd__, 2, 2), + (2, 2, 2, ct.LinearIOSystem.__radd__, 2, 2), + (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__radd__, 2, 2), + (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__sub__, 2, 2), + (2, 2, 2, ct.LinearIOSystem.__sub__, 2, 2), + (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__sub__, 2, 2), + (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__rsub__, 2, 2), + (2, 2, 2, ct.LinearIOSystem.__rsub__, 2, 2), + (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__rsub__, 2, 2), + + ]) + def test_operand_conversion(self, Pout, Pin, C, op, PCout, PCin): + P = ct.LinearIOSystem( + ct.rss(2, Pout, Pin, strictly_proper=True), name='P') + PC = op(P, C) + assert isinstance(PC, ct.LinearIOSystem) + assert isinstance(PC, ct.StateSpace) + assert PC.noutputs == PCout + assert PC.ninputs == PCin + + @pytest.mark.parametrize( + "Pout, Pin, C, op", [ + (2, 2, ct.rss(2, 3, 2), ct.LinearIOSystem.__mul__), + (2, 2, ct.rss(2, 2, 3), ct.LinearIOSystem.__rmul__), + (2, 2, ct.rss(2, 3, 2), ct.LinearIOSystem.__add__), + (2, 2, ct.rss(2, 2, 3), ct.LinearIOSystem.__radd__), + (2, 3, 2, ct.LinearIOSystem.__add__), + (2, 3, 2, ct.LinearIOSystem.__radd__), + (2, 2, ct.rss(2, 3, 2), ct.LinearIOSystem.__sub__), + (2, 2, ct.rss(2, 2, 3), ct.LinearIOSystem.__rsub__), + (2, 3, 2, ct.LinearIOSystem.__sub__), + (2, 3, 2, ct.LinearIOSystem.__rsub__), + ]) + def test_operand_incompatible(self, Pout, Pin, C, op): + P = ct.LinearIOSystem( + ct.rss(2, Pout, Pin, strictly_proper=True), name='P') + with pytest.raises(ValueError, match="incompatible"): + PC = op(P, C) + def test_docstring_example(self): P = ct.LinearIOSystem( ct.rss(2, 2, 2, strictly_proper=True), name='P') diff --git a/control/tests/type_conversion_test.py b/control/tests/type_conversion_test.py index 3f51c2bbc..dadcc587e 100644 --- a/control/tests/type_conversion_test.py +++ b/control/tests/type_conversion_test.py @@ -62,28 +62,28 @@ def sys_dict(): ('add', 'ss', ['ss', 'ss', 'xrd', 'ss', 'xos', 'ss', 'ss' ]), ('add', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), ('add', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'xrd']), - ('add', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), - ('add', 'ios', ['xos', 'xos', 'E', 'ios', 'ios', 'xos', 'xos']), - ('add', 'arr', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), - ('add', 'flt', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'flt']), + ('add', 'lio', ['lio', 'lio', 'xrd', 'lio', 'ios', 'lio', 'lio']), + ('add', 'ios', ['ios', 'ios', 'E', 'ios', 'ios', 'ios', 'ios']), + ('add', 'arr', ['ss', 'tf', 'xrd', 'lio', 'ios', 'arr', 'arr']), + ('add', 'flt', ['ss', 'tf', 'xrd', 'lio', 'ios', 'arr', 'flt']), # op left ss tf frd lio ios arr flt ('sub', 'ss', ['ss', 'ss', 'xrd', 'ss', 'xos', 'ss', 'ss' ]), ('sub', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), ('sub', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'xrd']), - ('sub', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), - ('sub', 'ios', ['xos', 'xio', 'E', 'ios', 'xos' 'xos', 'xos']), - ('sub', 'arr', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), - ('sub', 'flt', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'flt']), + ('sub', 'lio', ['lio', 'lio', 'xrd', 'lio', 'ios', 'lio', 'lio']), + ('sub', 'ios', ['ios', 'ios', 'E', 'ios', 'ios', 'ios', 'ios']), + ('sub', 'arr', ['ss', 'tf', 'xrd', 'lio', 'ios', 'arr', 'arr']), + ('sub', 'flt', ['ss', 'tf', 'xrd', 'lio', 'ios', 'arr', 'flt']), # op left ss tf frd lio ios arr flt ('mul', 'ss', ['ss', 'ss', 'xrd', 'ss', 'xos', 'ss', 'ss' ]), ('mul', 'tf', ['tf', 'tf', 'xrd', 'tf', 'xos', 'tf', 'tf' ]), ('mul', 'frd', ['xrd', 'xrd', 'frd', 'xrd', 'E', 'xrd', 'frd']), - ('mul', 'lio', ['xio', 'xio', 'xrd', 'lio', 'ios', 'xio', 'xio']), - ('mul', 'ios', ['xos', 'xos', 'E', 'ios', 'ios', 'xos', 'xos']), - ('mul', 'arr', ['ss', 'tf', 'xrd', 'xio', 'xos', 'arr', 'arr']), - ('mul', 'flt', ['ss', 'tf', 'frd', 'xio', 'xos', 'arr', 'flt']), + ('mul', 'lio', ['lio', 'lio', 'xrd', 'lio', 'ios', 'lio', 'lio']), + ('mul', 'ios', ['ios', 'ios', 'E', 'ios', 'ios', 'ios', 'ios']), + ('mul', 'arr', ['ss', 'tf', 'xrd', 'lio', 'ios', 'arr', 'arr']), + ('mul', 'flt', ['ss', 'tf', 'frd', 'lio', 'ios', 'arr', 'flt']), # op left ss tf frd lio ios arr flt ('truediv', 'ss', ['xs', 'tf', 'xrd', 'xio', 'xos', 'xs', 'xs' ]), From 8912b7714a0e8de2bb68a2d52bffc8d273b8eebd Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 21 Nov 2021 13:52:24 -0800 Subject: [PATCH 2/5] update tests to avoid NumPy matrix deprecation --- control/tests/iosys_test.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index abea9be9d..864a0b3bc 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -1250,24 +1250,24 @@ def test_lineariosys_statespace(self, tsys): @pytest.mark.parametrize( "Pout, Pin, C, op, PCout, PCin", [ - (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__mul__, 2, 2), + (2, 2, 'rss', ct.LinearIOSystem.__mul__, 2, 2), (2, 2, 2, ct.LinearIOSystem.__mul__, 2, 2), (2, 3, 2, ct.LinearIOSystem.__mul__, 2, 3), (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__mul__, 2, 2), - (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__rmul__, 2, 2), + (2, 2, 'rss', ct.LinearIOSystem.__rmul__, 2, 2), (2, 2, 2, ct.LinearIOSystem.__rmul__, 2, 2), (2, 3, 2, ct.LinearIOSystem.__rmul__, 2, 3), (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__rmul__, 2, 2), - (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__add__, 2, 2), + (2, 2, 'rss', ct.LinearIOSystem.__add__, 2, 2), (2, 2, 2, ct.LinearIOSystem.__add__, 2, 2), (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__add__, 2, 2), - (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__radd__, 2, 2), + (2, 2, 'rss', ct.LinearIOSystem.__radd__, 2, 2), (2, 2, 2, ct.LinearIOSystem.__radd__, 2, 2), (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__radd__, 2, 2), - (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__sub__, 2, 2), + (2, 2, 'rss', ct.LinearIOSystem.__sub__, 2, 2), (2, 2, 2, ct.LinearIOSystem.__sub__, 2, 2), (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__sub__, 2, 2), - (2, 2, ct.rss(2, 2, 2), ct.LinearIOSystem.__rsub__, 2, 2), + (2, 2, 'rss', ct.LinearIOSystem.__rsub__, 2, 2), (2, 2, 2, ct.LinearIOSystem.__rsub__, 2, 2), (2, 2, np.random.rand(2, 2), ct.LinearIOSystem.__rsub__, 2, 2), @@ -1275,6 +1275,9 @@ def test_lineariosys_statespace(self, tsys): def test_operand_conversion(self, Pout, Pin, C, op, PCout, PCin): P = ct.LinearIOSystem( ct.rss(2, Pout, Pin, strictly_proper=True), name='P') + if isinstance(C, str) and C == 'rss': + # Need to generate inside class to avoid matrix deprecation error + C = ct.rss(2, 2, 2) PC = op(P, C) assert isinstance(PC, ct.LinearIOSystem) assert isinstance(PC, ct.StateSpace) @@ -1283,20 +1286,24 @@ def test_operand_conversion(self, Pout, Pin, C, op, PCout, PCin): @pytest.mark.parametrize( "Pout, Pin, C, op", [ - (2, 2, ct.rss(2, 3, 2), ct.LinearIOSystem.__mul__), - (2, 2, ct.rss(2, 2, 3), ct.LinearIOSystem.__rmul__), - (2, 2, ct.rss(2, 3, 2), ct.LinearIOSystem.__add__), - (2, 2, ct.rss(2, 2, 3), ct.LinearIOSystem.__radd__), + (2, 2, 'rss32', ct.LinearIOSystem.__mul__), + (2, 2, 'rss23', ct.LinearIOSystem.__rmul__), + (2, 2, 'rss32', ct.LinearIOSystem.__add__), + (2, 2, 'rss23', ct.LinearIOSystem.__radd__), (2, 3, 2, ct.LinearIOSystem.__add__), (2, 3, 2, ct.LinearIOSystem.__radd__), - (2, 2, ct.rss(2, 3, 2), ct.LinearIOSystem.__sub__), - (2, 2, ct.rss(2, 2, 3), ct.LinearIOSystem.__rsub__), + (2, 2, 'rss32', ct.LinearIOSystem.__sub__), + (2, 2, 'rss23', ct.LinearIOSystem.__rsub__), (2, 3, 2, ct.LinearIOSystem.__sub__), (2, 3, 2, ct.LinearIOSystem.__rsub__), ]) def test_operand_incompatible(self, Pout, Pin, C, op): P = ct.LinearIOSystem( ct.rss(2, Pout, Pin, strictly_proper=True), name='P') + if isinstance(C, str) and C == 'rss32': + C = ct.rss(2, 3, 2) + elif isinstance(C, str) and C == 'rss23': + C = ct.rss(2, 2, 3) with pytest.raises(ValueError, match="incompatible"): PC = op(P, C) From be9e7eabf86b3efa025a0e6854a276cf32782b35 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Sun, 21 Nov 2021 14:42:43 -0800 Subject: [PATCH 3/5] add a few more unit tests for coverage --- control/iosys.py | 2 +- control/tests/iosys_test.py | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/control/iosys.py b/control/iosys.py index 0e1cc06f2..7365e2b40 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -423,7 +423,7 @@ def _find_signal(self, name, sigdict): return sigdict.get(name, None) # Update parameters used for _rhs, _out (used by subclasses) def _update_params(self, params, warning=False): - if (warning): + if warning: warn("Parameters passed to InputOutputSystem ignored.") def _rhs(self, t, x, u, params={}): diff --git a/control/tests/iosys_test.py b/control/tests/iosys_test.py index 864a0b3bc..4c8001797 100644 --- a/control/tests/iosys_test.py +++ b/control/tests/iosys_test.py @@ -1307,6 +1307,32 @@ def test_operand_incompatible(self, Pout, Pin, C, op): with pytest.raises(ValueError, match="incompatible"): PC = op(P, C) + @pytest.mark.parametrize( + "C, op", [ + (None, ct.LinearIOSystem.__mul__), + (None, ct.LinearIOSystem.__rmul__), + (None, ct.LinearIOSystem.__add__), + (None, ct.LinearIOSystem.__radd__), + (None, ct.LinearIOSystem.__sub__), + (None, ct.LinearIOSystem.__rsub__), + ]) + def test_operand_badtype(self, C, op): + P = ct.LinearIOSystem( + ct.rss(2, 2, 2, strictly_proper=True), name='P') + with pytest.raises(TypeError, match="Unknown"): + op(P, C) + + def test_neg_badsize(self): + # Create a system of unspecified size + sys = ct.InputOutputSystem() + with pytest.raises(ValueError, match="Can't determine"): + -sys + + def test_bad_signal_list(self): + # Create a ystem with a bad signal list + with pytest.raises(TypeError, match="Can't parse"): + ct.InputOutputSystem(inputs=[1, 2, 3]) + def test_docstring_example(self): P = ct.LinearIOSystem( ct.rss(2, 2, 2, strictly_proper=True), name='P') From 93c3f5c7ed00c047936936a692627b7224f91909 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 26 Nov 2021 09:51:19 -0800 Subject: [PATCH 4/5] respond to review comments from @bnavigator --- control/iosys.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/control/iosys.py b/control/iosys.py index 7365e2b40..5cbfedfa4 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -795,7 +795,8 @@ def __init__(self, linsys, inputs=None, outputs=None, states=None, linsys = _convert_to_statespace(linsys) elif not isinstance(linsys, StateSpace): - raise TypeError("Linear I/O system must be a state space object") + raise TypeError("Linear I/O system must be a state space " + "or transfer function object") # Look for 'input' and 'output' parameter name variants inputs = _parse_signal_parameter(inputs, 'input', kwargs) @@ -1586,8 +1587,8 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None): if isinstance(ignore_input, str) and '.' not in ignore_input: ignore_idxs = self._find_inputs_by_basename(ignore_input) if not ignore_idxs: - raise ValueError(f"Couldn't find ignored input " - "{ignore_input} in subsystems") + raise ValueError("Couldn't find ignored input " + f"{ignore_input} in subsystems") ignore_input_map.update(ignore_idxs) else: ignore_input_map[self._parse_signal( @@ -1599,8 +1600,8 @@ def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None): if isinstance(ignore_output, str) and '.' not in ignore_output: ignore_found = self._find_outputs_by_basename(ignore_output) if not ignore_found: - raise ValueError(f"Couldn't find ignored output " - "{ignore_output} in subsystems") + raise ValueError("Couldn't find ignored output " + f"{ignore_output} in subsystems") ignore_output_map.update(ignore_found) else: ignore_output_map[self._parse_signal( From a44d1a42ff7c7b4ad5510a06ffda785156f30699 Mon Sep 17 00:00:00 2001 From: Richard Murray Date: Fri, 26 Nov 2021 13:37:50 -0800 Subject: [PATCH 5/5] TRV: fix docstring typo --- control/iosys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control/iosys.py b/control/iosys.py index 5cbfedfa4..2c9e3aba5 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -2252,7 +2252,7 @@ def _find_size(sysval, vecval): """ if hasattr(vecval, '__len__'): if sysval is not None and sysval != len(vecval): - raise ValueError("Inconsistend information to determine size " + raise ValueError("Inconsistent information to determine size " "of system component") return len(vecval) # None or 0, which is a valid value for "a (sysval, ) vector of zeros".