Skip to content

Multivariable interconnect functionality #881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
802 changes: 505 additions & 297 deletions control/iosys.py

Large diffs are not rendered by default.

73 changes: 65 additions & 8 deletions control/namedio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import numpy as np
from copy import deepcopy
from warnings import warn
import re
from . import config

__all__ = ['issiso', 'timebase', 'common_timebase', 'timebaseEqual',
Expand All @@ -22,22 +23,25 @@
'namedio.linearized_system_name_suffix': '$linearized',
'namedio.sampled_system_name_prefix': '',
'namedio.sampled_system_name_suffix': '$sampled',
'namedio.indexed_system_name_prefix': '',
'namedio.indexed_system_name_suffix': '$indexed',
'namedio.converted_system_name_prefix': '',
'namedio.converted_system_name_suffix': '$converted',
}


class NamedIOSystem(object):
def __init__(
self, name=None, inputs=None, outputs=None, states=None, **kwargs):
self, name=None, inputs=None, outputs=None, states=None,
input_prefix='u', output_prefix='y', state_prefix='x', **kwargs):

# system name
self.name = self._name_or_default(name)

# Parse and store the number of inputs and outputs
self.set_inputs(inputs)
self.set_outputs(outputs)
self.set_states(states)
self.set_inputs(inputs, prefix=input_prefix)
self.set_outputs(outputs, prefix=output_prefix)
self.set_states(states, prefix=state_prefix)

# Process timebase: if not given use default, but allow None as value
self.dt = _process_dt_keyword(kwargs)
Expand All @@ -56,6 +60,9 @@ def _name_or_default(self, name=None, prefix_suffix_name=None):
if name is None:
name = "sys[{}]".format(NamedIOSystem._idCounter)
NamedIOSystem._idCounter += 1
elif re.match(r".*\..*", name):
raise ValueError(f"invalid system name '{name}' ('.' not allowed)")

prefix = "" if prefix_suffix_name is None else config.defaults[
'namedio.' + prefix_suffix_name + '_system_name_prefix']
suffix = "" if prefix_suffix_name is None else config.defaults[
Expand All @@ -64,7 +71,6 @@ def _name_or_default(self, name=None, prefix_suffix_name=None):

# Check if system name is generic
def _generic_name_check(self):
import re
return re.match(r'^sys\[\d*\]$', self.name) is not None

#
Expand Down Expand Up @@ -106,6 +112,39 @@ def __str__(self):
def _find_signal(self, name, sigdict):
return sigdict.get(name, None)

# Find a list of signals by name, index, or pattern
def _find_signals(self, name_list, sigdict):
if not isinstance(name_list, (list, tuple)):
name_list = [name_list]

index_list = []
for name in name_list:
# Look for signal ranges (slice-like or base name)
ms = re.match(r'([\w$]+)\[([\d]*):([\d]*)\]$', name) # slice
mb = re.match(r'([\w$]+)$', name) # base
if ms:
base = ms.group(1)
start = None if ms.group(2) == '' else int(ms.group(2))
stop = None if ms.group(3) == '' else int(ms.group(3))
for var in sigdict:
# Find variables that match
msig = re.match(r'([\w$]+)\[([\d]+)\]$', var)
if msig and msig.group(1) == base and \
(start is None or int(msig.group(2)) >= start) and \
(stop is None or int(msig.group(2)) < stop):
index_list.append(sigdict.get(var))
elif mb and sigdict.get(name, None) is None:
# Try to use name as a base name
for var in sigdict:
msig = re.match(name + r'\[([\d]+)\]$', var)
if msig:
index_list.append(sigdict.get(var))
else:
index_list.append(sigdict.get(name, None))

return None if len(index_list) == 0 or \
any([idx is None for idx in index_list]) else index_list

def _copy_names(self, sys, prefix="", suffix="", prefix_suffix_name=None):
"""copy the signal and system name of sys. Name is given as a keyword
in case a specific name (e.g. append 'linearized') is desired. """
Expand Down Expand Up @@ -151,7 +190,6 @@ def copy(self, name=None, use_prefix_suffix=True):
return newsys

def set_inputs(self, inputs, prefix='u'):

"""Set the number/names of the system inputs.

Parameters
Expand All @@ -175,6 +213,10 @@ def find_input(self, name):
"""Find the index for an input given its name (`None` if not found)"""
return self.input_index.get(name, None)

def find_inputs(self, name_list):
"""Return list of indices matching input spec (`None` if not found)"""
return self._find_signals(name_list, self.input_index)

# Property for getting and setting list of input signals
input_labels = property(
lambda self: list(self.input_index.keys()), # getter
Expand Down Expand Up @@ -204,6 +246,10 @@ def find_output(self, name):
"""Find the index for an output given its name (`None` if not found)"""
return self.output_index.get(name, None)

def find_outputs(self, name_list):
"""Return list of indices matching output spec (`None` if not found)"""
return self._find_signals(name_list, self.output_index)

# Property for getting and setting list of output signals
output_labels = property(
lambda self: list(self.output_index.keys()), # getter
Expand All @@ -227,12 +273,16 @@ def set_states(self, states, prefix='x'):

"""
self.nstates, self.state_index = \
_process_signal_list(states, prefix=prefix)
_process_signal_list(states, prefix=prefix, allow_dot=True)

def find_state(self, name):
"""Find the index for a state given its name (`None` if not found)"""
return self.state_index.get(name, None)

def find_states(self, name_list):
"""Return list of indices matching state spec (`None` if not found)"""
return self._find_signals(name_list, self.state_index)

# Property for getting and setting list of state signals
state_labels = property(
lambda self: list(self.state_index.keys()), # getter
Expand Down Expand Up @@ -578,7 +628,7 @@ def _process_dt_keyword(keywords, defaults={}, static=False):


# Utility function to parse a list of signals
def _process_signal_list(signals, prefix='s'):
def _process_signal_list(signals, prefix='s', allow_dot=False):
if signals is None:
# No information provided; try and make it up later
return None, {}
Expand All @@ -589,10 +639,17 @@ def _process_signal_list(signals, prefix='s'):

elif isinstance(signals, str):
# Single string given => single signal with given name
if not allow_dot and re.match(r".*\..*", signals):
raise ValueError(
f"invalid signal name '{signals}' ('.' not allowed)")
return 1, {signals: 0}

elif all(isinstance(s, str) for s in signals):
# Use the list of strings as the signal names
for signal in signals:
if not allow_dot and re.match(r".*\..*", signal):
raise ValueError(
f"invalid signal name '{signal}' ('.' not allowed)")
return len(signals), {signals[i]: i for i in range(len(signals))}

else:
Expand Down
13 changes: 9 additions & 4 deletions control/statesp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,15 @@ def __getitem__(self, indices):
"""Array style access"""
if len(indices) != 2:
raise IOError('must provide indices of length 2 for state space')
i = indices[0]
j = indices[1]
return StateSpace(self.A, self.B[:, j], self.C[i, :],
self.D[i, j], self.dt)
outdx = indices[0] if isinstance(indices[0], list) else [indices[0]]
inpdx = indices[1] if isinstance(indices[1], list) else [indices[1]]
sysname = config.defaults['namedio.indexed_system_name_prefix'] + \
self.name + config.defaults['namedio.indexed_system_name_suffix']
return StateSpace(
self.A, self.B[:, inpdx], self.C[outdx, :], self.D[outdx, inpdx],
self.dt, name=sysname,
inputs=[self.input_labels[i] for i in list(inpdx)],
outputs=[self.output_labels[i] for i in list(outdx)])

def sample(self, Ts, method='zoh', alpha=None, prewarp_frequency=None,
name=None, copy_names=True, **kwargs):
Expand Down
15 changes: 15 additions & 0 deletions control/tests/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,18 @@ def test_get_param_last(self):

assert ct.config._get_param(
'config', 'second', kwargs, pop=True, last=True) == 2

def test_system_indexing(self):
# Default renaming
sys = ct.TransferFunction(
[ [ [1], [2], [3]], [ [3], [4], [5]] ],
[ [[1, 2], [1, 3], [1, 4]], [[1, 4], [1, 5], [1, 6]] ], 0.5)
sys1 = sys[1:, 1:]
assert sys1.name == sys.name + '$indexed'

# Reset the format
ct.config.set_defaults(
'namedio', indexed_system_name_prefix='PRE',
indexed_system_name_suffix='POST')
sys2 = sys[1:, 1:]
assert sys2.name == 'PRE' + sys.name + 'POST'
Loading