Source code for pyabc.distance.kernel

"""Stochastic kernels."""
from typing import Callable, List, Sequence, Union

import numpy as np
from scipy import stats

from ..population import Sample
from .base import Distance

SCALE_LIN = "SCALE_LIN"
SCALE_LOG = "SCALE_LOG"
SCALES = [SCALE_LIN, SCALE_LOG]


class StochasticKernel(Distance):
    """
    A stochastic kernel assesses the similarity between observed and
    simulated summary statistics or data via a probability measure.

    .. note::
        The returned value cannot be interpreted as a distance function,
        but rather as an inverse distance, as it increases as the
        similarity between observed and simulated summary statistics
        increases. Thus, a StochasticKernel should only be used together
        with a StochasticAcceptor.

    Parameters
    ----------
    ret_scale: str, optional (default = SCALE_LIN)
        The scale of the value returned in __call__:
        Given a probability density p(x, x_0), the returned value
        can be either of p(x, x_0), or log(p(x, x_0)).

    keys: List[str], optional
        The keys of the summary statistics, specifying the order to be used.

    pdf_max: float, optional
        The maximum possible probability density function value.
        Defaults to None and is then computed as the density at (x_0, x_0),
        where x_0 denotes the observed summary statistics.
        Must be overridden if pdf_max is to be used in the analysis by the
        acceptor and the default is not applicable.
        This value should be in the scale specified by ret_scale already.
    """

    def __init__(
        self,
        ret_scale: str = SCALE_LIN,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        StochasticKernel.check_ret_scale(ret_scale)
        self.ret_scale = ret_scale
        self.keys = keys
        self.pdf_max = pdf_max

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        """
        Remember the summary statistic keys in sorted order,
        if not set in __init__ already.
        """
        # initialize keys
        if self.keys is None:
            self.initialize_keys(x_0)

    @staticmethod
    def check_ret_scale(ret_scale):
        if ret_scale not in SCALES:
            raise ValueError(
                f"The ret_scale {ret_scale} must be one of {SCALES}."
            )

    def initialize_keys(self, x):
        self.keys = sorted(x)


class FunctionKernel(StochasticKernel):
    """
    This is a wrapper around a simple function which calculates the
    probability density.

    Parameters
    ----------
    fun: Callable
        A Callable accepting `__call__`'s parameters.
        The function should be a pdf or pmf.

    ret_scale, keys, pdf_max: As in StochasticKernel.
    """

    def __init__(
        self,
        fun: Callable,
        ret_scale: str = SCALE_LIN,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=ret_scale, keys=keys, pdf_max=pdf_max)
        self.fun = fun

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ) -> float:
        return self.fun(x=x, x_0=x_0, t=t, par=par)

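
# --- Illustrative sketch, not part of the original pyabc module ---
# A minimal FunctionKernel usage example: wrapping a custom univariate
# Gaussian density. The summary statistic key "y" is hypothetical.
def _demo_function_kernel():
    def gauss_pdf(x, x_0, t=None, par=None):
        d = x["y"] - x_0["y"]
        return float(np.exp(-0.5 * d**2) / np.sqrt(2 * np.pi))

    kernel = FunctionKernel(fun=gauss_pdf, ret_scale=SCALE_LIN)
    # density of N(0, 1) at 0.3, approximately 0.381
    return kernel({"y": 0.3}, {"y": 0.0})
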

class NormalKernel(StochasticKernel):
    """
    A kernel with a normal, i.e. Gaussian, probability density.
    This is just a wrapper around scipy.stats.multivariate_normal.

    Parameters
    ----------
    cov: array_like, optional (default = identity matrix)
        Covariance matrix of the distribution.

    ret_scale, keys, pdf_max: As in StochasticKernel.

    .. note::
        The order of the entries in the mean and cov vectors is assumed
        to be the same as the one in keys. If keys is None, it is assumed
        to be the same as the one obtained via sorted(x.keys())
        for summary statistics x.
    """

    def __init__(
        self,
        cov: np.ndarray = None,
        ret_scale: str = SCALE_LOG,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=ret_scale, keys=keys, pdf_max=pdf_max)
        self.cov = cov

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        # in particular set keys
        super().initialize(
            t=t,
            get_sample=get_sample,
            x_0=x_0,
            total_sims=total_sims,
        )

        # initialize distribution
        self._init_distr(x_0)

        # cache pdf_max
        if self.pdf_max is None:
            # take value at observed summary statistics
            self.pdf_max = self(x_0, x_0)

    def _init_distr(self, x_0):
        """
        Initialize cov (covariance) and rv (distribution).
        """
        if self.cov is None:
            dim = sum(np.size(x_0[key]) for key in self.keys)
            self.cov = np.eye(dim)
        self.cov = np.asarray(self.cov)

        # create frozen multivariate normal distribution
        dim = self.cov.shape[0]
        mean = np.zeros(dim)
        self.rv = stats.multivariate_normal(mean=mean, cov=self.cov)

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ) -> float:
        """
        Return the value of the normal distribution at x - x_0,
        or its logarithm.
        """
        # safety check
        if self.keys is None:
            self.initialize_keys(x_0)

        # difference to array
        diff = _diff_arr(x, x_0, self.keys)

        # compute pdf
        if self.ret_scale == SCALE_LIN:
            ret = self.rv.pdf(diff)
        else:  # self.ret_scale == SCALE_LOG
            ret = self.rv.logpdf(diff)

        return ret

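
# --- Illustrative sketch, not part of the original pyabc module ---
# Evaluating a NormalKernel on toy 2-dim summary statistics; the keys
# "a" and "b" are hypothetical, and get_sample is unused by initialize.
def _demo_normal_kernel():
    x_0 = {"a": 0.0, "b": 0.0}
    kernel = NormalKernel(cov=np.eye(2))
    kernel.initialize(t=0, get_sample=None, x_0=x_0, total_sims=0)
    # log N([0.1, -0.2]; 0, I) = -log(2 * pi) - 0.5 * (0.01 + 0.04)
    return kernel({"a": 0.1, "b": -0.2}, x_0)
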

class IndependentNormalKernel(StochasticKernel):
    """
    This kernel can be used for efficient computations of large-scale
    independent normal distributions, circumventing the covariance
    matrix, and performing computations directly on a log-scale to
    avoid numeric issues.

    Parameters
    ----------
    var: Union[array_like, float, Callable], optional (default = ones vector)
        Variances of the distribution (assuming zeros in the off-diagonal
        of the covariance matrix). Can also be a Callable taking as
        arguments the parameters. In that case, pdf_max should also be
        given if it is supposed to be used. Usually, it will then be
        given as the density at the observed statistics assuming the
        minimum allowed variance.

    keys, pdf_max: As in StochasticKernel.
    """

    def __init__(
        self,
        var: Union[Callable, Sequence[float], float] = None,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=SCALE_LOG, keys=keys, pdf_max=pdf_max)
        self.var = var

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        # in particular set keys
        super().initialize(
            t=t,
            get_sample=get_sample,
            x_0=x_0,
            total_sims=total_sims,
        )

        # dimension
        dim = sum(np.size(x_0[key]) for key in self.keys)

        # initialize var
        if self.var is None:
            self.var = np.ones(dim)
        if not callable(self.var):
            self.var = np.asarray(self.var) * np.ones(dim)

        # cache pdf_max
        if self.pdf_max is None and not callable(self.var):
            # take value at observed summary statistics
            self.pdf_max = self(x_0, x_0)

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ):
        # safety check
        if self.keys is None:
            self.initialize_keys(x_0)

        # compute variance
        var = self.var(par) if callable(self.var) else self.var
        var = np.asarray(var)

        # difference to array
        diff = _diff_arr(x, x_0, self.keys)
        if var.size == 1:
            var = var * np.ones(diff.size)

        # compute pdf, i.e. -0.5 * sum_i (log(2 * pi * var_i)
        # + diff_i**2 / var_i)
        log_2_pi = np.sum(np.log(2) + np.log(np.pi) + np.log(var))
        squares = np.sum((diff**2) / var)
        log_pdf = -0.5 * (log_2_pi + squares)

        return log_pdf

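
# --- Illustrative sketch, not part of the original pyabc module ---
# The hand-rolled log-density of IndependentNormalKernel should match a
# sum of scipy.stats.norm log-densities; keys "a" and "b" are hypothetical.
def _demo_independent_normal_kernel():
    x_0 = {"a": 0.0, "b": 1.0}
    kernel = IndependentNormalKernel(var=[0.5, 2.0])
    kernel.initialize(t=0, get_sample=None, x_0=x_0, total_sims=0)
    log_pdf = kernel({"a": 0.3, "b": 0.5}, x_0)
    expected = stats.norm.logpdf(
        0.3, loc=0.0, scale=np.sqrt(0.5)
    ) + stats.norm.logpdf(0.5, loc=1.0, scale=np.sqrt(2.0))
    return log_pdf, expected  # the two values agree
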

class IndependentLaplaceKernel(StochasticKernel):
    """
    This kernel can be used for efficient computations of large-scale
    independent Laplace distributions, performing computations directly
    on a log-scale to avoid numeric issues. In each coordinate, a 1-dim
    Laplace distribution

    .. math::
        p(x) = \\frac{1}{2b}\\exp\\left(-\\frac{|x-a|}{b}\\right)

    is assumed.

    Parameters
    ----------
    scale: Union[array_like, float, Callable], optional (default = ones vector)
        Scale terms b of the distribution. Can also be a Callable taking
        as arguments the parameters. In that case, pdf_max should also be
        given if it is supposed to be used. Usually, it will then be
        given as the density at the observed statistics assuming the
        minimum allowed scale.

    keys, pdf_max: As in StochasticKernel.
    """

    def __init__(
        self,
        scale: Union[Callable, Sequence[float], float] = None,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=SCALE_LOG, keys=keys, pdf_max=pdf_max)
        self.scale = scale

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        # in particular set keys
        super().initialize(
            t=t,
            get_sample=get_sample,
            x_0=x_0,
            total_sims=total_sims,
        )

        # dimension
        dim = sum(np.size(x_0[key]) for key in self.keys)

        # initialize scale correctly
        if self.scale is None:
            self.scale = np.ones(dim)
        if not callable(self.scale):
            self.scale = np.asarray(self.scale) * np.ones(dim)

        # cache pdf_max
        if self.pdf_max is None and not callable(self.scale):
            # take value at observed summary statistics
            self.pdf_max = self(x_0, x_0)

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ):
        # safety check
        if self.keys is None:
            self.initialize_keys(x_0)

        # compute scale
        scale = self.scale(par) if callable(self.scale) else self.scale
        scale = np.asarray(scale)

        # difference to array
        diff = _diff_arr(x, x_0, self.keys)
        if scale.size == 1:
            scale = scale * np.ones(diff.size)

        # compute pdf
        log_2_b = np.sum(np.log(2) + np.log(scale))
        abs_diff = np.sum(np.abs(diff) / scale)
        log_pdf = -(log_2_b + abs_diff)

        return log_pdf

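
# --- Illustrative sketch, not part of the original pyabc module ---
# The coordinate-wise log-density of IndependentLaplaceKernel should
# match scipy.stats.laplace; the key "y" is hypothetical.
def _demo_independent_laplace_kernel():
    x_0 = {"y": 0.0}
    kernel = IndependentLaplaceKernel(scale=2.0)
    kernel.initialize(t=0, get_sample=None, x_0=x_0, total_sims=0)
    log_pdf = kernel({"y": 1.5}, x_0)
    expected = stats.laplace.logpdf(1.5, loc=0.0, scale=2.0)
    return log_pdf, expected  # the two values agree
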

class BinomialKernel(StochasticKernel):
    """
    A kernel with a binomial probability mass function.

    Parameters
    ----------
    p: Union[float, Callable]
        The success probability.

    ret_scale, keys, pdf_max: See StochasticKernel.
    """

    def __init__(
        self,
        p: Union[float, Callable],
        ret_scale: str = SCALE_LOG,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=ret_scale, keys=keys, pdf_max=pdf_max)
        if not callable(p) and (p > 1 or p < 0):
            raise ValueError(
                f"The success probability p={p} must be in the interval "
                f"[0, 1]."
            )
        self.p = p

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        # in particular set keys
        super().initialize(
            t=t,
            get_sample=get_sample,
            x_0=x_0,
            total_sims=total_sims,
        )

        # cache pdf_max
        if self.pdf_max is None and not callable(self.p):
            # take value at observed summary statistics
            self.pdf_max = binomial_pdf_max(
                x_0, self.keys, self.p, self.ret_scale
            )

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ) -> float:
        x = np.asarray(_arr(x, self.keys), dtype=int)
        x_0 = np.asarray(_arr(x_0, self.keys), dtype=int)

        # compute p
        p = self.p if not callable(self.p) else self.p(par)

        if self.ret_scale == SCALE_LIN:
            ret = np.prod(stats.binom.pmf(k=x_0, n=x, p=p))
        else:  # self.ret_scale == SCALE_LOG
            ret = np.sum(stats.binom.logpmf(k=x_0, n=x, p=p))

        return float(ret)

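
# --- Illustrative sketch, not part of the original pyabc module ---
# In BinomialKernel, the simulated value x plays the role of the trial
# count n and the observation x_0 that of the noisy count k; the key
# "n" is hypothetical.
def _demo_binomial_kernel():
    kernel = BinomialKernel(p=0.9)
    kernel.initialize(t=0, get_sample=None, x_0={"n": 9}, total_sims=0)
    # log Binom(k=9; n=10, p=0.9)
    return kernel({"n": 10}, {"n": 9})
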

class PoissonKernel(StochasticKernel):
    """
    A kernel with a Poisson probability mass function.

    Parameters
    ----------
    ret_scale, keys, pdf_max: See StochasticKernel.
    """

    def __init__(
        self,
        ret_scale: str = SCALE_LOG,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=ret_scale, keys=keys, pdf_max=pdf_max)

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        # in particular set keys
        super().initialize(
            t=t,
            get_sample=get_sample,
            x_0=x_0,
            total_sims=total_sims,
        )

        # cache pdf_max
        if self.pdf_max is None:
            # take value at observed summary statistics
            # this is the optimal value
            self.pdf_max = self(x_0, x_0)

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ) -> float:
        x = np.asarray(_arr(x, self.keys), dtype=int)
        x_0 = np.asarray(_arr(x_0, self.keys), dtype=int)

        if self.ret_scale == SCALE_LIN:
            ret = np.prod(stats.poisson.pmf(k=x_0, mu=x))
        else:  # self.ret_scale == SCALE_LOG
            ret = np.sum(stats.poisson.logpmf(k=x_0, mu=x))

        return float(ret)

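
# --- Illustrative sketch, not part of the original pyabc module ---
# In PoissonKernel, the simulated value x is used as the rate mu and the
# observation x_0 as the count k; the key "count" is hypothetical.
def _demo_poisson_kernel():
    kernel = PoissonKernel()
    kernel.initialize(t=0, get_sample=None, x_0={"count": 4}, total_sims=0)
    # log Pois(k=4; mu=5) = 4 * log(5) - 5 - log(4!)
    return kernel({"count": 5}, {"count": 4})
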

class NegativeBinomialKernel(StochasticKernel):
    """
    A kernel with a negative binomial probability mass function.

    Parameters
    ----------
    p: Union[float, Callable]
        The success probability.

    ret_scale, keys, pdf_max: See StochasticKernel.
    """

    def __init__(
        self,
        p: Union[float, Callable],
        ret_scale: str = SCALE_LOG,
        keys: List[str] = None,
        pdf_max: float = None,
    ):
        super().__init__(ret_scale=ret_scale, keys=keys, pdf_max=pdf_max)
        if not callable(p) and (p > 1 or p < 0):
            raise ValueError(
                f"The success probability p={p} must be in the interval "
                f"[0, 1]."
            )
        self.p = p

    def initialize(
        self,
        t: int,
        get_sample: Callable[[], Sample],
        x_0: dict,
        total_sims: int,
    ):
        # in particular set keys
        super().initialize(
            t=t,
            get_sample=get_sample,
            x_0=x_0,
            total_sims=total_sims,
        )
        # pdf_max is not computed

    def __call__(
        self,
        x: dict,
        x_0: dict,
        t: int = None,
        par: dict = None,
    ) -> float:
        x = np.asarray(_arr(x, self.keys), dtype=int)
        x_0 = np.asarray(_arr(x_0, self.keys), dtype=int)

        # compute p
        p = self.p if not callable(self.p) else self.p(par)

        if self.ret_scale == SCALE_LIN:
            ret = np.prod(stats.nbinom.pmf(k=x_0, n=x, p=p))
        else:  # self.ret_scale == SCALE_LOG
            ret = np.sum(stats.nbinom.logpmf(k=x_0, n=x, p=p))

        return float(ret)

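
# --- Illustrative sketch, not part of the original pyabc module ---
# In NegativeBinomialKernel, following scipy.stats.nbinom, the simulated
# value x gives the number of successes n and the observation x_0 the
# number of failures k; the key "k" is hypothetical.
def _demo_negative_binomial_kernel():
    kernel = NegativeBinomialKernel(p=0.5)
    kernel.initialize(t=0, get_sample=None, x_0={"k": 3}, total_sims=0)
    # log NBinom(k=3; n=5, p=0.5)
    return kernel({"k": 5}, {"k": 3})
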

def binomial_pdf_max(x_0, keys, p, ret_scale):
    """
    Compute the maximum possible pmf value of the binomial distribution.
    Note that since we interpret x_0 as the noisy k value, we maximize
    over arbitrary model values n. The optimal value was calculated by
    checking p(n+1, k) / p(n, k).
    """
    ks = np.asarray(_arr(x_0, keys), dtype=int)
    ns = np.maximum(np.ceil((ks - p) / p), 0)
    pms = stats.binom.logpmf(k=ks, n=ns, p=p)

    # sum over all log values
    log_pdf_max = np.sum(pms)

    if ret_scale == SCALE_LIN:
        return np.exp(log_pdf_max)
    return log_pdf_max


def _diff_arr(x, x_0, keys):
    """
    Get difference array.
    """
    diff = []
    for key in keys:
        d = x[key] - x_0[key]
        try:
            diff.extend(d)
        except Exception:
            diff.append(d)
    diff = np.asarray(diff)
    return diff


def _arr(x, keys):
    """
    Get as flat array.
    """
    arr = []
    for key in keys:
        val = x[key]
        try:
            arr.extend(val)
        except Exception:
            arr.append(val)
    arr = np.asarray(arr)
    return arr
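
# --- Illustrative sketch, not part of the original pyabc module ---
# Check the closed-form maximizer n* = ceil((k - p) / p) used in
# binomial_pdf_max against a brute-force scan over n, for a single
# hypothetical key "k".
def _demo_binomial_pdf_max():
    k, p = 9, 0.3
    closed_form = binomial_pdf_max({"k": k}, ["k"], p, SCALE_LOG)
    brute_force = max(
        stats.binom.logpmf(k=k, n=n, p=p) for n in range(k, 200)
    )
    return closed_form, brute_force  # the two values agree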