# Copyright (c) 2017-2019 Uber Technologies, Inc.
# SPDX-License-Identifier: Apache-2.0
from abc import ABCMeta, abstractmethod
import torch
from torch import nn
from torch.nn import Parameter
import pyro.distributions as dist
from pyro.distributions.util import eye_like
[docs]class DynamicModel(nn.Module, metaclass=ABCMeta):
"""
Dynamic model interface.
:param dimension: native state dimension.
:param dimension_pv: PV state dimension.
:param num_process_noise_parameters: process noise parameter space dimension.
This for UKF applications. Can be left as ``None`` for EKF and most
other filters.
"""
def __init__(self, dimension, dimension_pv, num_process_noise_parameters=None):
self._dimension = dimension
self._dimension_pv = dimension_pv
self._num_process_noise_parameters = num_process_noise_parameters
super().__init__()
@property
def dimension(self):
"""
Native state dimension access.
"""
return self._dimension
@property
def dimension_pv(self):
"""
PV state dimension access.
"""
return self._dimension_pv
@property
def num_process_noise_parameters(self):
"""
Process noise parameters space dimension access.
"""
return self._num_process_noise_parameters
[docs] @abstractmethod
def forward(self, x, dt, do_normalization=True):
"""
Integrate native state ``x`` over time interval ``dt``.
:param x: current native state. If the DynamicModel is non-differentiable,
be sure to handle the case of ``x`` being augmented with process
noise parameters.
:param dt: time interval to integrate over.
:param do_normalization: whether to perform normalization on output, e.g.,
mod'ing angles into an interval.
:return: Native state x integrated dt into the future.
"""
raise NotImplementedError
[docs] def geodesic_difference(self, x1, x0):
"""
Compute and return the geodesic difference between 2 native states.
This is a generalization of the Euclidean operation ``x1 - x0``.
:param x1: native state.
:param x0: native state.
:return: Geodesic difference between native states ``x1`` and ``x2``.
"""
return x1 - x0 # Default to Euclidean behavior.
[docs] @abstractmethod
def mean2pv(self, x):
"""
Compute and return PV state from native state. Useful for combining
state estimates of different types in IMM (Interacting Multiple Model)
filtering.
:param x: native state estimate mean.
:return: PV state estimate mean.
"""
raise NotImplementedError
[docs] @abstractmethod
def cov2pv(self, P):
"""
Compute and return PV covariance from native covariance. Useful for
combining state estimates of different types in IMM (Interacting
Multiple Model) filtering.
:param P: native state estimate covariance.
:return: PV state estimate covariance.
"""
raise NotImplementedError
[docs] @abstractmethod
def process_noise_cov(self, dt=0.0):
"""
Compute and return process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q). For a DifferentiableDynamicModel, this is
the covariance of the native state ``x`` resulting from stochastic
integration (for use with EKF). Otherwise, it is the covariance
directly of the process noise parameters (for use with UKF).
"""
raise NotImplementedError
[docs] def process_noise_dist(self, dt=0.0):
"""
Return a distribution object of state displacement from the process noise
distribution over a time interval.
:param dt: time interval that process noise accumulates over.
:return: :class:`~pyro.distributions.torch.MultivariateNormal`.
"""
Q = self.process_noise_cov(dt)
return dist.MultivariateNormal(
torch.zeros(Q.shape[-1], dtype=Q.dtype, device=Q.device), Q
)
[docs]class DifferentiableDynamicModel(DynamicModel):
"""
DynamicModel for which state transition Jacobians can be efficiently
calculated, usu. analytically or by automatic differentiation.
"""
[docs] @abstractmethod
def jacobian(self, dt):
"""
Compute and return native state transition Jacobian (F) over time
interval ``dt``.
:param dt: time interval to integrate over.
:return: Read-only Jacobian (F) of integration map (f).
"""
raise NotImplementedError
[docs]class Ncp(DifferentiableDynamicModel):
"""
NCP (Nearly-Constant Position) dynamic model. May be subclassed, e.g., with
CWNV (Continuous White Noise Velocity) or DWNV (Discrete White Noise
Velocity).
:param dimension: native state dimension.
:param sv2: variance of velocity. Usually chosen so that the standard
deviation is roughly half of the max velocity one would ever expect
to observe.
"""
def __init__(self, dimension, sv2):
dimension_pv = 2 * dimension
super().__init__(dimension, dimension_pv, num_process_noise_parameters=1)
if not isinstance(sv2, torch.Tensor):
sv2 = torch.tensor(sv2)
self.sv2 = Parameter(sv2)
self._F_cache = eye_like(sv2, dimension) # State transition matrix cache
self._Q_cache = {} # Process noise cov cache
[docs] def forward(self, x, dt, do_normalization=True):
"""
Integrate native state ``x`` over time interval ``dt``.
:param x: current native state. If the DynamicModel is non-differentiable,
be sure to handle the case of ``x`` being augmented with process
noise parameters.
:param dt: time interval to integrate over.
do_normalization: whether to perform normalization on output, e.g.,
mod'ing angles into an interval. Has no effect for this subclass.
:return: Native state x integrated dt into the future.
"""
return x
[docs] def mean2pv(self, x):
"""
Compute and return PV state from native state. Useful for combining
state estimates of different types in IMM (Interacting Multiple Model)
filtering.
:param x: native state estimate mean.
:return: PV state estimate mean.
"""
with torch.no_grad():
x_pv = torch.zeros(2 * self._dimension, dtype=x.dtype, device=x.device)
x_pv[: self._dimension] = x
return x_pv
[docs] def cov2pv(self, P):
"""
Compute and return PV covariance from native covariance. Useful for
combining state estimates of different types in IMM (Interacting
Multiple Model) filtering.
:param P: native state estimate covariance.
:return: PV state estimate covariance.
"""
d = 2 * self._dimension
with torch.no_grad():
P_pv = torch.zeros(d, d, dtype=P.dtype, device=P.device)
P_pv[: self._dimension, : self._dimension] = P
return P_pv
[docs] def jacobian(self, dt):
"""
Compute and return cached native state transition Jacobian (F) over
time interval ``dt``.
:param dt: time interval to integrate over.
:return: Read-only Jacobian (F) of integration map (f).
"""
return self._F_cache
[docs] @abstractmethod
def process_noise_cov(self, dt=0.0):
"""
Compute and return cached process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q) of the native state ``x`` resulting from
stochastic integration (for use with EKF).
"""
raise NotImplementedError
[docs]class Ncv(DifferentiableDynamicModel):
"""
NCV (Nearly-Constant Velocity) dynamic model. May be subclassed, e.g., with
CWNA (Continuous White Noise Acceleration) or DWNA (Discrete White Noise
Acceleration).
:param dimension: native state dimension.
:param sa2: variance of acceleration. Usually chosen so that the standard
deviation is roughly half of the max acceleration one would ever
expect to observe.
"""
def __init__(self, dimension, sa2):
dimension_pv = dimension
super().__init__(dimension, dimension_pv, num_process_noise_parameters=1)
if not isinstance(sa2, torch.Tensor):
sa2 = torch.tensor(sa2)
self.sa2 = Parameter(sa2)
self._F_cache = {} # State transition matrix cache
self._Q_cache = {} # Process noise cov cache
[docs] def forward(self, x, dt, do_normalization=True):
"""
Integrate native state ``x`` over time interval ``dt``.
:param x: current native state. If the DynamicModel is non-differentiable,
be sure to handle the case of ``x`` being augmented with process
noise parameters.
:param dt: time interval to integrate over.
:param do_normalization: whether to perform normalization on output, e.g.,
mod'ing angles into an interval. Has no effect for this subclass.
:return: Native state x integrated dt into the future.
"""
F = self.jacobian(dt)
return F.mm(x.unsqueeze(1)).squeeze(1)
[docs] def mean2pv(self, x):
"""
Compute and return PV state from native state. Useful for combining
state estimates of different types in IMM (Interacting Multiple Model)
filtering.
:param x: native state estimate mean.
:return: PV state estimate mean.
"""
return x
[docs] def cov2pv(self, P):
"""
Compute and return PV covariance from native covariance. Useful for
combining state estimates of different types in IMM (Interacting
Multiple Model) filtering.
:param P: native state estimate covariance.
:return: PV state estimate covariance.
"""
return P
[docs] def jacobian(self, dt):
"""
Compute and return cached native state transition Jacobian (F) over
time interval ``dt``.
:param dt: time interval to integrate over.
:return: Read-only Jacobian (F) of integration map (f).
"""
if dt not in self._F_cache:
d = self._dimension
with torch.no_grad():
F = eye_like(self.sa2, d)
F[: d // 2, d // 2 :] = dt * eye_like(self.sa2, d // 2)
self._F_cache[dt] = F
return self._F_cache[dt]
[docs] @abstractmethod
def process_noise_cov(self, dt=0.0):
"""
Compute and return cached process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q) of the native state ``x`` resulting from
stochastic integration (for use with EKF).
"""
raise NotImplementedError
[docs]class NcpContinuous(Ncp):
"""
NCP (Nearly-Constant Position) dynamic model with CWNV (Continuous White
Noise Velocity).
References:
"Estimation with Applications to Tracking and Navigation" by Y. Bar-
Shalom et al, 2001, p.269.
:param dimension: native state dimension.
:param sv2: variance of velocity. Usually chosen so that the standard
deviation is roughly half of the max velocity one would ever expect
to observe.
"""
[docs] def process_noise_cov(self, dt=0.0):
"""
Compute and return cached process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q) of the native state ``x`` resulting from
stochastic integration (for use with EKF).
"""
if dt not in self._Q_cache:
# q: continuous-time process noise intensity with units
# length^2/time (m^2/s). Choose ``q`` so that changes in position,
# over a sampling period ``dt``, are roughly ``sqrt(q*dt)``.
q = self.sv2 * dt
Q = q * dt * eye_like(self.sv2, self._dimension)
self._Q_cache[dt] = Q
return self._Q_cache[dt]
[docs]class NcvContinuous(Ncv):
"""
NCV (Nearly-Constant Velocity) dynamic model with CWNA (Continuous White
Noise Acceleration).
References:
"Estimation with Applications to Tracking and Navigation" by Y. Bar-
Shalom et al, 2001, p.269.
:param dimension: native state dimension.
:param sa2: variance of acceleration. Usually chosen so that the standard
deviation is roughly half of the max acceleration one would ever
expect to observe.
"""
[docs] def process_noise_cov(self, dt=0.0):
"""
Compute and return cached process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q) of the native state ``x`` resulting from
stochastic integration (for use with EKF).
"""
if dt not in self._Q_cache:
with torch.no_grad():
d = self._dimension
dt2 = dt * dt
dt3 = dt2 * dt
Q = torch.zeros(d, d, dtype=self.sa2.dtype, device=self.sa2.device)
eye = eye_like(self.sa2, d // 2)
Q[: d // 2, : d // 2] = dt3 * eye / 3.0
Q[: d // 2, d // 2 :] = dt2 * eye / 2.0
Q[d // 2 :, : d // 2] = dt2 * eye / 2.0
Q[d // 2 :, d // 2 :] = dt * eye
# sa2 * dt is an intensity factor that changes in velocity
# over a sampling period ``dt``, ideally should be ~``sqrt(q*dt)``.
Q = Q * (self.sa2 * dt)
self._Q_cache[dt] = Q
return self._Q_cache[dt]
[docs]class NcpDiscrete(Ncp):
"""
NCP (Nearly-Constant Position) dynamic model with DWNV (Discrete White
Noise Velocity).
:param dimension: native state dimension.
:param sv2: variance of velocity. Usually chosen so that the standard
deviation is roughly half of the max velocity one would ever expect
to observe.
References:
"Estimation with Applications to Tracking and Navigation" by Y. Bar-
Shalom et al, 2001, p.273.
"""
[docs] def process_noise_cov(self, dt=0.0):
"""
Compute and return cached process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q) of the native state `x` resulting from
stochastic integration (for use with EKF).
"""
if dt not in self._Q_cache:
Q = self.sv2 * dt * dt * eye_like(self.sv2, self._dimension)
self._Q_cache[dt] = Q
return self._Q_cache[dt]
[docs]class NcvDiscrete(Ncv):
"""
NCV (Nearly-Constant Velocity) dynamic model with DWNA (Discrete White
Noise Acceleration).
:param dimension: native state dimension.
:param sa2: variance of acceleration. Usually chosen so that the standard
deviation is roughly half of the max acceleration one would ever
expect to observe.
References:
"Estimation with Applications to Tracking and Navigation" by Y. Bar-
Shalom et al, 2001, p.273.
"""
[docs] def process_noise_cov(self, dt=0.0):
"""
Compute and return cached process noise covariance (Q).
:param dt: time interval to integrate over.
:return: Read-only covariance (Q) of the native state `x` resulting from
stochastic integration (for use with EKF). (Note that this Q, modulo
numerical error, has rank `dimension/2`. So, it is only positive
semi-definite.)
"""
if dt not in self._Q_cache:
with torch.no_grad():
d = self._dimension
dt2 = dt * dt
dt3 = dt2 * dt
dt4 = dt2 * dt2
Q = torch.zeros(d, d, dtype=self.sa2.dtype, device=self.sa2.device)
Q[: d // 2, : d // 2] = 0.25 * dt4 * eye_like(self.sa2, d // 2)
Q[: d // 2, d // 2 :] = 0.5 * dt3 * eye_like(self.sa2, d // 2)
Q[d // 2 :, : d // 2] = 0.5 * dt3 * eye_like(self.sa2, d // 2)
Q[d // 2 :, d // 2 :] = dt2 * eye_like(self.sa2, d // 2)
Q = Q * self.sa2
self._Q_cache[dt] = Q
return self._Q_cache[dt]