diff --git a/control/iosys.py b/control/iosys.py index 99f0e7db6..53cda7d19 100644 --- a/control/iosys.py +++ b/control/iosys.py @@ -539,7 +539,7 @@ def isctime(sys, strict=False): return sys.isctime(strict) -# Utility function to parse nameio keywords +# Utility function to parse iosys keywords def _process_iosys_keywords( keywords={}, defaults={}, static=False, end=False): """Process iosys specification. @@ -611,7 +611,7 @@ def pop_with_default(kw, defval=None, return_list=True): return name, inputs, outputs, states, dt # -# Parse 'dt' in for named I/O system +# Parse 'dt' for I/O system # # The 'dt' keyword is used to set the timebase for a system. Its # processing is a bit unusual: if it is not specified at all, then the diff --git a/control/nlsys.py b/control/nlsys.py index 82b6aeef3..fd6e207fc 100644 --- a/control/nlsys.py +++ b/control/nlsys.py @@ -31,7 +31,7 @@ __all__ = ['NonlinearIOSystem', 'InterconnectedSystem', 'nlsys', 'input_output_response', 'find_eqpt', 'linearize', - 'interconnect'] + 'interconnect', 'connection_table'] class NonlinearIOSystem(InputOutputSystem): @@ -395,7 +395,7 @@ def dynamics(self, t, x, u, params=None): current state u : array_like input - params : dict (optional) + params : dict, optional system parameter values Returns @@ -436,7 +436,7 @@ def output(self, t, x, u, params=None): current state u : array_like input - params : dict (optional) + params : dict, optional system parameter values Returns @@ -589,11 +589,14 @@ class InterconnectedSystem(NonlinearIOSystem): """ def __init__(self, syslist, connections=None, inplist=None, outlist=None, - params=None, warn_duplicate=None, **kwargs): + params=None, warn_duplicate=None, connection_type=None, + **kwargs): """Create an I/O system from a list of systems + connection info.""" from .statesp import _convert_to_statespace from .xferfcn import TransferFunction + self.connection_type = connection_type # explicit, implicit, or None + # Convert input and output names to lists if they aren't already if inplist is not None and not isinstance(inplist, list): inplist = [inplist] @@ -1001,6 +1004,80 @@ def unused_signals(self): return ({inputs[i][:2]: inputs[i][2] for i in unused_sysinp}, {outputs[i][:2]: outputs[i][2] for i in unused_sysout}) + def connection_table(self, show_names=False, column_width=32): + """Print table of connections inside an interconnected system model. + + Intended primarily for :class:`InterconnectedSystems` that have been + connected implicitly using signal names. + + Parameters + ---------- + show_names : bool, optional + Instead of printing out the system number, print out the name of + each system. Default is False because system name is not usually + specified when performing implicit interconnection using + :func:`interconnect`. + column_width : int, optional + Character width of printed columns. + + Examples + -------- + >>> P = ct.ss(1,1,1,0, inputs='u', outputs='y', name='P') + >>> C = ct.tf(10, [.1, 1], inputs='e', outputs='u', name='C') + >>> L = ct.interconnect([C, P], inputs='e', outputs='y') + >>> L.connection_table(show_names=True) # doctest: +SKIP + signal | source | destination + -------------------------------------------------------------------- + e | input | C + u | C | P + y | P | output + """ + + print('signal'.ljust(10) + '| source'.ljust(column_width) + \ + '| destination') + print('-'*(10 + column_width * 2)) + + # TODO: update this method for explicitly-connected systems + if not self.connection_type == 'implicit': + warn('connection_table only gives useful output for implicitly-'\ + 'connected systems') + + # collect signal labels + signal_labels = [] + for sys in self.syslist: + signal_labels += sys.input_labels + sys.output_labels + signal_labels = set(signal_labels) + + for signal_label in signal_labels: + print(signal_label.ljust(10), end='') + sources = '| ' + dests = '| ' + + # overall interconnected system inputs and outputs + if self.find_input(signal_label) is not None: + sources += 'input' + if self.find_output(signal_label) is not None: + dests += 'output' + + # internal connections + for idx, sys in enumerate(self.syslist): + loc = sys.find_output(signal_label) + if loc is not None: + if not sources.endswith(' '): + sources += ', ' + sources += sys.name if show_names else 'system ' + str(idx) + loc = sys.find_input(signal_label) + if loc is not None: + if not dests.endswith(' '): + dests += ', ' + dests += sys.name if show_names else 'system ' + str(idx) + if len(sources) >= column_width: + sources = sources[:column_width - 3] + '.. ' + print(sources.ljust(column_width), end='') + if len(dests) > column_width: + dests = dests[:column_width - 3] + '.. ' + print(dests.ljust(column_width), end='\n') + def _find_inputs_by_basename(self, basename): """Find all subsystem inputs matching basename @@ -1955,7 +2032,7 @@ def interconnect( signals are given names, then the forms 'sys.sig' or ('sys', 'sig') are also recognized. Finally, for multivariable systems the signal index can be given as a list, for example '(subsys_i, [inp_j1, ..., - inp_jn])'; as a slice, for example, 'sys.sig[i:j]'; or as a base + inp_jn])'; or as a slice, for example, 'sys.sig[i:j]'; or as a base name `sys.sig` (which matches `sys.sig[i]`). Similarly, each output-spec should describe an output signal from @@ -2132,8 +2209,8 @@ def interconnect( If a system is duplicated in the list of systems to be connected, a warning is generated and a copy of the system is created with the name of the new system determined by adding the prefix and suffix - strings in config.defaults['iosys.linearized_system_name_prefix'] - and config.defaults['iosys.linearized_system_name_suffix'], with the + strings in config.defaults['iosys.duplicate_system_name_prefix'] + and config.defaults['iosys.duplicate_system_name_suffix'], with the default being to add the suffix '$copy' to the system name. In addition to explicit lists of system signals, it is possible to @@ -2167,19 +2244,21 @@ def interconnect( dt = kwargs.pop('dt', None) # bypass normal 'dt' processing name, inputs, outputs, states, _ = _process_iosys_keywords(kwargs) + connection_type = None # explicit, implicit, or None if not check_unused and (ignore_inputs or ignore_outputs): raise ValueError('check_unused is False, but either ' + 'ignore_inputs or ignore_outputs non-empty') - if connections is False and not inplist and not outlist \ - and not inputs and not outputs: + if connections is False and not any((inplist, outlist, inputs, outputs)): # user has disabled auto-connect, and supplied neither input # nor output mappings; assume they know what they're doing check_unused = False - # If connections was not specified, set up default connection list + # If connections was not specified, assume implicit interconnection. + # set up default connection list if connections is None: + connection_type = 'implicit' # For each system input, look for outputs with the same name connections = [] for input_sys in syslist: @@ -2191,17 +2270,17 @@ def interconnect( if len(connect) > 1: connections.append(connect) - auto_connect = True - elif connections is False: check_unused = False # Use an empty connections list connections = [] - elif isinstance(connections, list) and \ - all([isinstance(cnxn, (str, tuple)) for cnxn in connections]): - # Special case where there is a single connection - connections = [connections] + else: + connection_type = 'explicit' + if isinstance(connections, list) and \ + all([isinstance(cnxn, (str, tuple)) for cnxn in connections]): + # Special case where there is a single connection + connections = [connections] # If inplist/outlist is not present, try using inputs/outputs instead inplist_none, outlist_none = False, False @@ -2436,7 +2515,7 @@ def interconnect( syslist, connections=connections, inplist=inplist, outlist=outlist, inputs=inputs, outputs=outputs, states=states, params=params, dt=dt, name=name, warn_duplicate=warn_duplicate, - **kwargs) + connection_type=connection_type, **kwargs) # See if we should add any signals if add_unused: @@ -2457,7 +2536,7 @@ def interconnect( syslist, connections=connections, inplist=inplist, outlist=outlist, inputs=inputs, outputs=outputs, states=states, params=params, dt=dt, name=name, warn_duplicate=warn_duplicate, - **kwargs) + connection_type=connection_type, **kwargs) # check for implicitly dropped signals if check_unused: @@ -2465,7 +2544,7 @@ def interconnect( # If all subsystems are linear systems, maintain linear structure if all([isinstance(sys, StateSpace) for sys in newsys.syslist]): - return LinearICSystem(newsys, None) + newsys = LinearICSystem(newsys, None, connection_type=connection_type) return newsys @@ -2500,3 +2579,39 @@ def _convert_static_iosystem(sys): return NonlinearIOSystem( None, lambda t, x, u, params: sys @ u, outputs=sys.shape[0], inputs=sys.shape[1]) + +def connection_table(sys, show_names=False, column_width=32): + """Print table of connections inside an interconnected system model. + + Intended primarily for :class:`InterconnectedSystems` that have been + connected implicitly using signal names. + + Parameters + ---------- + sys : :class:`InterconnectedSystem` + Interconnected system object + show_names : bool, optional + Instead of printing out the system number, print out the name of + each system. Default is False because system name is not usually + specified when performing implicit interconnection using + :func:`interconnect`. + column_width : int, optional + Character width of printed columns. + + + Examples + -------- + >>> P = ct.ss(1,1,1,0, inputs='u', outputs='y', name='P') + >>> C = ct.tf(10, [.1, 1], inputs='e', outputs='u', name='C') + >>> L = ct.interconnect([C, P], inputs='e', outputs='y') + >>> L.connection_table(show_names=True) # doctest: +SKIP + signal | source | destination + -------------------------------------------------------------- + e | input | C + u | C | P + y | P | output + """ + assert isinstance(sys, InterconnectedSystem), "system must be"\ + "an InterconnectedSystem." + + sys.connection_table(show_names=show_names, column_width=column_width) diff --git a/control/statesp.py b/control/statesp.py index 362945ad6..38dd2388d 100644 --- a/control/statesp.py +++ b/control/statesp.py @@ -1459,7 +1459,7 @@ class LinearICSystem(InterconnectedSystem, StateSpace): """ - def __init__(self, io_sys, ss_sys=None): + def __init__(self, io_sys, ss_sys=None, connection_type=None): # # Because this is a "hybrid" object, the initialization proceeds in # stages. We first create an empty InputOutputSystem of the @@ -1483,6 +1483,7 @@ def __init__(self, io_sys, ss_sys=None): self.input_map = io_sys.input_map self.output_map = io_sys.output_map self.params = io_sys.params + self.connection_type = connection_type # If we didnt' get a state space system, linearize the full system if ss_sys is None: diff --git a/control/tests/interconnect_test.py b/control/tests/interconnect_test.py index 3a333aef5..a37b18eec 100644 --- a/control/tests/interconnect_test.py +++ b/control/tests/interconnect_test.py @@ -201,6 +201,126 @@ def test_interconnect_docstring(): np.testing.assert_almost_equal(T.C @ T. A @ T.B, T_ss.C @ T_ss.A @ T_ss.B) np.testing.assert_almost_equal(T.D, T_ss.D) +@pytest.mark.parametrize("show_names", (True, False)) +def test_connection_table(capsys, show_names): + P = ct.ss(1,1,1,0, inputs='u', outputs='y', name='P') + C = ct.tf(10, [.1, 1], inputs='e', outputs='u', name='C') + L = ct.interconnect([C, P], inputs='e', outputs='y') + L.connection_table(show_names=show_names) + captured_from_method = capsys.readouterr().out + + ct.connection_table(L, show_names=show_names) + captured_from_function = capsys.readouterr().out + + # break the following strings separately because the printout order varies + # because signal names are stored as a set + mystrings = \ + ["signal | source | destination", + "------------------------------------------------------------------"] + if show_names: + mystrings += \ + ["e | input | C", + "u | C | P", + "y | P | output"] + else: + mystrings += \ + ["e | input | system 0", + "u | system 0 | system 1", + "y | system 1 | output"] + + for str_ in mystrings: + assert str_ in captured_from_method + assert str_ in captured_from_function + + # check auto-sum + P1 = ct.ss(1,1,1,0, inputs='u', outputs='y', name='P1') + P2 = ct.tf(10, [.1, 1], inputs='e', outputs='y', name='P2') + P3 = ct.tf(10, [.1, 1], inputs='x', outputs='y', name='P3') + P = ct.interconnect([P1, P2, P3], inputs=['e', 'u', 'x'], outputs='y') + P.connection_table(show_names=show_names) + captured_from_method = capsys.readouterr().out + + ct.connection_table(P, show_names=show_names) + captured_from_function = capsys.readouterr().out + + mystrings = \ + ["signal | source | destination", + "-------------------------------------------------------------------"] + if show_names: + mystrings += \ + ["u | input | P1", + "e | input | P2", + "x | input | P3", + "y | P1, P2, P3 | output"] + else: + mystrings += \ + ["u | input | system 0", + "e | input | system 1", + "x | input | system 2", + "y | system 0, system 1, system 2 | output"] + + for str_ in mystrings: + assert str_ in captured_from_method + assert str_ in captured_from_function + + # check auto-split + P1 = ct.ss(1,1,1,0, inputs='u', outputs='x', name='P1') + P2 = ct.tf(10, [.1, 1], inputs='u', outputs='y', name='P2') + P3 = ct.tf(10, [.1, 1], inputs='u', outputs='z', name='P3') + P = ct.interconnect([P1, P2, P3], inputs=['u'], outputs=['x','y','z']) + P.connection_table(show_names=show_names) + captured_from_method = capsys.readouterr().out + + ct.connection_table(P, show_names=show_names) + captured_from_function = capsys.readouterr().out + + mystrings = \ + ["signal | source | destination", + "-------------------------------------------------------------------"] + if show_names: + mystrings += \ + ["u | input | P1, P2, P3", + "x | P1 | output ", + "y | P2 | output", + "z | P3 | output"] + else: + mystrings += \ + ["u | input | system 0, system 1, system 2", + "x | system 0 | output ", + "y | system 1 | output", + "z | system 2 | output"] + + for str_ in mystrings: + assert str_ in captured_from_method + assert str_ in captured_from_function + + # check change column width + P.connection_table(show_names=show_names, column_width=20) + captured_from_method = capsys.readouterr().out + + ct.connection_table(P, show_names=show_names, column_width=20) + captured_from_function = capsys.readouterr().out + + mystrings = \ + ["signal | source | destination", + "------------------------------------------------"] + if show_names: + mystrings += \ + ["u | input | P1, P2, P3", + "x | P1 | output ", + "y | P2 | output", + "z | P3 | output"] + else: + mystrings += \ + ["u | input | system 0, syste.. ", + "x | system 0 | output ", + "y | system 1 | output", + "z | system 2 | output"] + + for str_ in mystrings: + assert str_ in captured_from_method + assert str_ in captured_from_function + def test_interconnect_exceptions(): # First make sure the docstring example works diff --git a/doc/control.rst b/doc/control.rst index a2fb8e69b..96714bf7d 100644 --- a/doc/control.rst +++ b/doc/control.rst @@ -36,6 +36,7 @@ System interconnections negate parallel series + connection_table Frequency domain plotting