Source code for pyro.distributions.lkj

import math

import torch
from torch.distributions import biject_to, constraints, transform_to
from torch.distributions.constraints import Constraint
from torch.distributions.transforms import Transform

from pyro.distributions import Beta, TorchDistribution


########################################
# Define constraint
########################################


class _CorrCholesky(Constraint):
    """
    Constrains to lower-triangular square matrices with positive diagonals
    whose rows each have unit Euclidean norm, so that `torch.mm(omega, omega.t())`
    has a unit diagonal.
    """

    def check(self, value):
        # every row norm must equal 1, within a small numerical tolerance
        unit_norm_row = (value.norm(dim=-1).sub(1).abs() < 1e-4).min(-1)[0]
        return constraints.lower_cholesky.check(value) & unit_norm_row


corr_cholesky_constraint = _CorrCholesky()
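
# A quick sanity check (illustrative sketch, not part of the module): the
# identity matrix is a valid Cholesky factor of a correlation matrix, so the
# constraint should accept it.
#
#     assert corr_cholesky_constraint.check(torch.eye(3)).all()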


########################################
# Define transforms
########################################

def _vector_to_l_cholesky(z):
    # Solve n = D * (D - 1) / 2 for D, where n = z.shape[-1].
    D = (1.0 + math.sqrt(1.0 + 8.0 * z.shape[-1])) / 2.0
    if D % 1 != 0:
        raise ValueError("Correlation matrix transformation requires d choose 2 inputs")
    D = int(D)
    x = torch.zeros(z.shape[:-1] + (D, D), dtype=z.dtype, device=z.device)

    x[..., 0, 0] = 1
    x[..., 1:, 0] = z[..., :(D - 1)]
    i = D - 1
    last_squared_x = torch.zeros(z.shape[:-1] + (D,), dtype=z.dtype, device=z.device)
    for j in range(1, D):
        distance_to_copy = D - 1 - j
        last_squared_x = last_squared_x[..., 1:] + x[..., j:, (j - 1)].clone()**2
        x[..., j, j] = (1 - last_squared_x[..., 0]).sqrt()
        x[..., (j + 1):, j] = z[..., i:(i + distance_to_copy)] * (1 - last_squared_x[..., 1:]).sqrt()
        i += distance_to_copy
    return x
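
# Usage sketch (illustrative, not part of the module): for d = 3, the helper
# consumes d * (d - 1) / 2 = 3 partial correlations in (-1, 1) and returns a
# 3 x 3 lower-triangular factor whose rows have unit norm, so L @ L.t() has a
# unit diagonal.
#
#     z = torch.tensor([0.5, -0.1, 0.3])
#     L = _vector_to_l_cholesky(z)
#     assert corr_cholesky_constraint.check(L).all()
#     assert torch.allclose(torch.mm(L, L.t()).diagonal(), torch.ones(3))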


class CorrLCholeskyTransform(Transform):
    """
    Transforms a vector into the Cholesky factor of a correlation matrix.

    The input should have shape `[batch_shape] + [d * (d-1)/2]`. The output will have
    shape `[batch_shape] + [d, d]`.

    Reference:

    [1] `Cholesky Factors of Correlation Matrices`, Stan Reference Manual v2.18, Section 10.12
    """
    domain = constraints.real
    codomain = corr_cholesky_constraint
    bijective = True
    sign = +1
    event_dim = 1

    def __eq__(self, other):
        return isinstance(other, CorrLCholeskyTransform)

    def _call(self, x):
        z = x.tanh()
        return _vector_to_l_cholesky(z)

    def _inverse(self, y):
        if (y.shape[-2] != y.shape[-1]):
            raise ValueError("A matrix that isn't square can't be a Cholesky factor of a correlation matrix")
        D = y.shape[-1]

        z_tri = torch.zeros(y.shape[:-2] + (D - 2, D - 2), dtype=y.dtype, device=y.device)
        z_stack = [
            y[..., 1:, 0]
        ]

        for i in range(2, D):
            z_tri[..., i - 2, 0:(i - 1)] = y[..., i, 1:i] / (1 - y[..., i, 0:(i - 1)].pow(2).cumsum(-1)).sqrt()
        for j in range(D - 2):
            z_stack.append(z_tri[..., j:, j])

        z = torch.cat(z_stack, -1)
        # arctanh(z), computed as 0.5 * log((1 + z) / (1 - z))
        return torch.log1p((2 * z) / (1 - z)) / 2

    def log_abs_det_jacobian(self, x, y):
        # Note dependence on pytorch 1.0.1 for batched tril
        tanpart = x.cosh().log().sum(-1).mul(-2)
        matpart = (1 - y.pow(2).cumsum(-1).tril(diagonal=-2)).log().div(2).sum(-1).sum(-1)
        return tanpart + matpart
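
# Round-trip sketch (illustrative, not part of the module): the transform is
# bijective, so mapping an unconstrained vector to a Cholesky factor and back
# recovers the input up to floating point error. Six inputs correspond to
# d = 4, since 4 * 3 / 2 = 6.
#
#     t = CorrLCholeskyTransform()
#     x = torch.tensor([0.2, -0.5, 0.3, 0.1, -0.2, 0.4])
#     y = t(x)
#     assert corr_cholesky_constraint.check(y).all()
#     assert torch.allclose(t.inv(y), x, atol=1e-5)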

# register the transform in the global biject_to and transform_to registries


@biject_to.register(corr_cholesky_constraint)
@transform_to.register(corr_cholesky_constraint)
def _transform_to_corr_cholesky(constraint):
    return CorrLCholeskyTransform()
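
# With the registrations above, `biject_to` and `transform_to` look the
# transform up from the constraint; this is how, e.g., HMC/NUTS maps the
# constrained support to unconstrained space (illustrative sketch, not part
# of the module):
#
#     t = biject_to(corr_cholesky_constraint)
#     y = t(torch.zeros(10))  # yields a 5 x 5 identity factor, since tanh(0) = 0
#     assert corr_cholesky_constraint.check(y).all()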


########################################
# Define distribution
########################################

# TODO: Modify class to support more than one eta value at a time?
class LKJCorrCholesky(TorchDistribution):
    """
    Generates Cholesky factors of correlation matrices using an LKJ prior.

    The expected use is to combine it with a vector of variances and pass it
    to the scale_tril parameter of a multivariate distribution such as
    MultivariateNormal.

    E.g., if theta is a (positive) vector of variances with the same
    dimensionality as this distribution, and Omega is sampled from this
    distribution, scale_tril=torch.mm(torch.diag(theta.sqrt()), Omega).

    Note that the `event_shape` of this distribution is `[d, d]`.

    .. note::

       When using this distribution with HMC/NUTS, it is important to use a
       `step_size` such as 1e-4. If not, you are likely to experience LAPACK
       errors regarding positive-definiteness.

    For example usage, refer to
    `pyro/examples/lkj.py <https://github.com/pyro-ppl/pyro/blob/dev/examples/lkj.py>`_.

    :param int d: Dimensionality of the matrix
    :param torch.Tensor eta: A single positive number parameterizing the
        distribution.
    """
    arg_constraints = {"eta": constraints.positive}
    support = corr_cholesky_constraint
    has_rsample = False

    def __init__(self, d, eta, validate_args=None):
        if eta.numel() != 1:
            raise ValueError("eta must be a single number; for a larger batch size, call expand")
        if d <= 1:
            raise ValueError("d must be > 1 in any correlation matrix")
        eta = eta.squeeze()
        vector_size = (d * (d - 1)) // 2
        alpha = eta.add(0.5 * (d - 1.0))

        # Canonical partial correlations are generated as 2 * Beta(alpha, alpha) - 1,
        # with alpha decreasing by 1/2 for each successive column (see the Stan
        # reference above).
        concentrations = torch.empty(vector_size, dtype=eta.dtype, device=eta.device)
        i = 0
        for k in range(d - 1):
            alpha -= .5
            concentrations[..., i:(i + d - k - 1)] = alpha
            i += d - k - 1
        self._gen = Beta(concentrations, concentrations)
        self.eta = eta
        self._d = d
        self._lkj_constant = None
        super(LKJCorrCholesky, self).__init__(torch.Size(), torch.Size((d, d)),
                                              validate_args=validate_args)

    def sample(self, sample_shape=torch.Size()):
        # Map Beta(alpha, alpha) samples on (0, 1) to partial correlations on (-1, 1).
        y = self._gen.sample(sample_shape=self.batch_shape + sample_shape).detach()
        z = y.mul(2).add(-1.0)
        return _vector_to_l_cholesky(z)

    def expand(self, batch_shape, _instance=None):
        new = self._get_checked_instance(LKJCorrCholesky, _instance)
        batch_shape = torch.Size(batch_shape)
        new._gen = self._gen
        new.eta = self.eta
        new._d = self._d
        new._lkj_constant = self._lkj_constant
        super(LKJCorrCholesky, new).__init__(batch_shape, self.event_shape,
                                             validate_args=False)
        new._validate_args = self._validate_args
        return new

    def lkj_constant(self, eta, K):
        # Log normalizing constant of the LKJ distribution:
        #   log C = (K - 1) * lgamma(eta + (K - 1) / 2)
        #           - sum_{k=1}^{K-1} [(k / 2) * log(pi) + lgamma(eta + (K - 1 - k) / 2)]
        if self._lkj_constant is not None:
            return self._lkj_constant

        Km1 = K - 1

        constant = torch.lgamma(eta.add(0.5 * Km1)).mul(Km1)

        k = torch.linspace(start=1, end=Km1, steps=Km1, dtype=eta.dtype, device=eta.device)
        constant -= (k.mul(math.log(math.pi) * 0.5) + torch.lgamma(eta.add(0.5 * (Km1 - k)))).sum()

        self._lkj_constant = constant
        return constant

    def log_prob(self, x):
        if self._validate_args:
            self._validate_sample(x)
        eta = self.eta

        lp = self.lkj_constant(eta, self._d)

        Km1 = self._d - 1

        log_diagonals = x.diagonal(offset=0, dim1=-1, dim2=-2)[..., 1:].log()
        # TODO: Figure out why the `device` kwarg to torch.linspace seems to not work
        # in certain situations, and a seemingly redundant .to(x.device) is needed below.
        values = log_diagonals * torch.linspace(start=Km1 - 1, end=0, steps=Km1,
                                                dtype=x.dtype,
                                                device=x.device).expand_as(log_diagonals).to(x.device)

        values += log_diagonals.mul(eta.mul(2).add(-2.0))
        values = values.sum(-1) + lp
        values, _ = torch.broadcast_tensors(values, torch.empty(self.batch_shape))
        return values
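
# Usage sketch (illustrative, not part of the module), following the pattern in
# the class docstring: combine a sampled correlation Cholesky factor with a
# hypothetical vector of variances `theta` to build the `scale_tril` of a
# MultivariateNormal.
#
#     from pyro.distributions import MultivariateNormal
#
#     d = 4
#     lkj = LKJCorrCholesky(d, eta=torch.tensor(1.0))
#     omega = lkj.sample()            # a d x d correlation Cholesky factor
#     theta = torch.rand(d) + 0.1     # hypothetical positive variances
#     scale_tril = torch.mm(torch.diag(theta.sqrt()), omega)
#     mvn = MultivariateNormal(torch.zeros(d), scale_tril=scale_tril)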