Skip to content

Check for unused subsystem signals in InterconnectedSystem #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 205 additions & 1 deletion control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,164 @@ def set_output_map(self, output_map):
self.noutputs = output_map.shape[0]


def unused_signals(self):
"""Find unused subsystem inputs and outputs

Returns
-------

unused_inputs : dict

A mapping from tuple of indices (isys, isig) to string
'{sys}.{sig}', for all unused subsystem inputs.

unused_outputs : dict

A mapping from tuple of indices (isys, isig) to string
'{sys}.{sig}', for all unused subsystem outputs.

"""
used_sysinp_via_inp = np.nonzero(self.input_map)[0]
used_sysout_via_out = np.nonzero(self.output_map)[1]
used_sysinp_via_con, used_sysout_via_con = np.nonzero(self.connect_map)

used_sysinp = set(used_sysinp_via_inp) | set(used_sysinp_via_con)
used_sysout = set(used_sysout_via_out) | set(used_sysout_via_con)

nsubsysinp = sum(sys.ninputs for sys in self.syslist)
nsubsysout = sum(sys.noutputs for sys in self.syslist)

unused_sysinp = sorted(set(range(nsubsysinp)) - used_sysinp)
unused_sysout = sorted(set(range(nsubsysout)) - used_sysout)

inputs = [(isys,isig, f'{sys.name}.{sig}')
for isys, sys in enumerate(self.syslist)
for sig, isig in sys.input_index.items()]

outputs = [(isys,isig,f'{sys.name}.{sig}')
for isys, sys in enumerate(self.syslist)
for sig, isig in sys.output_index.items()]

return ({inputs[i][:2]:inputs[i][2]
for i in unused_sysinp},
{outputs[i][:2]:outputs[i][2]
for i in unused_sysout})


def _find_inputs_by_basename(self, basename):
"""Find all subsystem inputs matching basename

Returns
-------
Mapping from (isys, isig) to '{sys}.{sig}'

"""
return {(isys, isig) : f'{sys.name}.{basename}'
for isys, sys in enumerate(self.syslist)
for sig, isig in sys.input_index.items()
if sig == (basename)}


def _find_outputs_by_basename(self, basename):
"""Find all subsystem outputs matching basename

Returns
-------
Mapping from (isys, isig) to '{sys}.{sig}'

"""
return {(isys, isig) : f'{sys.name}.{basename}'
for isys, sys in enumerate(self.syslist)
for sig, isig in sys.output_index.items()
if sig == (basename)}


def check_unused_signals(self, ignore_inputs=None, ignore_outputs=None):
"""Check for unused subsystem inputs and outputs

If any unused inputs or outputs are found, emit a warning.

Parameters
----------
ignore_inputs : list of input-spec
Subsystem inputs known to be unused. input-spec can be any of:
'sig', 'sys.sig', (isys, isig), ('sys', isig)

If the 'sig' form is used, all subsystem inputs with that
name are considered ignored.

ignore_outputs : list of output-spec
Subsystem outputs known to be unused. output-spec can be any of:
'sig', 'sys.sig', (isys, isig), ('sys', isig)

If the 'sig' form is used, all subsystem outputs with that
name are considered ignored.

"""

if ignore_inputs is None:
ignore_inputs = []

if ignore_outputs is None:
ignore_outputs = []

unused_inputs, unused_outputs = self.unused_signals()

# (isys, isig) -> signal-spec
ignore_input_map = {}
for ignore_input in ignore_inputs:
if isinstance(ignore_input, str) and '.' not in ignore_input:
ignore_idxs = self._find_inputs_by_basename(ignore_input)
if not ignore_idxs:
raise ValueError(f"Couldn't find ignored input {ignore_input} in subsystems")
ignore_input_map.update(ignore_idxs)
else:
ignore_input_map[self._parse_signal(ignore_input, 'input')[:2]] = ignore_input

# (isys, isig) -> signal-spec
ignore_output_map = {}
for ignore_output in ignore_outputs:
if isinstance(ignore_output, str) and '.' not in ignore_output:
ignore_found = self._find_outputs_by_basename(ignore_output)
if not ignore_found:
raise ValueError(f"Couldn't find ignored output {ignore_output} in subsystems")
ignore_output_map.update(ignore_found)
else:
ignore_output_map[self._parse_signal(ignore_output, 'output')[:2]] = ignore_output

dropped_inputs = set(unused_inputs) - set(ignore_input_map)
dropped_outputs = set(unused_outputs) - set(ignore_output_map)

used_ignored_inputs = set(ignore_input_map) - set(unused_inputs)
used_ignored_outputs = set(ignore_output_map) - set(unused_outputs)

if dropped_inputs:
msg = ('Unused input(s) in InterconnectedSystem: '
+ '; '.join(f'{inp}={unused_inputs[inp]}'
for inp in dropped_inputs))
warn(msg)

if dropped_outputs:
msg = ('Unused output(s) in InterconnectedSystem: '
+ '; '.join(f'{out} : {unused_outputs[out]}'
for out in dropped_outputs))
warn(msg)

if used_ignored_inputs:
msg = ('Input(s) specified as ignored is (are) used: '
+ '; '.join(f'{inp} : {ignore_input_map[inp]}'
for inp in used_ignored_inputs))
warn(msg)

if used_ignored_outputs:
msg = ('Output(s) specified as ignored is (are) used: '
+ '; '.join(f'{out}={ignore_output_map[out]}'
for out in used_ignored_outputs))
warn(msg)


class LinearICSystem(InterconnectedSystem, LinearIOSystem):

"""Interconnection of a set of linear input/output systems.

This class is used to implement a system that is an interconnection of
Expand Down Expand Up @@ -2020,7 +2177,9 @@ def tf2io(*args, **kwargs):
# Function to create an interconnected system
def interconnect(syslist, connections=None, inplist=[], outlist=[],
inputs=None, outputs=None, states=None,
params={}, dt=None, name=None, **kwargs):
params={}, dt=None, name=None,
check_unused=True, ignore_inputs=None, ignore_outputs=None,
**kwargs):
"""Interconnect a set of input/output systems.

This function creates a new system that is an interconnection of a set of
Expand Down Expand Up @@ -2145,6 +2304,32 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
System name (used for specifying signals). If unspecified, a generic
name <sys[id]> is generated with a unique integer id.

check_unused : bool
If True, check for unused sub-system signals. This check is
not done if connections is False, and neither input nor output
mappings are specified.

ignore_inputs : list of input-spec
A list of sub-system inputs known not to be connected. This is
*only* used in checking for unused signals, and does not
disable use of the input.

Besides the usual input-spec forms (see `connections`), an
input-spec can be just the signal base name, in which case all
signals from all sub-systems with that base name are
considered ignored.

ignore_outputs : list of output-spec
A list of sub-system outputs known not to be connected. This
is *only* used in checking for unused signals, and does not
disable use of the output.

Besides the usual output-spec forms (see `connections`), an
output-spec can be just the signal base name, in which all
outputs from all sub-systems with that base name are
considered ignored.


Example
-------
>>> P = control.LinearIOSystem(
Expand Down Expand Up @@ -2199,6 +2384,17 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
inputs = _parse_signal_parameter(inputs, 'input', kwargs)
outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True)

if not check_unused and (ignore_inputs or ignore_outputs):
raise ValueError('check_unused is False, but either '
+ 'ignore_inputs or ignore_outputs non-empty')

if (connections is False
and not inplist and not outlist
and not inputs and not outputs):
# user has disabled auto-connect, and supplied neither input
# nor output mappings; assume they know what they're doing
check_unused = False

# If connections was not specified, set up default connection list
if connections is None:
# For each system input, look for outputs with the same name
Expand All @@ -2211,7 +2407,11 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
connect.append(output_sys.name + "." + input_name)
if len(connect) > 1:
connections.append(connect)

auto_connect = True

elif connections is False:
check_unused = False
# Use an empty connections list
connections = []

Expand Down Expand Up @@ -2282,6 +2482,10 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
inputs=inputs, outputs=outputs, states=states,
params=params, dt=dt, name=name)


# check for implicity dropped signals
if check_unused:
newsys.check_unused_signals(ignore_inputs, ignore_outputs)
# If all subsystems are linear systems, maintain linear structure
if all([isinstance(sys, LinearIOSystem) for sys in syslist]):
return LinearICSystem(newsys, None)
Expand Down
137 changes: 137 additions & 0 deletions control/tests/iosys_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""

from __future__ import print_function
import re

import numpy as np
import pytest
Expand Down Expand Up @@ -1396,3 +1397,139 @@ def secord_update(t, x, u, params={}):
def secord_output(t, x, u, params={}):
"""Second order system dynamics output"""
return np.array([x[0]])


def test_interconnect_unused_input():
# test that warnings about unused inputs are reported, or not,
# as required
g = ct.LinearIOSystem(ct.ss(-1,1,1,0),
inputs=['u'],
outputs=['y'],
name='g')

s = ct.summing_junction(inputs=['r','-y','-n'],
outputs=['e'],
name='s')

k = ct.LinearIOSystem(ct.ss(0,10,2,0),
inputs=['e'],
outputs=['u'],
name='k')

with pytest.warns(UserWarning, match=r"Unused input\(s\) in InterconnectedSystem"):
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'])

with pytest.warns(None) as record:
# no warning if output explicitly ignored, various argument forms
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_inputs=['n'])

h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_inputs=['s.n'])

# no warning if auto-connect disabled
h = ct.interconnect([g,s,k],
connections=False)

#https://docs.pytest.org/en/6.2.x/warnings.html#recwarn
for r in record:
# strip out matrix warnings
if re.match(r'.*matrix subclass', str(r.message)):
continue
print(r.message)
pytest.fail(f'Unexpected warning: {r.message}')


# warn if explicity ignored input in fact used
with pytest.warns(UserWarning, match=r"Input\(s\) specified as ignored is \(are\) used:") as record:
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_inputs=['u','n'])

with pytest.warns(UserWarning, match=r"Input\(s\) specified as ignored is \(are\) used:") as record:
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_inputs=['k.e','n'])

# error if ignored signal doesn't exist
with pytest.raises(ValueError):
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_inputs=['v'])


def test_interconnect_unused_output():
# test that warnings about ignored outputs are reported, or not,
# as required
g = ct.LinearIOSystem(ct.ss(-1,1,[[1],[-1]],[[0],[1]]),
inputs=['u'],
outputs=['y','dy'],
name='g')

s = ct.summing_junction(inputs=['r','-y'],
outputs=['e'],
name='s')

k = ct.LinearIOSystem(ct.ss(0,10,2,0),
inputs=['e'],
outputs=['u'],
name='k')

with pytest.warns(UserWarning, match=r"Unused output\(s\) in InterconnectedSystem:") as record:
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'])


# no warning if output explicitly ignored
with pytest.warns(None) as record:
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_outputs=['dy'])

h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_outputs=['g.dy'])

# no warning if auto-connect disabled
h = ct.interconnect([g,s,k],
connections=False)

#https://docs.pytest.org/en/6.2.x/warnings.html#recwarn
for r in record:
# strip out matrix warnings
if re.match(r'.*matrix subclass', str(r.message)):
continue
print(r.message)
pytest.fail(f'Unexpected warning: {r.message}')

# warn if explicity ignored output in fact used
with pytest.warns(UserWarning, match=r"Output\(s\) specified as ignored is \(are\) used:"):
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_outputs=['dy','u'])

with pytest.warns(UserWarning, match=r"Output\(s\) specified as ignored is \(are\) used:"):
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_outputs=['dy', ('k.u')])

# error if ignored signal doesn't exist
with pytest.raises(ValueError):
h = ct.interconnect([g,s,k],
inputs=['r'],
outputs=['y'],
ignore_outputs=['v'])