Source code for pyabc.transition.local_transition

import logging
from typing import Union

import numpy as np
import numpy.linalg as la
import pandas as pd
from scipy.spatial import cKDTree

from ..parameters import Parameter
from .base import Transition
from .exceptions import NotEnoughParticles
from .util import smart_cov

logger = logging.getLogger("ABC.Transition")

[docs] class LocalTransition(Transition): """ Local KDE fit. Takes into account only the k nearest neighbors, similar to [Filippi]_. Parameters ---------- k: int Number of nearest neighbors for local covariance calculation. scaling: float Scaling factor for the local covariance matrices. k_fraction: float, optional Calculate number of nearest neighbors to use according to ``k = k_fraction * population_size`` (and rounds it). Attributes ---------- EPS: float Scaling of the identity matrix to be added to the covariance in case the covariances are not invertible. .. [Filippi] Filippi, Sarah, Chris P. Barnes, Julien Cornebise, and Michael P.H. Stumpf. β€œOn Optimality of Kernels for Approximate Bayesian Computation Using Sequential Monte Carlo.” Statistical Applications in Genetics and Molecular Biology 12, no. 1 (2013): 87–107. doi:10.1515/sagmb-2012-0069. """ EPS = 1e-3 MIN_K = 10
[docs] def __init__(self, k=None, k_fraction=1 / 4, scaling=1): if k_fraction is not None: self.k_fraction = k_fraction self._k = None else: self.k_fraction = None self._k = k self.scaling = scaling
@property def k(self): if self.k_fraction is not None: if self.w is None: k_ = 0 else: k_ = int(self.k_fraction * len(self.w)) else: k_ = self._k try: dim = self.X_arr.shape[1] except AttributeError: dim = 0 return max([k_, self.MIN_K, dim])
[docs] def fit(self, X, w): if len(X) == 0: raise NotEnoughParticles("Fitting not possible.") self.X_arr = X.values ctree = cKDTree(self.X_arr) _, indices = ctree.query(self.X_arr, k=min(self.k + 1, X.shape[0])) covs, inv_covs, dets = list( zip(*[self._cov_and_inv(n, indices) for n in range(X.shape[0])]) ) self.covs = np.array(covs) self.inv_covs = np.array(inv_covs) self.determinants = np.array(dets) self.normalization = np.sqrt( (2 * np.pi) ** self.X_arr.shape[1] * self.determinants ) if not np.isreal(self.normalization).all(): raise Exception("Normalization not real") self.normalization = np.real(self.normalization)
[docs] def pdf(self, x: Union[Parameter, pd.Series, pd.DataFrame]): # convert to numpy array in correct order if isinstance(x, (Parameter, pd.Series)): x = np.array([x[key] for key in self.X.columns]) else: x = x[self.X.columns].to_numpy() # compute density if len(x.shape) == 1: return self._pdf_single(x) else: return np.array([self._pdf_single(xi) for xi in x])
def _pdf_single(self, x: np.ndarray): distance = self.X_arr - x cov_distance = np.einsum( "ij,ijk,ik->i", distance, self.inv_covs, distance ) return float( np.average( np.exp(-0.5 * cov_distance) / self.normalization, weights=self.w, ) ) def _cov_and_inv(self, n, indices): """ Calculate covariance around local support vector and also the inverse """ cov = self._cov(indices, n) det = la.det(cov) while det <= 0: cov += np.identity(cov.shape[0]) * self.EPS det = la.det(cov) try: inv_cov = la.inv(cov) except np.linalg.LinAlgError: inv_cov = np.linalg.pinv(cov) # Use pseudo-inverse as a fallback return cov, inv_cov, det def _cov(self, indices, n): if len(indices) > 1: surrounding_indices = indices[n, 1:] nearest_vector_deltas = ( self.X_arr[surrounding_indices] - self.X_arr[n] ) local_weights = self.w[surrounding_indices] else: nearest_vector_deltas = np.absolute(self.X_arr) local_weights = np.array([1]) cov = smart_cov( nearest_vector_deltas, local_weights / local_weights.sum() ) if np.absolute(cov.sum()) == 0: for k in range(cov.shape[0]): cov[k, k] = np.absolute(self.X_arr[0, k]) return cov * self.scaling
[docs] def rvs_single(self) -> Parameter: support_index = np.random.choice(self.w.shape[0], p=self.w) sample = np.random.multivariate_normal( self.X_arr[support_index], self.covs[support_index] ) return Parameter(dict(zip(self.X.columns, sample)))