Skip to content

Optimization-based and moving horizon estimation #877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.pyc
__pycache__/
build/
dist/
htmlcov/
Expand Down
87 changes: 87 additions & 0 deletions benchmarks/optestim_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# optestim_bench.py - benchmarks for optimal/moving horizon estimation
# RMM, 14 Mar 2023
#
# This benchmark tests the timing for the optimal estimation routines and
# is intended to be used for helping tune the performance of the functions
# used for optimization-based estimation.

import numpy as np
import math
import control as ct
import control.optimal as opt

minimizer_table = {
'default': (None, {}),
'trust': ('trust-constr', {}),
'trust_bigstep': ('trust-constr', {'finite_diff_rel_step': 0.01}),
'SLSQP': ('SLSQP', {}),
'SLSQP_bigstep': ('SLSQP', {'eps': 0.01}),
'COBYLA': ('COBYLA', {}),
}

# Table to turn on and off process disturbances and measurement noise
noise_table = {
'noisy': (1e-1, 1e-3),
'nodist': (0, 1e-3),
'nomeas': (1e-1, 0),
'clean': (0, 0)
}


# Assess performance as a function of optimization and integration methods
def time_oep_minimizer_methods(minimizer_name, noise_name, initial_guess):
# Use fixed system to avoid randome errors (was csys = ct.rss(4, 2, 5))
csys = ct.ss(
[[-0.5, 1, 0, 0], [0, -1, 1, 0], [0, 0, -2, 1], [0, 0, 0, -3]], # A
[[0, 0.1], [0, 0.1], [0, 0.1], [1, 0.1]], # B
[[1, 0, 0, 0], [0, 0, 1, 0]], # C
0, dt=0)
# dsys = ct.c2d(csys, dt)
# sys = csys if dt == 0 else dsys
sys = csys

# Decide on process disturbances and measurement noise
dist_mag, meas_mag = noise_table[noise_name]

# Create disturbances and noise (fixed, to avoid random errors)
Rv = 0.1 * np.eye(1) # scalar disturbance
Rw = 0.01 * np.eye(sys.noutputs)
timepts = np.arange(0, 10.1, 1)
V = np.array(
[0 if t % 2 == 1 else 1 if t % 4 == 0 else -1 for t in timepts]
).reshape(1, -1) * dist_mag
W = np.vstack([np.sin(2*timepts), np.cos(3*timepts)]) * meas_mag

# Generate system data
U = np.sin(timepts).reshape(1, -1)
res = ct.input_output_response(sys, timepts, [U, V])
Y = res.outputs + W

# Decide on the initial guess to use
if initial_guess == 'xhat':
initial_guess = (res.states, V*0)
elif initial_guess == 'both':
initial_guess = (res.states, V)
else:
initial_guess = None


# Set up optimal estimation function using Gaussian likelihoods for cost
traj_cost = opt.gaussian_likelihood_cost(sys, Rv, Rw)
init_cost = lambda xhat, x: (xhat - x) @ (xhat - x)
oep = opt.OptimalEstimationProblem(
sys, timepts, traj_cost, terminal_cost=init_cost)

# Noise and disturbances (the standard case)
est = oep.compute_estimate(Y, U, initial_guess=initial_guess)
assert est.success
np.testing.assert_allclose(
est.states[:, -1], res.states[:, -1], atol=1e-1, rtol=1e-2)


# Parameterize the test against different choices of integrator and minimizer
time_oep_minimizer_methods.param_names = ['minimizer', 'noise', 'initial']
time_oep_minimizer_methods.params = (
['default', 'trust', 'SLSQP', 'COBYLA'],
['noisy', 'nodist', 'nomeas', 'clean'],
['none', 'xhat', 'both'])
23 changes: 23 additions & 0 deletions control/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import collections
import warnings
from .exception import ControlArgument

__all__ = ['defaults', 'set_defaults', 'reset_defaults',
'use_matlab_defaults', 'use_fbs_defaults',
Expand Down Expand Up @@ -310,3 +311,25 @@ def use_legacy_defaults(version):
set_defaults('nyquist', mirror_style='-')

return (major, minor, patch)


#
# Utility function for processing legacy keywords
#
# Use this function to handle a legacy keyword that has been renamed. This
# function pops the old keyword off of the kwargs dictionary and issues a
# warning. if both the old and new keyword are present, a ControlArgument
# exception is raised.
#
def _process_legacy_keyword(kwargs, oldkey, newkey, newval):
if kwargs.get(oldkey) is not None:
warnings.warn(
f"keyworld '{oldkey}' is deprecated; use '{newkey}'",
DeprecationWarning)
if newval is not None:
raise ControlArgument(
f"duplicate keywords '{oldkey}' and '{newkey}'")
else:
return kwargs.pop(oldkey)
else:
return newval
2 changes: 1 addition & 1 deletion control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class for a set of subclasses that are used to implement specific
System name (used for specifying signals). If unspecified, a generic
name <sys[id]> is generated with a unique integer id.
params : dict, optional
Parameter values for the systems. Passed to the evaluation functions
Parameter values for the system. Passed to the evaluation functions
for the system as default values, overriding internal defaults.

Attributes
Expand Down
104 changes: 102 additions & 2 deletions control/namedio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
'namedio.sampled_system_name_prefix': '',
'namedio.sampled_system_name_suffix': '$sampled'
}


class NamedIOSystem(object):
def __init__(
self, name=None, inputs=None, outputs=None, states=None, **kwargs):
Expand Down Expand Up @@ -584,3 +584,103 @@ def _process_signal_list(signals, prefix='s'):

else:
raise TypeError("Can't parse signal list %s" % str(signals))


#
# Utility functions to process signal indices
#
# Signal indices can be specified in one of four ways:
#
# 1. As a positive integer 'm', in which case we return a list
# corresponding to the first 'm' elements of a range of a given length
#
# 2. As a negative integer '-m', in which case we return a list
# corresponding to the last 'm' elements of a range of a given length
#
# 3. As a slice, in which case we return the a list corresponding to the
# indices specified by the slice of a range of a given length
#
# 4. As a list of ints or strings specifying specific indices. Strings are
# compared to a list of labels to determine the index.
#
def _process_indices(arg, name, labels, length):
# Default is to return indices up to a certain length
arg = length if arg is None else arg

if isinstance(arg, int):
# Return the start or end of the list of possible indices
return list(range(arg)) if arg > 0 else list(range(length))[arg:]

elif isinstance(arg, slice):
# Return the indices referenced by the slice
return list(range(length))[arg]

elif isinstance(arg, list):
# Make sure the length is OK
if len(arg) > length:
raise ValueError(
f"{name}_indices list is too long; max length = {length}")

# Return the list, replacing strings with corresponding indices
arg=arg.copy()
for i, idx in enumerate(arg):
if isinstance(idx, str):
arg[i] = labels.index(arg[i])
return arg

raise ValueError(f"invalid argument for {name}_indices")

#
# Process control and disturbance indices
#
# For systems with inputs and disturbances, the control_indices and
# disturbance_indices keywords are used to specify which is which. If only
# one is given, the other is assumed to be the remaining indices in the
# system input. If neither is given, the disturbance inputs are assumed to
# be the same as the control inputs.
#
def _process_control_disturbance_indices(
sys, control_indices, disturbance_indices):

if control_indices is None and disturbance_indices is None:
# Disturbances enter in the same place as the controls
dist_idx = ctrl_idx = list(range(sys.ninputs))

elif control_indices is not None:
# Process the control indices
ctrl_idx = _process_indices(
control_indices, 'control', sys.input_labels, sys.ninputs)

# Disturbance indices are the complement of control indices
dist_idx = [i for i in range(sys.ninputs) if i not in ctrl_idx]

else: # disturbance_indices is not None
# If passed an integer, count from the end of the input vector
arg = -disturbance_indices if isinstance(disturbance_indices, int) \
else disturbance_indices

dist_idx = _process_indices(
arg, 'disturbance', sys.input_labels, sys.ninputs)

# Set control indices to complement disturbance indices
ctrl_idx = [i for i in range(sys.ninputs) if i not in dist_idx]

return ctrl_idx, dist_idx


# Process labels
def _process_labels(labels, name, default):
if isinstance(labels, str):
labels = [labels.format(i=i) for i in range(len(default))]

if labels is None:
labels = default
elif isinstance(labels, list):
if len(labels) != len(default):
raise ValueError(
f"incorrect length of {name}_labels: {len(labels)}"
f" instead of {len(default)}")
else:
raise ValueError(f"{name}_labels should be a string or a list")

return labels
Loading