Skip to content

print a connection table for interconnected systems #925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 16, 2023
Merged
4 changes: 2 additions & 2 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ def isctime(sys, strict=False):
return sys.isctime(strict)


# Utility function to parse nameio keywords
# Utility function to parse iosys keywords
def _process_iosys_keywords(
keywords={}, defaults={}, static=False, end=False):
"""Process iosys specification.
Expand Down Expand Up @@ -611,7 +611,7 @@ def pop_with_default(kw, defval=None, return_list=True):
return name, inputs, outputs, states, dt

#
# Parse 'dt' in for named I/O system
# Parse 'dt' for I/O system
#
# The 'dt' keyword is used to set the timebase for a system. Its
# processing is a bit unusual: if it is not specified at all, then the
Expand Down
153 changes: 134 additions & 19 deletions control/nlsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

__all__ = ['NonlinearIOSystem', 'InterconnectedSystem', 'nlsys',
'input_output_response', 'find_eqpt', 'linearize',
'interconnect']
'interconnect', 'connection_table']


class NonlinearIOSystem(InputOutputSystem):
Expand Down Expand Up @@ -395,7 +395,7 @@ def dynamics(self, t, x, u, params=None):
current state
u : array_like
input
params : dict (optional)
params : dict, optional
system parameter values

Returns
Expand Down Expand Up @@ -436,7 +436,7 @@ def output(self, t, x, u, params=None):
current state
u : array_like
input
params : dict (optional)
params : dict, optional
system parameter values

Returns
Expand Down Expand Up @@ -589,11 +589,14 @@ class InterconnectedSystem(NonlinearIOSystem):

"""
def __init__(self, syslist, connections=None, inplist=None, outlist=None,
params=None, warn_duplicate=None, **kwargs):
params=None, warn_duplicate=None, connection_type=None,
**kwargs):
"""Create an I/O system from a list of systems + connection info."""
from .statesp import _convert_to_statespace
from .xferfcn import TransferFunction

self.connection_type = connection_type # explicit, implicit, or None

# Convert input and output names to lists if they aren't already
if inplist is not None and not isinstance(inplist, list):
inplist = [inplist]
Expand Down Expand Up @@ -1001,6 +1004,80 @@ def unused_signals(self):
return ({inputs[i][:2]: inputs[i][2] for i in unused_sysinp},
{outputs[i][:2]: outputs[i][2] for i in unused_sysout})

def connection_table(self, show_names=False, column_width=32):
"""Print table of connections inside an interconnected system model.

Intended primarily for :class:`InterconnectedSystems` that have been
connected implicitly using signal names.

Parameters
----------
show_names : bool, optional
Instead of printing out the system number, print out the name of
each system. Default is False because system name is not usually
specified when performing implicit interconnection using
:func:`interconnect`.
column_width : int, optional
Character width of printed columns.

Examples
--------
>>> P = ct.ss(1,1,1,0, inputs='u', outputs='y', name='P')
>>> C = ct.tf(10, [.1, 1], inputs='e', outputs='u', name='C')
>>> L = ct.interconnect([C, P], inputs='e', outputs='y')
>>> L.connection_table(show_names=True) # doctest: +SKIP
signal | source | destination
--------------------------------------------------------------------
e | input | C
u | C | P
y | P | output
"""

print('signal'.ljust(10) + '| source'.ljust(column_width) + \
'| destination')
print('-'*(10 + column_width * 2))

# TODO: update this method for explicitly-connected systems
if not self.connection_type == 'implicit':
warn('connection_table only gives useful output for implicitly-'\
'connected systems')

# collect signal labels
signal_labels = []
for sys in self.syslist:
signal_labels += sys.input_labels + sys.output_labels
signal_labels = set(signal_labels)

for signal_label in signal_labels:
print(signal_label.ljust(10), end='')
sources = '| '
dests = '| '

# overall interconnected system inputs and outputs
if self.find_input(signal_label) is not None:
sources += 'input'
if self.find_output(signal_label) is not None:
dests += 'output'

# internal connections
for idx, sys in enumerate(self.syslist):
loc = sys.find_output(signal_label)
if loc is not None:
if not sources.endswith(' '):
sources += ', '
sources += sys.name if show_names else 'system ' + str(idx)
loc = sys.find_input(signal_label)
if loc is not None:
if not dests.endswith(' '):
dests += ', '
dests += sys.name if show_names else 'system ' + str(idx)
if len(sources) >= column_width:
sources = sources[:column_width - 3] + '.. '
print(sources.ljust(column_width), end='')
if len(dests) > column_width:
dests = dests[:column_width - 3] + '.. '
print(dests.ljust(column_width), end='\n')

def _find_inputs_by_basename(self, basename):
"""Find all subsystem inputs matching basename

Expand Down Expand Up @@ -1955,7 +2032,7 @@ def interconnect(
signals are given names, then the forms 'sys.sig' or ('sys', 'sig')
are also recognized. Finally, for multivariable systems the signal
index can be given as a list, for example '(subsys_i, [inp_j1, ...,
inp_jn])'; as a slice, for example, 'sys.sig[i:j]'; or as a base
inp_jn])'; or as a slice, for example, 'sys.sig[i:j]'; or as a base
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't this correct before? Now it says 'a; or b; or c' instead of 'a; b; or c'.

name `sys.sig` (which matches `sys.sig[i]`).

Similarly, each output-spec should describe an output signal from
Expand Down Expand Up @@ -2132,8 +2209,8 @@ def interconnect(
If a system is duplicated in the list of systems to be connected,
a warning is generated and a copy of the system is created with the
name of the new system determined by adding the prefix and suffix
strings in config.defaults['iosys.linearized_system_name_prefix']
and config.defaults['iosys.linearized_system_name_suffix'], with the
strings in config.defaults['iosys.duplicate_system_name_prefix']
and config.defaults['iosys.duplicate_system_name_suffix'], with the
default being to add the suffix '$copy' to the system name.

In addition to explicit lists of system signals, it is possible to
Expand Down Expand Up @@ -2167,19 +2244,21 @@ def interconnect(

dt = kwargs.pop('dt', None) # bypass normal 'dt' processing
name, inputs, outputs, states, _ = _process_iosys_keywords(kwargs)
connection_type = None # explicit, implicit, or None

if not check_unused and (ignore_inputs or ignore_outputs):
raise ValueError('check_unused is False, but either '
+ 'ignore_inputs or ignore_outputs non-empty')

if connections is False and not inplist and not outlist \
and not inputs and not outputs:
if connections is False and not any((inplist, outlist, inputs, outputs)):
# user has disabled auto-connect, and supplied neither input
# nor output mappings; assume they know what they're doing
check_unused = False

# If connections was not specified, set up default connection list
# If connections was not specified, assume implicit interconnection.
# set up default connection list
if connections is None:
connection_type = 'implicit'
# For each system input, look for outputs with the same name
connections = []
for input_sys in syslist:
Expand All @@ -2191,17 +2270,17 @@ def interconnect(
if len(connect) > 1:
connections.append(connect)

auto_connect = True

elif connections is False:
check_unused = False
# Use an empty connections list
connections = []

elif isinstance(connections, list) and \
all([isinstance(cnxn, (str, tuple)) for cnxn in connections]):
# Special case where there is a single connection
connections = [connections]
else:
connection_type = 'explicit'
if isinstance(connections, list) and \
all([isinstance(cnxn, (str, tuple)) for cnxn in connections]):
# Special case where there is a single connection
connections = [connections]

# If inplist/outlist is not present, try using inputs/outputs instead
inplist_none, outlist_none = False, False
Expand Down Expand Up @@ -2436,7 +2515,7 @@ def interconnect(
syslist, connections=connections, inplist=inplist,
outlist=outlist, inputs=inputs, outputs=outputs, states=states,
params=params, dt=dt, name=name, warn_duplicate=warn_duplicate,
**kwargs)
connection_type=connection_type, **kwargs)

# See if we should add any signals
if add_unused:
Expand All @@ -2457,15 +2536,15 @@ def interconnect(
syslist, connections=connections, inplist=inplist,
outlist=outlist, inputs=inputs, outputs=outputs, states=states,
params=params, dt=dt, name=name, warn_duplicate=warn_duplicate,
**kwargs)
connection_type=connection_type, **kwargs)

# check for implicitly dropped signals
if check_unused:
newsys.check_unused_signals(ignore_inputs, ignore_outputs)

# If all subsystems are linear systems, maintain linear structure
if all([isinstance(sys, StateSpace) for sys in newsys.syslist]):
return LinearICSystem(newsys, None)
newsys = LinearICSystem(newsys, None, connection_type=connection_type)

return newsys

Expand Down Expand Up @@ -2500,3 +2579,39 @@ def _convert_static_iosystem(sys):
return NonlinearIOSystem(
None, lambda t, x, u, params: sys @ u,
outputs=sys.shape[0], inputs=sys.shape[1])

def connection_table(sys, show_names=False, column_width=32):
"""Print table of connections inside an interconnected system model.

Intended primarily for :class:`InterconnectedSystems` that have been
connected implicitly using signal names.

Parameters
----------
sys : :class:`InterconnectedSystem`
Interconnected system object
show_names : bool, optional
Instead of printing out the system number, print out the name of
each system. Default is False because system name is not usually
specified when performing implicit interconnection using
:func:`interconnect`.
column_width : int, optional
Character width of printed columns.


Examples
--------
>>> P = ct.ss(1,1,1,0, inputs='u', outputs='y', name='P')
>>> C = ct.tf(10, [.1, 1], inputs='e', outputs='u', name='C')
>>> L = ct.interconnect([C, P], inputs='e', outputs='y')
>>> L.connection_table(show_names=True) # doctest: +SKIP
signal | source | destination
--------------------------------------------------------------
e | input | C
u | C | P
y | P | output
"""
assert isinstance(sys, InterconnectedSystem), "system must be"\
"an InterconnectedSystem."

sys.connection_table(show_names=show_names, column_width=column_width)
3 changes: 2 additions & 1 deletion control/statesp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ class LinearICSystem(InterconnectedSystem, StateSpace):

"""

def __init__(self, io_sys, ss_sys=None):
def __init__(self, io_sys, ss_sys=None, connection_type=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to allow the connection type to be overridden? It seems like this should just be inherited from the underlying InterconnectedSystem.

#
# Because this is a "hybrid" object, the initialization proceeds in
# stages. We first create an empty InputOutputSystem of the
Expand All @@ -1483,6 +1483,7 @@ def __init__(self, io_sys, ss_sys=None):
self.input_map = io_sys.input_map
self.output_map = io_sys.output_map
self.params = io_sys.params
self.connection_type = connection_type

# If we didnt' get a state space system, linearize the full system
if ss_sys is None:
Expand Down
Loading