"""Active Disturbance Rejection Control for Python
It is highly recommended to check the documentation first before attempting
to use the package. Although it is simple in nature, ADRC requires some
basic knowledge about PID control, observers and state-feedback.
"""
import numpy as np
import time
from collections import deque
[docs]def saturation(_limits: tuple, _val: float) -> float:
"""Saturation function
Parameters
----------
_limits : tuple
saturation limits (low, high)
_val : float
sat(_val)
Returns
-------
float
saturated signal
"""
lo, hi = _limits
if _val is None:
return None
elif hi is not None and _val > hi:
return hi
elif lo is not None and _val < lo:
return lo
return _val
[docs]class StateSpace():
"""Discrete linear time-invariant state space implementation\
of ADRC
Parameters
----------
order : int
first- or second-order ADRC
delta : float
sampling time in seconds
b0 : float
gain parameter b0
t_settle : float
settling time in seconds, determines closed-loop bandwidth
k_eso : float
observer bandwidth
eso_init : tuple, optional
initial state for the extended state observer, by default False
r_lim : tuple, optional
rate limits for the control signal, by default (None, None)
m_lim : tuple, optional
magnitude limits for the control signal, by default (None, None)
half_gain : tuple, optional
half gain tuning for controller/observer gains,\
by default (False, False)
"""
def __init__(self,
order: int,
delta: float,
b0: float,
t_settle: float,
k_eso: float,
eso_init: tuple = False,
r_lim: tuple = (None, None),
m_lim: tuple = (None, None),
half_gain: tuple = (False, False)):
assert (order == 1) or (order == 2),\
'Only first- and second-order ADRC is implemented'
self.b0 = b0
nx = order + 1
self.delta = delta
if order == 1:
self.Ad = np.vstack(([1, delta], [0, 1]))
self.Bd = np.vstack((b0 * delta, 0))
self.Cd = np.hstack((1, 0)).reshape(1, -1)
self.Dd = 0
# Controller parameters for closed-loop dynamics
sCL = -4 / t_settle
self.Kp = -2 * sCL
# Observer dynamics
sESO = k_eso * sCL
zESO = np.exp(sESO * delta)
# Observer gains resulting in common-location observer poles
self.L = np.array([1 - (zESO)**2,
(1 / delta) * (1 - zESO)**2]).reshape(-1, 1)
# Controller gains
self.w = np.array([self.Kp / self.b0,
1 / self.b0]).reshape(-1, 1)
elif order == 2:
self.Ad = np.vstack((
[1, delta, (delta**2) / 2],
[0, 1, delta],
[0, 0, 1]))
self.Bd = np.vstack((b0 * (delta**2) / 2, b0 * delta, 0))
self.Cd = np.hstack((1, 0, 0)).reshape(1, -1)
self.Dd = 0
# Controller parameters for closed-loop dynamics
sCL = -6 / t_settle
self.Kp = sCL**2
self.Kd = -2 * sCL
# Observer dynamics
sESO = k_eso * sCL
zESO = np.exp(sESO * delta)
# Observer gains resulting in common-location observer poles
self.L = np.array([1 - (zESO)**3,
(3 / (2 * delta)) * (1 - zESO)**2 * (1 + zESO),
(1 / delta**2) * (1 - zESO)**3]
).reshape(-1, 1)
# Controller gains
self.w = np.array([self.Kp / self.b0,
self.Kd / self.b0,
1 / self.b0]).reshape(-1, 1)
self.xhat = np.zeros((nx, 1), dtype=np.float64)
self.ukm1 = np.zeros((1, 1), dtype=np.float64)
self.m_lim = m_lim
self.r_lim = r_lim
if half_gain[0] is True:
self.w = self.w / 2
if half_gain[1] is True:
self.L = self.L / 2
if eso_init is not False:
assert len(eso_init) == nx,\
'Length of initial state vector of LESO not compatible\
with order'
self.xhat = np.fromiter(eso_init, np.float64).reshape(-1, 1)
self._last_time = None
self._last_output = None
self._last_input = None
self._linear_extended_state_observer()
def _linear_extended_state_observer(self):
"""Internal function implementing the one-step update
equation for the linear extended state observer
"""
self.oA = self.Ad - self.L @ self.Cd @ self.Ad
self.oB = self.Bd - self.L @ self.Cd @ self.Bd
self.oC = self.Cd
# if self.inc_form is True:
# self.oA = self.oA - np.eye(self.oA.shape[0])
def _update_eso(self, y: float, ukm1: float):
"""Update the linear extended state observer
Parameters
----------
y : float
Current measurement y[k]
ukm1 : float
Previous control signal u[k-1]
"""
self.xhat = self.oA.dot(self.xhat) + self.oB.dot(
ukm1).reshape(-1, 1) + self.L.dot(y)
def _limiter(self, u_control: float) -> float:
"""Implements rate and magnitude limiter
Parameters
----------
u_control : float
control signal to be limited
Returns
-------
float
float: rate and magnitude limited control signal
"""
# Limiting the rate of u (delta_u)
# delta_u = SaturatedInteger(self.r_lim[0], self.r_lim[1],
# u_control - self.ukm1)
delta_u = saturation((self.r_lim[0], self.r_lim[1]),
u_control - self.ukm1)
# Limiting the magnitude of u
self.ukm1 = saturation((self.m_lim[0],
self.m_lim[1]),
delta_u + self.ukm1)
return self.ukm1
@property
def eso_states(self) -> tuple:
"""Returns the states of the linear extended state observer
Returns
-------
tuple
States of the linear extended state observer
"""
return self.xhat
@property
def magnitude_limiter(self) -> tuple:
"""Returns the magnitude limits of the controller
Returns
-------
tuple
magnitude limits of the controller
"""
return self.m_lim
@magnitude_limiter.setter
def magnitude_limiter(self, lim: tuple):
"""Magnitude limitter setter
Parameters
----------
lim : tuple
New magnitude limits
"""
assert len(lim) == 2
# assert lim[0] < lim[1]
self.m_lim = lim
@property
def rate_limiter(self) -> tuple:
"""Returns the rate limits of the controller
Returns
-------
tuple
Rate limits of the controller
"""
return self.r_lim
@rate_limiter.setter
def rate_limiter(self, lim: tuple):
"""Rate limiter setter
Parameters
----------
lim : tuple
New rate limits
"""
assert len(lim) == 2
assert lim[0] < lim[1]
self.r_lim = lim
[docs] def __call__(self, y: float, u: float, r: float, zoh: bool = False):
"""Returns value of the control signal depending on current measurements,
previous control action, reference signal.
Parameters
----------
y : float
Current measurement y[k] of the process
u : float
Previous control signal u[k-1]
r : float
Current reference signal r[k]
zoh : bool, optional
Only update every delta seconds, by default False
Returns
-------
float
Current control signal u[k]
"""
now = time.monotonic()
if zoh is True:
try:
dt = now - self._last_time if now - self._last_time else 1e-16
except TypeError:
dt = 1e-16
if zoh is True and dt < self.delta and self._last_output is not None:
# Return last output of the controller if dt < delta
return self._last_output
u = (self.Kp / self.b0) * r - self.w.T @ self.xhat
u = self._limiter(u)
self._update_eso(y, u)
self._last_output = float(u)
self._last_time = now
return float(u)
[docs] def reset(self, x0=None):
"""Resets the extended state observer
Parameters
----------
x0 : np.array, optional
new initial state vector for the extended state observer,
by default None, i.e. resets to zero
"""
nx = len(self.xhat)
assert x0 is None or len(np.fromiter(x0, float)) == nx,\
"Invalid state vector"
if x0 is None:
self.xhat = np.zeros((nx, 1), dtype=np.float64)
else:
self.xhat = np.fromiter(x0, np.float64).reshape(-1, 1)
[docs]class TransferFunction(object):
"""Discrete time linear active disturbance rejection control\
in transfer function representation
Parameters
----------
order : int
first- or second-order ADRC TF
delta : float
sampling time in seconds
b0 : float
modelling parameter b0
w_cl : float
desired closed-loop bandwidth
k_eso : float
observer bandwidth is parametrized as k_eso-multiple faster than w_cl
eso_init : np.array, optional
initial state for the extended state observer, by default None
r_lim : tuple, optional
rate limits for the control output, by default (None, None)
m_lim : tuple, optional
magnitude limits for the control output, by default (None, None)
half_gain : tuple, optional
half gain tuning for controller/observer gains,\
by default (False, False)
method : str, optional, 'general_terms' or 'bandwidth'
method with which the transfer function parameters are calculated,\
functionally identical, but general_terms is used to implement half
gain tuning
"""
def __init__(self, order: int, delta: float, b0: float,
w_cl: float, k_eso: float, eso_init: np.array = None,
r_lim: tuple = (None, None),
m_lim: tuple = (None, None),
half_gain: tuple = (False, False), method='general_terms'):
assert order == 1 or order == 2, 'First- and second-order ADRC TF\
implemented'
# Define attributes
self.order = order
self.acc = 0.
zESO = np.exp(-k_eso * w_cl * delta)
self.params = self._calculate_parameters(order, delta, b0, w_cl, zESO,
method=method)
self._calculate_coeffs()
self._create_states()
@property
def accumulator(self):
"""Get accumulator
Returns
-------
float
accumulator output
"""
return self.acc
@accumulator.setter
def accumulator(self, value):
"""Set accumulator output to value
Parameters
----------
value : float
value to set the accumulator
"""
self.acc = float(value)
@property
def parameters(self):
"""Get parameters
Returns
-------
dict
calculated paramaters
"""
return self.params
def _create_states(self):
"""Create state vectors to store past values for transfer functions
"""
if self.order == 1:
# Reference prefilter
self.r_k = deque([0, 0], maxlen=2)
self.r_yk = deque([0], maxlen=1)
# Feedback controller
self.e_k = deque([0], maxlen=1)
self.u_k = deque([0], maxlen=1)
else:
# Reference prefilter
self.r_k = deque([0, 0, 0], maxlen=3)
self.r_yk = deque([0, 0], maxlen=2)
# Feedback controller
self.e_k = deque([0, 0], maxlen=2)
self.u_k = deque([0, 0], maxlen=2)
def _control_loop(self, y, r_k):
if self.order == 1:
# reference prefilter
ref_yk = (r_k * self.pf['num'][0]
+ self.r_k[0] * self.pf['num'][1]
+ self.r_k[1] * self.pf['num'][2]
- self.r_yk[0] * self.pf['den'][1])
# advance time
self.r_k.appendleft(r_k)
self.r_yk.appendleft(ref_yk)
# calculate error
e_k = ref_yk - y
# feedback controller
u_k = (e_k * self.fb['num'][0]
+ self.e_k[0] * self.fb['num'][1]
- self.u_k[0] * self.fb['den'][1])
# advance time
self.e_k.appendleft(e_k)
self.u_k.appendleft(u_k)
else: # self.order == 2
# reference prefilter
ref_yk = (r_k * self.pf['num'][0]
+ self.r_k[0] * self.pf['num'][1]
+ self.r_k[1] * self.pf['num'][2]
+ self.r_k[2] * self.pf['num'][3]
- self.r_yk[0] * self.pf['den'][1]
- self.r_yk[1] * self.pf['den'][2])
# advance time
self.r_k.appendleft(r_k)
self.r_yk.appendleft(ref_yk)
# calculate error
e_k = ref_yk - y
# feedback controller
u_k = (e_k * self.fb['num'][0]
+ self.e_k[0] * self.fb['num'][1]
+ self.e_k[1] * self.fb['num'][2]
- self.u_k[0] * self.fb['den'][1]
- self.u_k[1] * self.fb['den'][2])
# advance time
self.e_k.appendleft(e_k)
self.u_k.appendleft(u_k)
# Accumulate feedback controller output
self.accumulator = self.accumulator + u_k
return self.accumulator
def _calculate_coeffs(self):
"""Internal function to calculate the coefficients for the reference
prefilter and feedback controller (denominator and numerator)
Uses the z^-1 notation, coefficients in descending order, i.e
z^0, z^-1, ..., z^-n
"""
if self.order == 1:
fb = {'num': [self.params['be0'], self.params['be1']],
'den': [1, self.params['a1']]}
pf = {'num': [self.params['g0'],
self.params['g1'], self.params['g2']],
'den': [1, self.params['be1'] / self.params['be0']]}
else:
fb = {'num': [self.params['be0'], self.params['be1'],
self.params['be2']],
'den': [1, self.params['a1'], self.params['a2']]}
pf = {'num': [self.params['g0'], self.params['g1'],
self.params['g2'], self.params['g3']],
'den': [1, self.params['be1'] / self.params['be0'],
self.params['be2'] / self.params['be0']]}
self.fb = fb
self.pf = pf
def _calculate_parameters(self, order, delta, b0,
w_cl, zESO, method='general_terms'):
"""Calculate the coefficients of num and den of TransferFunction ADRC,
using either general terms or bandwidth parameterization
Parameters
----------
order : int
internal variable
delta : float
internal variable
b0 : float
internal variable
w_cl : float
internal variable
zESO : float
internal variable
method : str, optional
Calculation via general terms (i.e. controller and observer gains)
or bandwidth parametrization, functionally identical,
implementation uses general terms to leverage half-gain
tuning method, by default 'general_terms'
Returns
-------
list
Ordered transfer function parameters
Notes
-----
* For first-order ADRC: [a1, be0, be1, g0, g1, g2]
* For second-order ADRC: [a1, a2, be0, be1, be2, g0, g1, g2, g3]
"""
if method == 'general_terms':
if order == 1:
# Controller gains
k1 = w_cl
# Observer gains
l1 = 1 - zESO**2
l2 = (1 / delta) * (1 - zESO)**2
_C_TERM = (k1 * l1 + l2)
a1 = (delta * k1 - 1) * (1 - l1)
be0 = (1 / b0) * _C_TERM
be1 = (1 / b0) * (delta * k1 * l2 - k1 * l1 - l2)
g0 = k1 / _C_TERM
g1 = k1 * (delta * l2 + l1 - 2) / _C_TERM
g2 = k1 * (1 - l1) / _C_TERM
elif order == 2:
k1 = w_cl**2
k2 = 2 * w_cl
l1 = 1 - zESO**3
l2 = (3 / (2 * delta)) * (1 - zESO)**2 * (1 + zESO)
l3 = (1 / delta**2) * (1 - zESO)**3
_C_TERM = (k1 * l1 + k2 * l2 + l3)
a1 = ((delta ** 2 / 2) * (k1 - k1 * l1 - k2 * l2)
+ delta * k2 + delta * l2 + l1 - 2)
a2 = ((((delta**2 * k1) / 2) - delta * k2 + 1) * (1 - l1))
be0 = (1 / b0) * _C_TERM
be1 = ((1 / b0) * (((delta**2 * k1 * l3) / 2)
+ delta * k1 * l2 + delta
* k2 * l3 - 2
* _C_TERM))
be2 = ((1 / b0) * (((delta**2 * k1 * l3) / 2)
- delta * k1 * l2 - delta
* k2 * l3 + _C_TERM))
g0 = k1 / _C_TERM
g1 = ((k1 * (delta**2 * l3 + 2 * delta * l2 + 2 * l1 - 6))
/ _C_TERM)
g2 = ((k1 * (delta**2 * l3 - 2 * delta * l2 - 4 * l1 + 6))
/ _C_TERM)
g3 = (k1 * (l1 - 1)) / _C_TERM
elif method == 'bandwidth':
if order == 1:
_C_TERM = ((delta * w_cl) * (1 - zESO**2) + (1 - zESO)**2)
a1 = (delta * w_cl - 1) * zESO**2
be0 = ((1/(b0 * delta)) * _C_TERM)
be1 = ((1/(b0 * delta)) * ((-2 * delta * w_cl * zESO)
* (1 - zESO) - (1 - zESO)**2))
g0 = (delta * w_cl) * _C_TERM
g1 = (-2 * delta * w_cl * zESO) * _C_TERM
g2 = (delta * w_cl * zESO**2) * _C_TERM
elif order == 2:
_C_TERM = (delta**2 * w_cl**2 * (1 - zESO**3)
+ 3 * delta * w_cl * (1 - zESO - zESO**2 + zESO**3)
+ (1 - zESO)**3)
a1 = ((1/2) * (delta**2 * w_cl**2 * zESO**3
+ delta * w_cl
* (1 + 3 * zESO + 3 * zESO**2 - 3 * zESO**3)
+ (1 - 3 * zESO - 3 * zESO**2 + zESO**3)))
a2 = ((zESO**3 / 2)
* (delta**2 * w_cl**2 - 4 * delta * w_cl + 2))
be0 = ((1 / (b0 * delta**2)) * _C_TERM)
be1 = ((1 / (b0 * delta**2))
* (-3 * delta**2 * w_cl**2 * zESO * (1 - zESO**2)
- 4 * delta * w_cl * (1 - 3 * zESO**2 + 2 * zESO**3)
- 2 * (1 - zESO)**3))
be2 = ((1 / (b0 * delta**2))
* (-3 * delta**2 * w_cl**2 * zESO**2 * (1 - zESO)
+ delta * w_cl
* (1 + 3 * zESO - 9 * zESO**2 + 5 * zESO**3)
+ (1 - zESO)**3))
g0 = (delta**2 * w_cl**2) / _C_TERM
g1 = (-3 * delta**2 * w_cl**2 * zESO) / _C_TERM
g2 = (3 * delta**2 * w_cl**2 * zESO**2) / _C_TERM
g3 = (-1 * delta**2 * w_cl**2 * zESO**3) / _C_TERM
if order == 1:
params = {'a1': a1, 'be0': be0,
'be1': be1, 'g0': g0,
'g1': g1, 'g2': g2}
else:
params = {'a1': a1, 'a2': a2, 'be0': be0,
'be1': be1, 'be2': be2, 'g0': g0,
'g1': g1, 'g2': g2, 'g3': g3}
return params
[docs] def __call__(self, y, r):
"""Control loop call
Parameters
----------
y : float
current output signal
r : float
current reference signal
Returns
-------
float
control signal to the plant (accumulator output)
"""
return self._control_loop(y, r)