Skip to content

continuous time system support for create_estimator_iosystem #829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions control/stochsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import scipy as sp
from math import sqrt

from .iosys import InputOutputSystem, NonlinearIOSystem
from .iosys import InputOutputSystem, LinearIOSystem, NonlinearIOSystem
from .lti import LTI
from .namedio import isctime, isdtime
from .mateqn import care, dare, _check_shape
Expand Down Expand Up @@ -314,7 +314,13 @@ def create_estimator_iosystem(
"""Create an I/O system implementing a linqear quadratic estimator

This function creates an input/output system that implements a
state estimator of the form
continuous time state estimator of the form

\dot xhat = A x + B u - L (C xhat - y)
\dot P = A P + P A^T + F QN F^T - P C^T RN^{-1} C P
L = P C^T RN^{-1}

or a discrete time state estimator of the form

xhat[k + 1] = A x[k] + B u[k] - L (C xhat[k] - y[k])
P[k + 1] = A P A^T + F QN F^T - A P C^T Reps^{-1} C P A
Expand Down Expand Up @@ -359,8 +365,9 @@ def create_estimator_iosystem(
Returns
-------
estim : InputOutputSystem
Input/output system representing the estimator. This system takes the
system input and output and generates the estimated state.
Input/output system representing the estimator. This system takes
the system output y and input u and generates the estimated state
xhat.

Notes
-----
Expand All @@ -384,8 +391,8 @@ def create_estimator_iosystem(
"""

# Make sure that we were passed an I/O system as an input
if not isinstance(sys, InputOutputSystem):
raise ControlArgument("Input system must be I/O system")
if not isinstance(sys, LinearIOSystem):
raise ControlArgument("Input system must be a linear I/O system")

# Extract the matrices that we need for easy reference
A, B = sys.A, sys.B
Expand All @@ -409,7 +416,7 @@ def create_estimator_iosystem(
# Initialize the covariance matrix
if P0 is None:
# Initalize P0 to the steady state value
_, P0, _ = lqe(A, G, C, QN, RN)
L0, P0, _ = lqe(A, G, C, QN, RN)

# Figure out the labels to use
if isinstance(state_labels, str):
Expand All @@ -432,24 +439,54 @@ def create_estimator_iosystem(
sensor_labels = [sensor_labels.format(i=i) for i in range(C.shape[0])]

if isctime(sys):
raise NotImplementedError("continuous time not yet implemented")

else:
# Create an I/O system for the state feedback gains
# Note: reshape vectors into column vectors for legacy np.matrix

R_inv = np.linalg.inv(RN)
Reps_inv = C.T @ R_inv @ C

def _estim_update(t, x, u, params):
# See if we are estimating or predicting
correct = params.get('correct', True)

# Get the state of the estimator
xhat = x[0:sys.nstates].reshape(-1, 1)
P = x[sys.nstates:].reshape(sys.nstates, sys.nstates)

# Extract the inputs to the estimator
y = u[0:C.shape[0]].reshape(-1, 1)
u = u[C.shape[0]:].reshape(-1, 1)

# Compute the optimal gain
L = P @ C.T @ R_inv

# Update the state estimate
dxhat = A @ xhat + B @ u # prediction
if correct:
dxhat -= L @ (C @ xhat - y) # correction

# Update the covariance
dP = A @ P + P @ A.T + G @ QN @ G.T
if correct:
dP -= P @ Reps_inv @ P

# Return the update
return np.hstack([dxhat.reshape(-1), dP.reshape(-1)])

else:
def _estim_update(t, x, u, params):
# See if we are estimating or predicting
correct = params.get('correct', True)

# Get the state of the estimator
# Get the state of the estimator
xhat = x[0:sys.nstates].reshape(-1, 1)
P = x[sys.nstates:].reshape(sys.nstates, sys.nstates)

# Extract the inputs to the estimator
y = u[0:C.shape[0]].reshape(-1, 1)
u = u[C.shape[0]:].reshape(-1, 1)

# Compute the optimal again
# Compute the optimal gain
Reps_inv = np.linalg.inv(RN + C @ P @ C.T)
L = A @ P @ C.T @ Reps_inv

Expand All @@ -466,8 +503,8 @@ def _estim_update(t, x, u, params):
# Return the update
return np.hstack([dxhat.reshape(-1), dP.reshape(-1)])

def _estim_output(t, x, u, params):
return x[0:sys.nstates]
def _estim_output(t, x, u, params):
return x[0:sys.nstates]

# Define the estimator system
return NonlinearIOSystem(
Expand Down
10 changes: 6 additions & 4 deletions control/tests/flatsys_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,17 @@ def test_kinematic_car_ocp(
elif re.match("Iteration limit.*", traj_ocp.message) and \
re.match(
"conda ubuntu-3.* Generic", os.getenv('JOBNAME', '')) and \
np.__version__ == '1.24.0':
re.match("1.24.[01]", np.__version__):
pytest.xfail("gh820: iteration limit exceeded")

else:
# Dump out information to allow creation of an exception
print("Platform: ", platform.platform())
print("Python: ", platform.python_version())
print("Message:", traj_ocp.message)
print("Platform:", platform.platform())
print("Python:", platform.python_version())
print("NumPy version:", np.__version__)
np.show_config()
print("JOBNAME: ", os.getenv('JOBNAME'))
print("JOBNAME:", os.getenv('JOBNAME'))

pytest.fail(
"unknown failure; view output to identify configuration")
Expand Down
89 changes: 73 additions & 16 deletions control/tests/stochsys_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import control as ct
from control import lqe, dlqe, rss, drss, tf, ss, ControlArgument, slycot_check
from math import log, pi

# Utility function to check LQE answer
def check_LQE(L, P, poles, G, QN, RN):
Expand Down Expand Up @@ -48,7 +49,7 @@ def test_lqe_call_format(cdlqe):

# Standard calling format
Lref, Pref, Eref = cdlqe(sys.A, sys.B, sys.C, Q, R)

# Call with system instead of matricees
L, P, E = cdlqe(sys, Q, R)
np.testing.assert_almost_equal(Lref, L)
Expand All @@ -58,15 +59,15 @@ def test_lqe_call_format(cdlqe):
# Make sure we get an error if we specify N
with pytest.raises(ct.ControlNotImplemented):
L, P, E = cdlqe(sys, Q, R, N)

# Inconsistent system dimensions
with pytest.raises(ct.ControlDimension, match="Incompatible"):
L, P, E = cdlqe(sys.A, sys.C, sys.B, Q, R)

# Incorrect covariance matrix dimensions
with pytest.raises(ct.ControlDimension, match="Incompatible"):
L, P, E = cdlqe(sys.A, sys.B, sys.C, R, Q)

# Too few input arguments
with pytest.raises(ct.ControlArgument, match="not enough input"):
L, P, E = cdlqe(sys.A, sys.C)
Expand Down Expand Up @@ -99,26 +100,26 @@ def test_lqe_discrete():
np.testing.assert_almost_equal(K_csys, K_expl)
np.testing.assert_almost_equal(S_csys, S_expl)
np.testing.assert_almost_equal(E_csys, E_expl)

# Calling lqe() with a discrete time system should call dlqe()
K_lqe, S_lqe, E_lqe = ct.lqe(dsys, Q, R)
K_dlqe, S_dlqe, E_dlqe = ct.dlqe(dsys, Q, R)
np.testing.assert_almost_equal(K_lqe, K_dlqe)
np.testing.assert_almost_equal(S_lqe, S_dlqe)
np.testing.assert_almost_equal(E_lqe, E_dlqe)

# Calling lqe() with no timebase should call lqe()
asys = ct.ss(csys.A, csys.B, csys.C, csys.D, dt=None)
K_asys, S_asys, E_asys = ct.lqe(asys, Q, R)
K_expl, S_expl, E_expl = ct.lqe(csys.A, csys.B, csys.C, Q, R)
np.testing.assert_almost_equal(K_asys, K_expl)
np.testing.assert_almost_equal(S_asys, S_expl)
np.testing.assert_almost_equal(E_asys, E_expl)

# Calling dlqe() with a continuous time system should raise an error
with pytest.raises(ControlArgument, match="called with a continuous"):
K, S, E = ct.dlqe(csys, Q, R)

def test_estimator_iosys():
sys = ct.drss(4, 2, 2, strictly_proper=True)

Expand All @@ -129,7 +130,7 @@ def test_estimator_iosys():
QN = np.eye(sys.ninputs)
RN = np.eye(sys.noutputs)
estim = ct.create_estimator_iosystem(sys, QN, RN, P0)

ctrl, clsys = ct.create_statefbk_iosystem(sys, K, estimator=estim)

# Extract the elements of the estimator
Expand Down Expand Up @@ -162,20 +163,76 @@ def test_estimator_iosys():
np.testing.assert_almost_equal(cls.D, D_clchk)


@pytest.mark.parametrize("sys_args", [
([[-1]], [[1]], [[1]], 0), # scalar system
([[-1, 0.1], [0, -2]], [[0], [1]], [[1, 0]], 0), # SISO, 2 state
([[-1, 0.1], [0, -2]], [[1, 0], [0, 1]], [[1, 0]], 0), # 2i, 1o, 2s
([[-1, 0.1, 0.1], [0, -2, 0], [0.1, 0, -3]], # 2i, 2o, 3s
[[1, 0], [0, 0.1], [0, 1]],
[[1, 0, 0.1], [0, 1, 0.1]], 0),
])
def test_estimator_iosys_ctime(sys_args):
# Define the system we want to test
sys = ct.ss(*sys_args)
T = 10 * log(1e-2) / np.max(sys.poles().real)
assert T > 0

# Create nonlinear version of the system to match integration methods
nl_sys = ct.NonlinearIOSystem(
lambda t, x, u, params : sys.A @ x + sys.B @ u,
lambda t, x, u, params : sys.C @ x + sys.D @ u,
inputs=sys.ninputs, outputs=sys.noutputs, states=sys.nstates)

# Define an initial condition, inputs (small, to avoid integration errors)
timepts = np.linspace(0, T, 500)
U = 2e-2 * np.array([np.sin(timepts + i*pi/3) for i in range(sys.ninputs)])
X0 = np.ones(sys.nstates)

# Set up the parameters for the filter
P0 = np.eye(sys.nstates)
QN = np.eye(sys.ninputs)
RN = np.eye(sys.noutputs)

# Construct the estimator
estim = ct.create_estimator_iosystem(sys, QN, RN)

# Compute the system response and the optimal covariance
sys_resp = ct.input_output_response(nl_sys, timepts, U, X0)
_, Pf, _ = ct.lqe(sys, QN, RN)
Pf = np.array(Pf) # convert from matrix, if needed

# Make sure that we converge to the optimal estimate
estim_resp = ct.input_output_response(
estim, timepts, [sys_resp.outputs, U], [0*X0, P0])
np.testing.assert_allclose(
estim_resp.states[0:sys.nstates, -1], sys_resp.states[:, -1],
atol=1e-6, rtol=1e-3)
np.testing.assert_allclose(
estim_resp.states[sys.nstates:, -1], Pf.reshape(-1),
atol=1e-6, rtol=1e-3)

# Make sure that optimal estimate is an eq pt
ss_resp = ct.input_output_response(
estim, timepts, [sys_resp.outputs, U], [X0, Pf])
np.testing.assert_allclose(
ss_resp.states[sys.nstates:],
np.outer(Pf.reshape(-1), np.ones_like(timepts)),
atol=1e-4, rtol=1e-2)
np.testing.assert_allclose(
ss_resp.states[0:sys.nstates], sys_resp.states,
atol=1e-4, rtol=1e-2)


def test_estimator_errors():
sys = ct.drss(4, 2, 2, strictly_proper=True)
P0 = np.eye(sys.nstates)
QN = np.eye(sys.ninputs)
RN = np.eye(sys.noutputs)

with pytest.raises(ct.ControlArgument, match="Input system must be I/O"):
with pytest.raises(ct.ControlArgument, match=".* system must be a linear"):
sys_tf = ct.tf([1], [1, 1], dt=True)
estim = ct.create_estimator_iosystem(sys_tf, QN, RN)

with pytest.raises(NotImplementedError, match="continuous time not"):
sys_ct = ct.rss(4, 2, 2, strictly_proper=True)
estim = ct.create_estimator_iosystem(sys_ct, QN, RN)


with pytest.raises(ValueError, match="output must be full state"):
C = np.eye(2, 4)
estim = ct.create_estimator_iosystem(sys, QN, RN, C=C)
Expand Down Expand Up @@ -246,7 +303,7 @@ def test_correlation():
# Try passing a second argument
tau, Rneg = ct.correlation(T, V, -V)
np.testing.assert_equal(Rtau, -Rneg)

# Test error conditions
with pytest.raises(ValueError, match="Time vector T must be 1D"):
tau, Rtau = ct.correlation(V, V)
Expand Down