Skip to content
Merged
802 changes: 505 additions & 297 deletions control/iosys.py

Large diffs are not rendered by default.

73 changes: 65 additions & 8 deletions control/namedio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import numpy as np
from copy import deepcopy
from warnings import warn
import re
from . import config

__all__ = ['issiso', 'timebase', 'common_timebase', 'timebaseEqual',
Expand All @@ -22,22 +23,25 @@
'namedio.linearized_system_name_suffix': '$linearized',
'namedio.sampled_system_name_prefix': '',
'namedio.sampled_system_name_suffix': '$sampled',
'namedio.indexed_system_name_prefix': '',
'namedio.indexed_system_name_suffix': '$indexed',
'namedio.converted_system_name_prefix': '',
'namedio.converted_system_name_suffix': '$converted',
}


class NamedIOSystem(object):
def __init__(
self, name=None, inputs=None, outputs=None, states=None, **kwargs):
self, name=None, inputs=None, outputs=None, states=None,
input_prefix='u', output_prefix='y', state_prefix='x', **kwargs):

# system name
self.name = self._name_or_default(name)

# Parse and store the number of inputs and outputs
self.set_inputs(inputs)
self.set_outputs(outputs)
self.set_states(states)
self.set_inputs(inputs, prefix=input_prefix)
self.set_outputs(outputs, prefix=output_prefix)
self.set_states(states, prefix=state_prefix)

# Process timebase: if not given use default, but allow None as value
self.dt = _process_dt_keyword(kwargs)
Expand All @@ -56,6 +60,9 @@ def _name_or_default(self, name=None, prefix_suffix_name=None):
if name is None:
name = "sys[{}]".format(NamedIOSystem._idCounter)
NamedIOSystem._idCounter += 1
elif re.match(r".*\..*", name):
raise ValueError(f"invalid system name '{name}' ('.' not allowed)")

prefix = "" if prefix_suffix_name is None else config.defaults[
'namedio.' + prefix_suffix_name + '_system_name_prefix']
suffix = "" if prefix_suffix_name is None else config.defaults[
Expand All @@ -64,7 +71,6 @@ def _name_or_default(self, name=None, prefix_suffix_name=None):

# Check if system name is generic
def _generic_name_check(self):
import re
return re.match(r'^sys\[\d*\]$', self.name) is not None

#
Expand Down Expand Up @@ -106,6 +112,39 @@ def __str__(self):
def _find_signal(self, name, sigdict):
return sigdict.get(name, None)

# Find a list of signals by name, index, or pattern
def _find_signals(self, name_list, sigdict):
if not isinstance(name_list, (list, tuple)):
name_list = [name_list]

index_list = []
for name in name_list:
# Look for signal ranges (slice-like or base name)
ms = re.match(r'([\w$]+)\[([\d]*):([\d]*)\]$', name) # slice
mb = re.match(r'([\w$]+)$', name) # base
if ms:
base = ms.group(1)
start = None if ms.group(2) == '' else int(ms.group(2))
stop = None if ms.group(3) == '' else int(ms.group(3))
for var in sigdict:
# Find variables that match
msig = re.match(r'([\w$]+)\[([\d]+)\]$', var)
if msig and msig.group(1) == base and \
(start is None or int(msig.group(2)) >= start) and \
(stop is None or int(msig.group(2)) < stop):
index_list.append(sigdict.get(var))
elif mb and sigdict.get(name, None) is None:
# Try to use name as a base name
for var in sigdict:
msig = re.match(name + r'\[([\d]+)\]$', var)
if msig:
index_list.append(sigdict.get(var))
else:
index_list.append(sigdict.get(name, None))

return None if len(index_list) == 0 or \
any([idx is None for idx in index_list]) else index_list

def _copy_names(self, sys, prefix="", suffix="", prefix_suffix_name=None):
"""copy the signal and system name of sys. Name is given as a keyword
in case a specific name (e.g. append 'linearized') is desired. """
Expand Down Expand Up @@ -151,7 +190,6 @@ def copy(self, name=None, use_prefix_suffix=True):
return newsys

def set_inputs(self, inputs, prefix='u'):

"""Set the number/names of the system inputs.

Parameters
Expand All @@ -175,6 +213,10 @@ def find_input(self, name):
"""Find the index for an input given its name (`None` if not found)"""
return self.input_index.get(name, None)

def find_inputs(self, name_list):
"""Return list of indices matching input spec (`None` if not found)"""
return self._find_signals(name_list, self.input_index)

# Property for getting and setting list of input signals
input_labels = property(
lambda self: list(self.input_index.keys()), # getter
Expand Down Expand Up @@ -204,6 +246,10 @@ def find_output(self, name):
"""Find the index for an output given its name (`None` if not found)"""
return self.output_index.get(name, None)

def find_outputs(self, name_list):
"""Return list of indices matching output spec (`None` if not found)"""
return self._find_signals(name_list, self.output_index)

# Property for getting and setting list of output signals
output_labels = property(
lambda self: list(self.output_index.keys()), # getter
Expand All @@ -227,12 +273,16 @@ def set_states(self, states, prefix='x'):

"""
self.nstates, self.state_index = \
_process_signal_list(states, prefix=prefix)
_process_signal_list(states, prefix=prefix, allow_dot=True)

def find_state(self, name):
"""Find the index for a state given its name (`None` if not found)"""
return self.state_index.get(name, None)

def find_states(self, name_list):
"""Return list of indices matching state spec (`None` if not found)"""
return self._find_signals(name_list, self.state_index)

# Property for getting and setting list of state signals
state_labels = property(
lambda self: list(self.state_index.keys()), # getter
Expand Down Expand Up @@ -578,7 +628,7 @@ def _process_dt_keyword(keywords, defaults={}, static=False):


# Utility function to parse a list of signals
def _process_signal_list(signals, prefix='s'):
def _process_signal_list(signals, prefix='s', allow_dot=False):
if signals is None:
# No information provided; try and make it up later
return None, {}
Expand All @@ -589,10 +639,17 @@ def _process_signal_list(signals, prefix='s'):

elif isinstance(signals, str):
# Single string given => single signal with given name
if not allow_dot and re.match(r".*\..*", signals):
raise ValueError(
f"invalid signal name '{signals}' ('.' not allowed)")
return 1, {signals: 0}

elif all(isinstance(s, str) for s in signals):
# Use the list of strings as the signal names
for signal in signals:
if not allow_dot and re.match(r".*\..*", signal):
raise ValueError(
f"invalid signal name '{signal}' ('.' not allowed)")
return len(signals), {signals[i]: i for i in range(len(signals))}

else:
Expand Down
13 changes: 9 additions & 4 deletions control/statesp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,15 @@ def __getitem__(self, indices):
"""Array style access"""
if len(indices) != 2:
raise IOError('must provide indices of length 2 for state space')
i = indices[0]
j = indices[1]
return StateSpace(self.A, self.B[:, j], self.C[i, :],
self.D[i, j], self.dt)
outdx = indices[0] if isinstance(indices[0], list) else [indices[0]]
inpdx = indices[1] if isinstance(indices[1], list) else [indices[1]]
sysname = config.defaults['namedio.indexed_system_name_prefix'] + \
self.name + config.defaults['namedio.indexed_system_name_suffix']
return StateSpace(
self.A, self.B[:, inpdx], self.C[outdx, :], self.D[outdx, inpdx],
self.dt, name=sysname,
inputs=[self.input_labels[i] for i in list(inpdx)],
outputs=[self.output_labels[i] for i in list(outdx)])

def sample(self, Ts, method='zoh', alpha=None, prewarp_frequency=None,
name=None, copy_names=True, **kwargs):
Expand Down
15 changes: 15 additions & 0 deletions control/tests/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,18 @@ def test_get_param_last(self):

assert ct.config._get_param(
'config', 'second', kwargs, pop=True, last=True) == 2

def test_system_indexing(self):
# Default renaming
sys = ct.TransferFunction(
[ [ [1], [2], [3]], [ [3], [4], [5]] ],
[ [[1, 2], [1, 3], [1, 4]], [[1, 4], [1, 5], [1, 6]] ], 0.5)
sys1 = sys[1:, 1:]
assert sys1.name == sys.name + '$indexed'

# Reset the format
ct.config.set_defaults(
'namedio', indexed_system_name_prefix='PRE',
indexed_system_name_suffix='POST')
sys2 = sys[1:, 1:]
assert sys2.name == 'PRE' + sys.name + 'POST'
Loading