# Copyright (c) 2017-2019 Uber Technologies, Inc.
# SPDX-License-Identifier: Apache-2.0
import math
import torch
import torch.nn as nn
from torch.distributions import MultivariateNormal, constraints
import pyro.distributions as dist
from pyro.contrib.timeseries.base import TimeSeriesModel
from pyro.nn import PyroParam, pyro_method
from pyro.ops.ssm_gp import MaternKernel
from pyro.ops.tensor_utils import block_diag_embed
class IndependentMaternGP(TimeSeriesModel):
"""
A time series model in which each output dimension is modeled independently
with a univariate Gaussian Process with a Matern kernel. The targets are assumed
to be evenly spaced in time. Training and inference are logarithmic in the length
of the time series T.
:param float nu: The order of the Matern kernel; one of 0.5, 1.5 or 2.5.
:param float dt: The time spacing between neighboring observations of the time series.
:param int obs_dim: The dimension of the targets at each time step.
:param torch.Tensor length_scale_init: optional initial values for the kernel length scale
    given as an ``obs_dim``-dimensional tensor
:param torch.Tensor kernel_scale_init: optional initial values for the kernel scale
    given as an ``obs_dim``-dimensional tensor
:param torch.Tensor obs_noise_scale_init: optional initial values for the observation noise scale
    given as an ``obs_dim``-dimensional tensor
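
Example (a minimal usage sketch; the hyperparameters and data below are illustrative)::

    gp = IndependentMaternGP(nu=1.5, dt=0.5, obs_dim=2)
    targets = torch.randn(100, 2)       # T=100 time steps, obs_dim=2
    # one log probability per output dimension; sum to get a training loss
    loss = -gp.log_prob(targets).sum()
    # the model is an nn.Module, so e.g. torch.optim.Adam(gp.parameters()) can optimize loss
    # forecast 1.0, 2.0 and 3.0 time units past the final observation
    pred = gp.forecast(targets, torch.tensor([1.0, 2.0, 3.0]))
    assert pred.mean.shape == (3, 2)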
"""
def __init__(
self,
nu=1.5,
dt=1.0,
obs_dim=1,
length_scale_init=None,
kernel_scale_init=None,
obs_noise_scale_init=None,
):
self.nu = nu
self.dt = dt
self.obs_dim = obs_dim
if obs_noise_scale_init is None:
obs_noise_scale_init = 0.2 * torch.ones(obs_dim)
assert obs_noise_scale_init.shape == (obs_dim,)
super().__init__()
self.kernel = MaternKernel(
nu=nu,
num_gps=obs_dim,
length_scale_init=length_scale_init,
kernel_scale_init=kernel_scale_init,
)
self.obs_noise_scale = PyroParam(
obs_noise_scale_init, constraint=constraints.positive
)
obs_matrix = [1.0] + [0.0] * (self.kernel.state_dim - 1)
self.register_buffer("obs_matrix", torch.tensor(obs_matrix).unsqueeze(-1))
def _get_init_dist(self):
return torch.distributions.MultivariateNormal(
self.obs_matrix.new_zeros(self.obs_dim, self.kernel.state_dim),
self.kernel.stationary_covariance().squeeze(-3),
)
def _get_obs_dist(self):
return dist.Normal(
self.obs_matrix.new_zeros(self.obs_dim, 1, 1),
self.obs_noise_scale.unsqueeze(-1).unsqueeze(-1),
).to_event(1)
def get_dist(self, duration=None):
"""
Get the :class:`~pyro.distributions.GaussianHMM` distribution that corresponds
to ``obs_dim``-many independent Matern GPs.
:param int duration: Optional size of the time axis ``event_shape[0]``.
This is required when sampling from homogeneous HMMs whose parameters
are not expanded along the time axis.
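
For example (a sketch; the shapes follow from the construction below)::

    gp = IndependentMaternGP(nu=1.5, obs_dim=2)
    hmm = gp.get_dist(duration=100)
    sample = hmm.sample()   # shape (obs_dim, duration, 1) == (2, 100, 1)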
"""
trans_matrix, process_covar = self.kernel.transition_matrix_and_covariance(
dt=self.dt
)
trans_dist = MultivariateNormal(
self.obs_matrix.new_zeros(self.obs_dim, 1, self.kernel.state_dim),
process_covar.unsqueeze(-3),
)
trans_matrix = trans_matrix.unsqueeze(-3)
return dist.GaussianHMM(
self._get_init_dist(),
trans_matrix,
trans_dist,
self.obs_matrix,
self._get_obs_dist(),
duration=duration,
)
@pyro_method
def log_prob(self, targets):
"""
:param torch.Tensor targets: A 2-dimensional tensor of real-valued targets
of shape ``(T, obs_dim)``, where ``T`` is the length of the time series and ``obs_dim``
is the dimension of the real-valued ``targets`` at each time step
:returns torch.Tensor: A 1-dimensional tensor of log probabilities of shape ``(obs_dim,)``
"""
assert targets.dim() == 2 and targets.size(-1) == self.obs_dim
return self.get_dist().log_prob(targets.t().unsqueeze(-1))
@torch.no_grad()
def _filter(self, targets):
"""
Return the filtering state for the associated state space model.
"""
assert targets.dim() == 2 and targets.size(-1) == self.obs_dim
return self.get_dist().filter(targets.t().unsqueeze(-1))
@torch.no_grad()
def _forecast(self, dts, filtering_state, include_observation_noise=True):
"""
Internal helper for forecasting.
"""
assert dts.dim() == 1
dts = dts.unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
trans_matrix, process_covar = self.kernel.transition_matrix_and_covariance(
dt=dts
)
trans_matrix = trans_matrix[..., 0:1]
predicted_mean = torch.matmul(
filtering_state.loc.unsqueeze(-2), trans_matrix
).squeeze(-2)[..., 0]
predicted_function_covar = (
torch.matmul(
trans_matrix.transpose(-1, -2),
torch.matmul(filtering_state.covariance_matrix, trans_matrix),
)[..., 0, 0]
+ process_covar[..., 0, 0]
)
if include_observation_noise:
predicted_function_covar = (
predicted_function_covar + self.obs_noise_scale.pow(2.0)
)
return predicted_mean, predicted_function_covar
@pyro_method
def forecast(self, targets, dts):
"""
:param torch.Tensor targets: A 2-dimensional tensor of real-valued targets
of shape ``(T, obs_dim)``, where ``T`` is the length of the time series and ``obs_dim``
is the dimension of the real-valued targets at each time step. These
represent the training data that are conditioned on for the purpose of making
forecasts.
:param torch.Tensor dts: A 1-dimensional tensor of times to forecast into the future,
with zero corresponding to the time of the final target ``targets[-1]``.
:returns torch.distributions.Normal: Returns a predictive Normal distribution whose ``mean``
    and ``stddev`` have shape ``(S, obs_dim)``, where ``S`` is the size of ``dts``.
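
For example (a sketch, with ``gp`` and ``targets`` as in the class docstring example and ``dt=1.0``)::

    dts = torch.tensor([1.0, 2.0, 7.0])   # 1, 2 and 7 time units ahead
    pred = gp.forecast(targets, dts)      # pred.mean has shape (3, obs_dim)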
"""
filtering_state = self._filter(targets)
predicted_mean, predicted_covar = self._forecast(dts, filtering_state)
return torch.distributions.Normal(predicted_mean, predicted_covar.sqrt())
class LinearlyCoupledMaternGP(TimeSeriesModel):
"""
A time series model in which each output dimension is modeled as a linear combination
of shared univariate Gaussian Processes with Matern kernels.
In more detail, the generative process is:
:math:`y_i(t) = \\sum_j A_{ij} f_j(t) + \\epsilon_i(t)`
The targets :math:`y_i` are assumed to be evenly spaced in time. Training and inference
are logarithmic in the length of the time series T.
:param float nu: The order of the Matern kernel; one of 0.5, 1.5 or 2.5.
:param float dt: The time spacing between neighboring observations of the time series.
:param int obs_dim: The dimension of the targets at each time step.
:param int num_gps: The number of independent GPs that are mixed to model the time series.
Typical values might be :math:`N_{\\rm gp} \\in [\\frac{D_{\\rm obs}}{2}, D_{\\rm obs}]`
:param torch.Tensor length_scale_init: optional initial values for the kernel length scale
given as a ``num_gps``-dimensional tensor
:param torch.Tensor kernel_scale_init: optional initial values for the kernel scale
given as a ``num_gps``-dimensional tensor
:param torch.Tensor obs_noise_scale_init: optional initial values for the observation noise scale
    given as an ``obs_dim``-dimensional tensor
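
Example (a minimal usage sketch; the hyperparameters and data below are illustrative)::

    gp = LinearlyCoupledMaternGP(nu=1.5, obs_dim=4, num_gps=2)
    targets = torch.randn(100, 4)    # T=100 time steps, obs_dim=4
    loss = -gp.log_prob(targets)     # scalar log probability
    pred = gp.forecast(targets, torch.tensor([1.0, 2.0]))
    # pred is a MultivariateNormal with batch_shape (2,) and event_shape (4,)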
"""
def __init__(
self,
nu=1.5,
dt=1.0,
obs_dim=2,
num_gps=1,
length_scale_init=None,
kernel_scale_init=None,
obs_noise_scale_init=None,
):
self.nu = nu
self.dt = dt
assert obs_dim > 1, "If obs_dim==1 you should use IndependentMaternGP"
self.obs_dim = obs_dim
self.num_gps = num_gps
if obs_noise_scale_init is None:
obs_noise_scale_init = 0.2 * torch.ones(obs_dim)
assert obs_noise_scale_init.shape == (obs_dim,)
super().__init__()
self.kernel = MaternKernel(
nu=nu,
num_gps=num_gps,
length_scale_init=length_scale_init,
kernel_scale_init=kernel_scale_init,
)
self.full_state_dim = num_gps * self.kernel.state_dim
self.obs_noise_scale = PyroParam(
obs_noise_scale_init, constraint=constraints.positive
)
self.A = nn.Parameter(0.3 * torch.randn(self.num_gps, self.obs_dim))
def _get_obs_matrix(self):
# (num_gps, obs_dim) => (state_dim * num_gps, obs_dim)
selector = [1.0] + [0.0] * (self.kernel.state_dim - 1)
selector = self.A.new_tensor(selector).repeat(self.num_gps).unsqueeze(-1)
return self.A.repeat_interleave(self.kernel.state_dim, dim=0) * selector
def _stationary_covariance(self):
return block_diag_embed(self.kernel.stationary_covariance())
def _get_init_dist(self):
loc = self.A.new_zeros(self.full_state_dim)
return MultivariateNormal(loc, self._stationary_covariance())
def _get_obs_dist(self):
loc = self.A.new_zeros(self.obs_dim)
return dist.Normal(loc, self.obs_noise_scale).to_event(1)
def get_dist(self, duration=None):
"""
Get the :class:`~pyro.distributions.GaussianHMM` distribution that corresponds
to a :class:`LinearlyCoupledMaternGP`.
:param int duration: Optional size of the time axis ``event_shape[0]``.
This is required when sampling from homogeneous HMMs whose parameters
are not expanded along the time axis.
"""
trans_matrix, process_covar = self.kernel.transition_matrix_and_covariance(
dt=self.dt
)
trans_matrix = block_diag_embed(trans_matrix)
process_covar = block_diag_embed(process_covar)
loc = self.A.new_zeros(self.full_state_dim)
trans_dist = MultivariateNormal(loc, process_covar)
return dist.GaussianHMM(
self._get_init_dist(),
trans_matrix,
trans_dist,
self._get_obs_matrix(),
self._get_obs_dist(),
duration=duration,
)
@pyro_method
def log_prob(self, targets):
"""
:param torch.Tensor targets: A 2-dimensional tensor of real-valued targets
of shape ``(T, obs_dim)``, where ``T`` is the length of the time series and ``obs_dim``
is the dimension of the real-valued ``targets`` at each time step
:returns torch.Tensor: A (scalar) log probability
"""
assert targets.dim() == 2 and targets.size(-1) == self.obs_dim
return self.get_dist().log_prob(targets)
@torch.no_grad()
def _filter(self, targets):
"""
Return the filtering state for the associated state space model.
"""
assert targets.dim() == 2 and targets.size(-1) == self.obs_dim
return self.get_dist().filter(targets)
@torch.no_grad()
def _forecast(
self, dts, filtering_state, include_observation_noise=True, full_covar=True
):
"""
Internal helper for forecasting.
"""
assert dts.dim() == 1
dts = dts.unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
trans_mat, process_covar = self.kernel.transition_matrix_and_covariance(dt=dts)
trans_mat = block_diag_embed(trans_mat) # S x full_state_dim x full_state_dim
process_covar = block_diag_embed(
process_covar
) # S x full_state_dim x full_state_dim
obs_matrix = self._get_obs_matrix() # full_state_dim x obs_dim
trans_obs = torch.matmul(trans_mat, obs_matrix) # S x full_state_dim x obs_dim
predicted_mean = torch.matmul(
filtering_state.loc.unsqueeze(-2), trans_obs
).squeeze(-2)
predicted_function_covar = torch.matmul(
trans_obs.transpose(-1, -2),
torch.matmul(filtering_state.covariance_matrix, trans_obs),
)
predicted_function_covar = predicted_function_covar + torch.matmul(
obs_matrix.transpose(-1, -2), torch.matmul(process_covar, obs_matrix)
)
if include_observation_noise:
obs_noise = self.obs_noise_scale.pow(2.0).diag_embed()
predicted_function_covar = predicted_function_covar + obs_noise
if not full_covar:
predicted_function_covar = predicted_function_covar.diagonal(
dim1=-1, dim2=-2
)
return predicted_mean, predicted_function_covar
@pyro_method
def forecast(self, targets, dts):
"""
:param torch.Tensor targets: A 2-dimensional tensor of real-valued targets
of shape ``(T, obs_dim)``, where ``T`` is the length of the time series and ``obs_dim``
is the dimension of the real-valued targets at each time step. These
represent the training data that are conditioned on for the purpose of making
forecasts.
:param torch.Tensor dts: A 1-dimensional tensor of times to forecast into the future,
with zero corresponding to the time of the final target ``targets[-1]``.
:returns torch.distributions.MultivariateNormal: Returns a predictive MultivariateNormal
distribution with batch shape ``(S,)`` and event shape ``(obs_dim,)``,
where ``S`` is the size of ``dts``.
"""
filtering_state = self._filter(targets)
predicted_mean, predicted_covar = self._forecast(dts, filtering_state)
return MultivariateNormal(predicted_mean, predicted_covar)
class DependentMaternGP(TimeSeriesModel):
"""
A time series model in which each output dimension is modeled as a univariate Gaussian Process
with a Matern kernel. The different output dimensions become correlated because the Gaussian
Processes are driven by a correlated Wiener process; see reference [1] for details.
If, in addition, `linearly_coupled` is True, additional correlation is achieved through
linear mixing as in :class:`LinearlyCoupledMaternGP`. The targets are assumed to be evenly
spaced in time. Training and inference are logarithmic in the length of the time series T.
:param float nu: The order of the Matern kernel; must be 1.5.
:param float dt: The time spacing between neighboring observations of the time series.
:param int obs_dim: The dimension of the targets at each time step.
:param bool linearly_coupled: Whether to linearly mix the various Gaussian Processes in the likelihood.
Defaults to False.
:param torch.Tensor length_scale_init: optional initial values for the kernel length scale
    given as an ``obs_dim``-dimensional tensor
:param torch.Tensor obs_noise_scale_init: optional initial values for the observation noise scale
    given as an ``obs_dim``-dimensional tensor
References
[1] "Dependent Matern Processes for Multivariate Time Series," Alexander Vandenberg-Rodes, Babak Shahbaba.
"""
def __init__(
self,
nu=1.5,
dt=1.0,
obs_dim=1,
linearly_coupled=False,
length_scale_init=None,
obs_noise_scale_init=None,
):
if nu != 1.5:
raise NotImplementedError("The only supported value of nu is 1.5")
self.dt = dt
self.obs_dim = obs_dim
if obs_noise_scale_init is None:
obs_noise_scale_init = 0.2 * torch.ones(obs_dim)
assert obs_noise_scale_init.shape == (obs_dim,)
super().__init__()
self.kernel = MaternKernel(
nu=nu, num_gps=obs_dim, length_scale_init=length_scale_init
)
self.full_state_dim = self.kernel.state_dim * obs_dim
# we demote self.kernel.kernel_scale from being an nn.Parameter
# since the relevant scales are now encoded in the Wiener noise matrix
del self.kernel.kernel_scale
self.kernel.register_buffer("kernel_scale", torch.ones(obs_dim))
self.obs_noise_scale = PyroParam(
obs_noise_scale_init, constraint=constraints.positive
)
self.wiener_noise_tril = PyroParam(
torch.eye(obs_dim) + 0.03 * torch.randn(obs_dim, obs_dim).tril(-1),
constraint=constraints.lower_cholesky,
)
if linearly_coupled:
self.obs_matrix = nn.Parameter(
0.3 * torch.randn(self.obs_dim, self.obs_dim)
)
else:
obs_matrix = torch.zeros(self.full_state_dim, obs_dim)
for i in range(obs_dim):
obs_matrix[self.kernel.state_dim * i, i] = 1.0
self.register_buffer("obs_matrix", obs_matrix)
def _get_obs_matrix(self):
if self.obs_matrix.size(0) == self.obs_dim:
# (num_gps, obs_dim) => (state_dim * num_gps, obs_dim)
selector = [1.0] + [0.0] * (self.kernel.state_dim - 1)
return self.obs_matrix.repeat_interleave(
self.kernel.state_dim, dim=0
) * self.obs_matrix.new_tensor(selector).repeat(self.obs_dim).unsqueeze(-1)
else:
return self.obs_matrix
def _get_init_dist(self, stationary_covariance):
return torch.distributions.MultivariateNormal(
self.obs_matrix.new_zeros(self.full_state_dim), stationary_covariance
)
def _get_obs_dist(self):
return dist.Normal(
self.obs_matrix.new_zeros(self.obs_dim), self.obs_noise_scale
).to_event(1)
def _get_wiener_cov(self):
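# Tile the (obs_dim, obs_dim) Wiener noise covariance so that each scalar entry
# is repeated over the corresponding (state_dim, state_dim) block of the full state.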
chol = self.wiener_noise_tril
wiener_cov = torch.mm(chol, chol.t()).reshape(self.obs_dim, 1, self.obs_dim, 1)
wiener_cov = wiener_cov * wiener_cov.new_ones(
self.kernel.state_dim, 1, self.kernel.state_dim
)
return wiener_cov.reshape(self.full_state_dim, self.full_state_dim)
def _stationary_covariance(self):
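# Stationary state covariance of the coupled Matern-3/2 SDEs (cf. reference [1]).
# With rho_i = sqrt(3) / length_scale_i, the (i, j) state block is
#   Sigma_w[i, j] * [[2, rho_i - rho_j], [rho_j - rho_i, 2 * rho_i * rho_j]] / (rho_i + rho_j)^3
# where Sigma_w is the Wiener noise covariance computed in _get_wiener_cov().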
rho_j = math.sqrt(3.0) / self.kernel.length_scale.unsqueeze(-1).unsqueeze(-1)
rho_i = rho_j.unsqueeze(-1)
block = (
2.0 * self.kernel.mask00
+ (rho_i - rho_j) * (self.kernel.mask01 - self.kernel.mask10)
+ (2.0 * rho_i * rho_j) * self.kernel.mask11
)
block = block / (rho_i + rho_j).pow(3.0)
block = block.transpose(-2, -3).reshape(
self.full_state_dim, self.full_state_dim
)
return self._get_wiener_cov() * block
def _get_trans_dist(self, trans_matrix, stationary_covariance):
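# For a stationary process the transition noise covariance satisfies
#   Q = P_inf - A^T P_inf A,
# where P_inf is the stationary covariance and A the transition matrix
# (states are row vectors here, i.e. x_{t+1} = x_t A + noise).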
covar = stationary_covariance - torch.matmul(
trans_matrix.transpose(-1, -2),
torch.matmul(stationary_covariance, trans_matrix),
)
return MultivariateNormal(covar.new_zeros(self.full_state_dim), covar)
def _trans_matrix_distribution_stat_covar(self, dts):
stationary_covariance = self._stationary_covariance()
trans_matrix = self.kernel.transition_matrix(dt=dts)
trans_matrix = block_diag_embed(trans_matrix)
trans_dist = self._get_trans_dist(trans_matrix, stationary_covariance)
return trans_matrix, trans_dist, stationary_covariance
def get_dist(self, duration=None):
"""
Get the :class:`~pyro.distributions.GaussianHMM` distribution that corresponds to a :class:`DependentMaternGP`
:param int duration: Optional size of the time axis ``event_shape[0]``.
This is required when sampling from homogeneous HMMs whose parameters
are not expanded along the time axis.
"""
(
trans_matrix,
trans_dist,
stat_covar,
) = self._trans_matrix_distribution_stat_covar(self.dt)
return dist.GaussianHMM(
self._get_init_dist(stat_covar),
trans_matrix,
trans_dist,
self._get_obs_matrix(),
self._get_obs_dist(),
duration=duration,
)
@pyro_method
def log_prob(self, targets):
"""
:param torch.Tensor targets: A 2-dimensional tensor of real-valued targets
of shape ``(T, obs_dim)``, where ``T`` is the length of the time series and ``obs_dim``
is the dimension of the real-valued ``targets`` at each time step
:returns torch.Tensor: A (scalar) log probability
"""
assert targets.dim() == 2 and targets.size(-1) == self.obs_dim
return self.get_dist().log_prob(targets)
@torch.no_grad()
def _filter(self, targets):
"""
Return the filtering state for the associated state space model.
"""
assert targets.dim() == 2 and targets.size(-1) == self.obs_dim
return self.get_dist().filter(targets)
@torch.no_grad()
def _forecast(self, dts, filtering_state, include_observation_noise=True):
"""
Internal helper for forecasting.
"""
assert dts.dim() == 1
dts = dts.unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
trans_matrix, trans_dist, _ = self._trans_matrix_distribution_stat_covar(dts)
obs_matrix = self._get_obs_matrix()
trans_obs = torch.matmul(trans_matrix, obs_matrix)
predicted_mean = torch.matmul(
filtering_state.loc.unsqueeze(-2), trans_obs
).squeeze(-2)
predicted_function_covar = torch.matmul(
trans_obs.transpose(-1, -2),
torch.matmul(filtering_state.covariance_matrix, trans_obs),
) + torch.matmul(
obs_matrix.t(), torch.matmul(trans_dist.covariance_matrix, obs_matrix)
)
if include_observation_noise:
    # observation noise is independent across output dimensions, so it only
    # contributes to the diagonal of the predictive covariance
    # (as in LinearlyCoupledMaternGP._forecast)
    obs_noise = self.obs_noise_scale.pow(2.0).diag_embed()
    predicted_function_covar = predicted_function_covar + obs_noise
return predicted_mean, predicted_function_covar
@pyro_method
def forecast(self, targets, dts):
"""
:param torch.Tensor targets: A 2-dimensional tensor of real-valued targets
of shape ``(T, obs_dim)``, where ``T`` is the length of the time series and ``obs_dim``
is the dimension of the real-valued targets at each time step. These
represent the training data that are conditioned on for the purpose of making
forecasts.
:param torch.Tensor dts: A 1-dimensional tensor of times to forecast into the future,
with zero corresponding to the time of the final target ``targets[-1]``.
:returns torch.distributions.MultivariateNormal: Returns a predictive MultivariateNormal
distribution with batch shape ``(S,)`` and event shape ``(obs_dim,)``, where ``S`` is the size of ``dts``.
"""
filtering_state = self._filter(targets)
predicted_mean, predicted_covar = self._forecast(dts, filtering_state)
return MultivariateNormal(predicted_mean, predicted_covar)