Skip to content

I/O system updates #536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 72 additions & 23 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ class LinearIOSystem(InputOutputSystem, StateSpace):

"""
def __init__(self, linsys, inputs=None, outputs=None, states=None,
name=None):
name=None, **kwargs):
"""Create an I/O system from a state space linear system.

Converts a :class:`~control.StateSpace` system into an
Expand Down Expand Up @@ -658,6 +658,10 @@ def __init__(self, linsys, inputs=None, outputs=None, states=None,
if not isinstance(linsys, StateSpace):
raise TypeError("Linear I/O system must be a state space object")

# Look for 'input' and 'output' parameter name variants
inputs = _parse_signal_parameter(inputs, 'input', kwargs)
outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True)

# Create the I/O system object
super(LinearIOSystem, self).__init__(
inputs=linsys.ninputs, outputs=linsys.noutputs,
Expand Down Expand Up @@ -707,8 +711,7 @@ class NonlinearIOSystem(InputOutputSystem):

"""
def __init__(self, updfcn, outfcn=None, inputs=None, outputs=None,
states=None, params={},
name=None, **kwargs):
states=None, params={}, name=None, **kwargs):
"""Create a nonlinear I/O system given update and output functions.

Creates an :class:`~control.InputOutputSystem` for a nonlinear system
Expand Down Expand Up @@ -775,17 +778,25 @@ def __init__(self, updfcn, outfcn=None, inputs=None, outputs=None,
Nonlinear system represented as an input/output system.

"""
# Look for 'input' and 'output' parameter name variants
inputs = _parse_signal_parameter(inputs, 'input', kwargs)
outputs = _parse_signal_parameter(outputs, 'output', kwargs)

# Store the update and output functions
self.updfcn = updfcn
self.outfcn = outfcn

# Initialize the rest of the structure
dt = kwargs.get('dt', config.defaults['control.default_dt'])
dt = kwargs.pop('dt', config.defaults['control.default_dt'])
super(NonlinearIOSystem, self).__init__(
inputs=inputs, outputs=outputs, states=states,
params=params, dt=dt, name=name
)

# Make sure all input arguments got parsed
if kwargs:
raise TypeError("unknown parameters %s" % kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤙


# Check to make sure arguments are consistent
if updfcn is None:
if self.nstates is None:
Expand Down Expand Up @@ -834,7 +845,7 @@ class InterconnectedSystem(InputOutputSystem):
"""
def __init__(self, syslist, connections=[], inplist=[], outlist=[],
inputs=None, outputs=None, states=None,
params={}, dt=None, name=None):
params={}, dt=None, name=None, **kwargs):
"""Create an I/O system from a list of systems + connection info.

The InterconnectedSystem class is used to represent an input/output
Expand All @@ -846,6 +857,10 @@ def __init__(self, syslist, connections=[], inplist=[], outlist=[],
See :func:`~control.interconnect` for a list of parameters.

"""
# Look for 'input' and 'output' parameter name variants
inputs = _parse_signal_parameter(inputs, 'input', kwargs)
outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True)

# Convert input and output names to lists if they aren't already
if not isinstance(inplist, (list, tuple)):
inplist = [inplist]
Expand Down Expand Up @@ -1810,6 +1825,15 @@ def linearize(sys, xeq, ueq=[], t=0, params={}, **kw):
return sys.linearize(xeq, ueq, t=t, params=params, **kw)


# Utility function to parse a signal parameter
def _parse_signal_parameter(value, name, kwargs, end=False):
if value is None and name in kwargs:
value = list(kwargs.pop(name))
if end and kwargs:
raise TypeError("unknown parameters %s" % kwargs)
return value


def _find_size(sysval, vecval):
"""Utility function to find the size of a system parameter

Expand Down Expand Up @@ -1849,7 +1873,7 @@ def tf2io(*args, **kwargs):
# Function to create an interconnected system
def interconnect(syslist, connections=None, inplist=[], outlist=[],
inputs=None, outputs=None, states=None,
params={}, dt=None, name=None):
params={}, dt=None, name=None, **kwargs):
"""Interconnect a set of input/output systems.

This function creates a new system that is an interconnection of a set of
Expand Down Expand Up @@ -1995,7 +2019,7 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
>>> P = control.tf2io(control.tf(1, [1, 0]), inputs='u', outputs='y')
>>> C = control.tf2io(control.tf(10, [1, 1]), inputs='e', outputs='u')
>>> sumblk = control.summing_junction(inputs=['r', '-y'], output='e')
>>> T = control.interconnect([P, C, sumblk], inplist='r', outlist='y')
>>> T = control.interconnect([P, C, sumblk], input='r', output='y')

Notes
-----
Expand All @@ -2020,7 +2044,14 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
treated as both a :class:`~control.StateSpace` system as well as an
:class:`~control.InputOutputSystem`.

The `input` and `output` keywords can be used instead of `inputs` and
`outputs`, for more natural naming of SISO systems.

"""
# Look for 'input' and 'output' parameter name variants
inputs = _parse_signal_parameter(inputs, 'input', kwargs)
outputs = _parse_signal_parameter(outputs, 'output', kwargs, end=True)

# If connections was not specified, set up default connection list
if connections is None:
# For each system input, look for outputs with the same name
Expand All @@ -2037,30 +2068,36 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
# Use an empty connections list
connections = []

# If inplist/outlist is not present, try using inputs/outputs instead
if not inplist and inputs is not None:
inplist = list(inputs)
if not outlist and outputs is not None:
outlist = list(outputs)

# Process input list
if not isinstance(inplist, (list, tuple)):
inplist = [inplist]
new_inplist = []
for signal in inplist:
# Create an empty connection and append to inplist
connection = []

# Check for signal names without a system name
if isinstance(signal, str) and len(signal.split('.')) == 1:
# Get the signal name
name = signal[1:] if signal[0] == '-' else signal
sign = '-' if signal[0] == '-' else ""

# Look for the signal name as a system input
new_name = None
for sys in syslist:
if name in sys.input_index.keys():
if new_name is not None:
raise ValueError("signal %s is not unique" % name)
new_name = sign + sys.name + "." + name
connection.append(sign + sys.name + "." + name)

# Make sure we found the name
if new_name is None:
if len(connection) == 0:
raise ValueError("could not find signal %s" % name)
else:
new_inplist.append(new_name)
new_inplist.append(connection)
else:
new_inplist.append(signal)
inplist = new_inplist
Expand All @@ -2070,25 +2107,25 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],
outlist = [outlist]
new_outlist = []
for signal in outlist:
# Create an empty connection and append to inplist
connection = []

# Check for signal names without a system name
if isinstance(signal, str) and len(signal.split('.')) == 1:
# Get the signal name
name = signal[1:] if signal[0] == '-' else signal
sign = '-' if signal[0] == '-' else ""

# Look for the signal name as a system output
new_name = None
for sys in syslist:
if name in sys.output_index.keys():
if new_name is not None:
raise ValueError("signal %s is not unique" % name)
new_name = sign + sys.name + "." + name
connection.append(sign + sys.name + "." + name)

# Make sure we found the name
if new_name is None:
if len(connection) == 0:
raise ValueError("could not find signal %s" % name)
else:
new_outlist.append(new_name)
new_outlist.append(connection)
else:
new_outlist.append(signal)
outlist = new_outlist
Expand All @@ -2106,7 +2143,9 @@ def interconnect(syslist, connections=None, inplist=[], outlist=[],


# Summing junction
def summing_junction(inputs, output='y', dimension=None, name=None, prefix='u'):
def summing_junction(
inputs=None, output=None, dimension=None, name=None,
prefix='u', **kwargs):
"""Create a summing junction as an input/output system.

This function creates a static input/output system that outputs the sum of
Expand Down Expand Up @@ -2145,10 +2184,10 @@ def summing_junction(inputs, output='y', dimension=None, name=None, prefix='u'):

Example
-------
>>> P = control.tf2io(ct.tf(1, [1, 0]), inputs='u', outputs='y')
>>> C = control.tf2io(ct.tf(10, [1, 1]), inputs='e', outputs='u')
>>> P = control.tf2io(ct.tf(1, [1, 0]), input='u', output='y')
>>> C = control.tf2io(ct.tf(10, [1, 1]), input='e', output='u')
>>> sumblk = control.summing_junction(inputs=['r', '-y'], output='e')
>>> T = control.interconnect((P, C, sumblk), inplist='r', outlist='y')
>>> T = control.interconnect((P, C, sumblk), input='r', output='y')

"""
# Utility function to parse input and output signal lists
Expand Down Expand Up @@ -2181,6 +2220,16 @@ def _parse_list(signals, signame='input', prefix='u'):
# Return the parsed list
return nsignals, names, gains

# Look for 'input' and 'output' parameter name variants
inputs = _parse_signal_parameter(inputs, 'input', kwargs)
output = _parse_signal_parameter(output, 'outputs', kwargs, end=True)

# Default values for inputs and output
if inputs is None:
raise TypeError("input specification is required")
if output is None:
output = 'y'

# Read the input list
ninputs, input_names, input_gains = _parse_list(
inputs, signame="input", prefix=prefix)
Expand Down
64 changes: 51 additions & 13 deletions control/tests/interconnect_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def test_summing_junction(inputs, output, dimension, D):
def test_summation_exceptions():
# Bad input description
with pytest.raises(ValueError, match="could not parse input"):
sumblk = ct.summing_junction(None, 'y')
sumblk = ct.summing_junction(np.pi, 'y')

# Bad output description
with pytest.raises(ValueError, match="could not parse output"):
sumblk = ct.summing_junction('u', None)
sumblk = ct.summing_junction('u', np.pi)

# Bad input dimension
with pytest.raises(ValueError, match="unrecognized dimension"):
Expand Down Expand Up @@ -119,18 +119,22 @@ def test_interconnect_implicit():
# inputs=['r', '-y'], output='e', dimension=2)
# S = control.interconnect([P, C, sumblk], inplist='r', outlist='y')

# Make sure that repeated inplist/outlist names generate an error
# Input not unique
Cbad = ct.tf2io(ct.tf(10, [1, 1]), inputs='r', outputs='x', name='C')
with pytest.raises(ValueError, match="not unique"):
Tio_sum = ct.interconnect(
(Cbad, P, sumblk), inplist=['r'], outlist=['y'])
# Make sure that repeated inplist/outlist names work
pi_io = ct.interconnect(
(kp_io, ki_io), inplist=['e'], outlist=['u'])
pi_ss = ct.tf2ss(kp + ki)
np.testing.assert_almost_equal(pi_io.A, pi_ss.A)
np.testing.assert_almost_equal(pi_io.B, pi_ss.B)
np.testing.assert_almost_equal(pi_io.C, pi_ss.C)
np.testing.assert_almost_equal(pi_io.D, pi_ss.D)

# Output not unique
Cbad = ct.tf2io(ct.tf(10, [1, 1]), inputs='e', outputs='y', name='C')
with pytest.raises(ValueError, match="not unique"):
Tio_sum = ct.interconnect(
(Cbad, P, sumblk), inplist=['r'], outlist=['y'])
# Default input and output lists, along with singular versions
Tio_sum = ct.interconnect(
(kp_io, ki_io, P, sumblk), input='r', output='y')
np.testing.assert_almost_equal(Tio_sum.A, Tss.A)
np.testing.assert_almost_equal(Tio_sum.B, Tss.B)
np.testing.assert_almost_equal(Tio_sum.C, Tss.C)
np.testing.assert_almost_equal(Tio_sum.D, Tss.D)

# Signal not found
with pytest.raises(ValueError, match="could not find"):
Expand Down Expand Up @@ -172,3 +176,37 @@ def test_interconnect_docstring():
np.testing.assert_almost_equal(T.B, T_ss.B)
np.testing.assert_almost_equal(T.C, T_ss.C)
np.testing.assert_almost_equal(T.D, T_ss.D)


def test_interconnect_exceptions():
# First make sure the docstring example works
P = ct.tf2io(ct.tf(1, [1, 0]), input='u', output='y')
C = ct.tf2io(ct.tf(10, [1, 1]), input='e', output='u')
sumblk = ct.summing_junction(inputs=['r', '-y'], output='e')
T = ct.interconnect((P, C, sumblk), input='r', output='y')
assert (T.ninputs, T.noutputs, T.nstates) == (1, 1, 2)

# Unrecognized arguments
# LinearIOSystem
with pytest.raises(TypeError, match="unknown parameter"):
P = ct.LinearIOSystem(ct.rss(2, 1, 1), output_name='y')

# Interconnect
with pytest.raises(TypeError, match="unknown parameter"):
T = ct.interconnect((P, C, sumblk), input_name='r', output='y')

# Interconnected system
with pytest.raises(TypeError, match="unknown parameter"):
T = ct.InterconnectedSystem((P, C, sumblk), input_name='r', output='y')

# NonlinearIOSytem
with pytest.raises(TypeError, match="unknown parameter"):
nlios = ct.NonlinearIOSystem(
None, lambda t, x, u, params: u*u, input_count=1, output_count=1)

# Summing junction
with pytest.raises(TypeError, match="input specification is required"):
sumblk = ct.summing_junction()

with pytest.raises(TypeError, match="unknown parameter"):
sumblk = ct.summing_junction(input_count=2, output_count=2)