Skip to content

Better I/O systems support for phase plots #1001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions control/freqplot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,7 @@ def _parse_linestyle(style_name, allow_false=False):
# Internal function to add arrows to a curve
def _add_arrows_to_line2D(
axes, line, arrow_locs=[0.2, 0.4, 0.6, 0.8],
arrowstyle='-|>', arrowsize=1, dir=1, transform=None):
arrowstyle='-|>', arrowsize=1, dir=1):
"""
Add arrows to a matplotlib.lines.Line2D at selected locations.

Expand All @@ -1930,7 +1930,6 @@ def _add_arrows_to_line2D(
arrow_locs: list of locations where to insert arrows, % of total length
arrowstyle: style of the arrow
arrowsize: size of the arrow
transform: a matplotlib transform instance, default to data coordinates

Returns:
--------
Expand All @@ -1939,13 +1938,13 @@ def _add_arrows_to_line2D(
Based on https://stackoverflow.com/questions/26911898/

"""
# Get the coordinates of the line, in plot coordinates
if not isinstance(line, mpl.lines.Line2D):
raise ValueError("expected a matplotlib.lines.Line2D object")
x, y = line.get_xdata(), line.get_ydata()

arrow_kw = {
"arrowstyle": arrowstyle,
}
# Determine the arrow properties
arrow_kw = {"arrowstyle": arrowstyle}

color = line.get_color()
use_multicolor_lines = isinstance(color, np.ndarray)
Expand All @@ -1960,36 +1959,43 @@ def _add_arrows_to_line2D(
else:
arrow_kw['linewidth'] = linewidth

if transform is None:
transform = axes.transData
# Figure out the size of the axes (length of diagonal)
xlim, ylim = axes.get_xlim(), axes.get_ylim()
ul, lr = np.array([xlim[0], ylim[0]]), np.array([xlim[1], ylim[1]])
diag = np.linalg.norm(ul - lr)

# Compute the arc length along the curve
s = np.cumsum(np.sqrt(np.diff(x) ** 2 + np.diff(y) ** 2))

# Truncate the number of arrows if the curve is short
# TODO: figure out a smarter way to do this
frac = min(s[-1] / diag, 1)
if len(arrow_locs) and frac < 0.05:
arrow_locs = [] # too short; no arrows at all
elif len(arrow_locs) and frac < 0.2:
arrow_locs = [0.5] # single arrow in the middle

# Plot the arrows (and return list if patches)
arrows = []
for loc in arrow_locs:
n = np.searchsorted(s, s[-1] * loc)

# Figure out what direction to paint the arrow
if dir == 1:
arrow_tail = (x[n], y[n])
arrow_head = (np.mean(x[n:n + 2]), np.mean(y[n:n + 2]))
elif dir == -1:
# Orient the arrow in the other direction on the segment
arrow_tail = (x[n + 1], y[n + 1])
arrow_head = (np.mean(x[n:n + 2]), np.mean(y[n:n + 2]))
else:
raise ValueError("unknown value for keyword 'dir'")
if dir == 1 and n == 0:
# Move the arrow forward by one if it is at start of a segment
n = 1

# Place the head of the arrow at the desired location
arrow_head = [x[n], y[n]]
arrow_tail = [x[n - dir], y[n - dir]]

p = mpl.patches.FancyArrowPatch(
arrow_tail, arrow_head, transform=transform, lw=0,
arrow_tail, arrow_head, transform=axes.transData, lw=0,
**arrow_kw)
axes.add_patch(p)
arrows.append(p)
return arrows



#
# Function to compute Nyquist curve offsets
#
Expand Down
94 changes: 58 additions & 36 deletions control/nlsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@

"""

import numpy as np
import scipy as sp
import copy
from warnings import warn

import numpy as np
import scipy as sp

from . import config
from .iosys import InputOutputSystem, _process_signal_list, \
_process_iosys_keywords, isctime, isdtime, common_timebase, _parse_spec
from .timeresp import _check_convert_array, _process_time_response, \
TimeResponseData
from .iosys import InputOutputSystem, _parse_spec, _process_iosys_keywords, \
_process_signal_list, common_timebase, isctime, isdtime
from .timeresp import TimeResponseData, _check_convert_array, \
_process_time_response

__all__ = ['NonlinearIOSystem', 'InterconnectedSystem', 'nlsys',
'input_output_response', 'find_eqpt', 'linearize',
Expand Down Expand Up @@ -132,13 +133,15 @@ def __init__(self, updfcn, outfcn=None, params=None, **kwargs):
if updfcn is None:
if self.nstates is None:
self.nstates = 0
self.updfcn = lambda t, x, u, params: np.zeros(0)
else:
raise ValueError(
"states specified but no update function given.")

if outfcn is None:
# No output function specified => outputs = states
if self.noutputs is None and self.nstates is not None:
if self.noutputs == 0:
self.outfcn = lambda t, x, u, params: np.zeros(0)
elif self.noutputs is None and self.nstates is not None:
self.noutputs = self.nstates
elif self.noutputs is not None and self.noutputs == self.nstates:
# Number of outputs = number of states => all is OK
Expand Down Expand Up @@ -364,9 +367,8 @@ def _rhs(self, t, x, u):
user-friendly interface you may want to use :meth:`dynamics`.

"""
xdot = self.updfcn(t, x, u, self._current_params) \
if self.updfcn is not None else []
return np.array(xdot).reshape((-1,))
return np.asarray(
self.updfcn(t, x, u, self._current_params)).reshape(-1)

def dynamics(self, t, x, u, params=None):
"""Compute the dynamics of a differential or difference equation.
Expand Down Expand Up @@ -403,7 +405,8 @@ def dynamics(self, t, x, u, params=None):
dx/dt or x[t+dt] : ndarray
"""
self._update_params(params)
return self._rhs(t, x, u)
return self._rhs(
t, np.asarray(x).reshape(-1), np.asarray(u).reshape(-1))

def _out(self, t, x, u):
"""Evaluate the output of a system at a given state, input, and time
Expand All @@ -414,9 +417,17 @@ def _out(self, t, x, u):
:meth:`output`.

"""
y = self.outfcn(t, x, u, self._current_params) \
if self.outfcn is not None else x
return np.array(y).reshape((-1,))
#
# To allow lazy evaluation of the system size, we allow for the
# possibility that noutputs is left unspecified when the system
# is created => we have to check for that case here (and return
# the system state or a portion of it).
#
if self.outfcn is None:
return x if self.noutputs is None else x[:self.noutputs]
else:
return np.asarray(
self.outfcn(t, x, u, self._current_params)).reshape(-1)

def output(self, t, x, u, params=None):
"""Compute the output of the system
Expand Down Expand Up @@ -444,7 +455,8 @@ def output(self, t, x, u, params=None):
y : ndarray
"""
self._update_params(params)
return self._out(t, x, u)
return self._out(
t, np.asarray(x).reshape(-1), np.asarray(u).reshape(-1))

def feedback(self, other=1, sign=-1, params=None):
"""Feedback interconnection between two input/output systems
Expand Down Expand Up @@ -517,14 +529,13 @@ def linearize(self, x0, u0, t=0, params=None, eps=1e-6,
# numerical linearization use the `_rhs()` and `_out()` member
# functions.
#

# If x0 and u0 are specified as lists, concatenate the elements
x0 = _concatenate_list_elements(x0, 'x0')
u0 = _concatenate_list_elements(u0, 'u0')

# Figure out dimensions if they were not specified.
nstates = _find_size(self.nstates, x0)
ninputs = _find_size(self.ninputs, u0)
nstates = _find_size(self.nstates, x0, "states")
ninputs = _find_size(self.ninputs, u0, "inputs")

# Convert x0, u0 to arrays, if needed
if np.isscalar(x0):
Expand All @@ -533,7 +544,7 @@ def linearize(self, x0, u0, t=0, params=None, eps=1e-6,
u0 = np.ones((ninputs,)) * u0

# Compute number of outputs by evaluating the output function
noutputs = _find_size(self.noutputs, self._out(t, x0, u0))
noutputs = _find_size(self.noutputs, self._out(t, x0, u0), "outputs")

# Update the current parameters
self._update_params(params)
Expand Down Expand Up @@ -1306,7 +1317,7 @@ def nlsys(


def input_output_response(
sys, T, U=0., X0=0, params=None,
sys, T, U=0., X0=0, params=None, ignore_errors=False,
transpose=False, return_x=False, squeeze=None,
solve_ivp_kwargs=None, t_eval='T', **kwargs):
"""Compute the output response of a system to a given input.
Expand Down Expand Up @@ -1382,6 +1393,11 @@ def input_output_response(
to 'RK45'.
solve_ivp_kwargs : dict, optional
Pass additional keywords to :func:`scipy.integrate.solve_ivp`.
ignore_errors : bool, optional
If ``False`` (default), errors during computation of the trajectory
will raise a ``RuntimeError`` exception. If ``True``, do not raise
an exception and instead set ``results.success`` to ``False`` and
place an error message in ``results.message``.

Raises
------
Expand Down Expand Up @@ -1516,7 +1532,7 @@ def input_output_response(
X0 = np.hstack([X0, np.zeros(sys.nstates - X0.size)])

# Compute the number of states
nstates = _find_size(sys.nstates, X0)
nstates = _find_size(sys.nstates, X0, "states")

# create X0 if not given, test if X0 has correct shape
X0 = _check_convert_array(X0, [(nstates,), (nstates, 1)],
Expand Down Expand Up @@ -1583,7 +1599,11 @@ def ivp_rhs(t, x):
ivp_rhs, (T0, Tf), X0, t_eval=t_eval,
vectorized=False, **solve_ivp_kwargs)
if not soln.success:
raise RuntimeError("solve_ivp failed: " + soln.message)
message = "solve_ivp failed: " + soln.message
if not ignore_errors:
raise RuntimeError(message)
else:
message = None

# Compute inputs and outputs for each time point
u = np.zeros((ninputs, len(soln.t)))
Expand Down Expand Up @@ -1639,7 +1659,7 @@ def ivp_rhs(t, x):
u = np.transpose(np.array(u))

# Mark solution as successful
soln.success = True # No way to fail
soln.success, message = True, None # No way to fail

else: # Neither ctime or dtime??
raise TypeError("Can't determine system type")
Expand All @@ -1649,7 +1669,8 @@ def ivp_rhs(t, x):
output_labels=sys.output_labels, input_labels=sys.input_labels,
state_labels=sys.state_labels, sysname=sys.name,
title="Input/output response for " + sys.name,
transpose=transpose, return_x=return_x, squeeze=squeeze)
transpose=transpose, return_x=return_x, squeeze=squeeze,
success=soln.success, message=message)


def find_eqpt(sys, x0, u0=None, y0=None, t=0, params=None,
Expand Down Expand Up @@ -1732,9 +1753,9 @@ def find_eqpt(sys, x0, u0=None, y0=None, t=0, params=None,
from scipy.optimize import root

# Figure out the number of states, inputs, and outputs
nstates = _find_size(sys.nstates, x0)
ninputs = _find_size(sys.ninputs, u0)
noutputs = _find_size(sys.noutputs, y0)
nstates = _find_size(sys.nstates, x0, "states")
ninputs = _find_size(sys.ninputs, u0, "inputs")
noutputs = _find_size(sys.noutputs, y0, "outputs")

# Convert x0, u0, y0 to arrays, if needed
if np.isscalar(x0):
Expand Down Expand Up @@ -1977,23 +1998,23 @@ def linearize(sys, xeq, ueq=None, t=0, params=None, **kw):
return sys.linearize(xeq, ueq, t=t, params=params, **kw)


def _find_size(sysval, vecval):
def _find_size(sysval, vecval, label):
"""Utility function to find the size of a system parameter

If both parameters are not None, they must be consistent.
"""
if hasattr(vecval, '__len__'):
if sysval is not None and sysval != len(vecval):
raise ValueError("Inconsistent information to determine size "
"of system component")
raise ValueError(
f"inconsistent information for number of {label}")
return len(vecval)
# None or 0, which is a valid value for "a (sysval, ) vector of zeros".
if not vecval:
return 0 if sysval is None else sysval
elif sysval == 1:
# (1, scalar) is also a valid combination from legacy code
return 1
raise ValueError("can't determine size of system component")
raise ValueError(f"can't determine number of {label}")


# Function to create an interconnected system
Expand Down Expand Up @@ -2241,7 +2262,7 @@ def interconnect(
`outputs`, for more natural naming of SISO systems.

"""
from .statesp import StateSpace, LinearICSystem, _convert_to_statespace
from .statesp import LinearICSystem, StateSpace, _convert_to_statespace
from .xferfcn import TransferFunction

dt = kwargs.pop('dt', None) # bypass normal 'dt' processing
Expand Down Expand Up @@ -2551,7 +2572,7 @@ def interconnect(
return newsys


# Utility function to allow lists states, inputs
# Utility function to allow lists of states, inputs
def _concatenate_list_elements(X, name='X'):
# If we were passed a list, concatenate the elements together
if isinstance(X, (tuple, list)):
Expand All @@ -2574,13 +2595,14 @@ def _convert_static_iosystem(sys):
# Convert sys1 to an I/O system if needed
if isinstance(sys, (int, float, np.number)):
return NonlinearIOSystem(
None, lambda t, x, u, params: sys * u, inputs=1, outputs=1)
None, lambda t, x, u, params: sys * u,
outputs=1, inputs=1, dt=None)

elif isinstance(sys, np.ndarray):
sys = np.atleast_2d(sys)
return NonlinearIOSystem(
None, lambda t, x, u, params: sys @ u,
outputs=sys.shape[0], inputs=sys.shape[1])
outputs=sys.shape[0], inputs=sys.shape[1], dt=None)

def connection_table(sys, show_names=False, column_width=32):
"""Print table of connections inside an interconnected system model.
Expand Down
Loading
Loading