import logging
from typing import Union
import numpy as np
import numpy.linalg as la
import pandas as pd
from scipy.spatial import cKDTree
from ..parameters import Parameter
from .base import Transition
from .exceptions import NotEnoughParticles
from .util import smart_cov
logger = logging.getLogger("ABC.Transition")
[docs]
class LocalTransition(Transition):
"""
Local KDE fit. Takes into account only the k
nearest neighbors, similar to [Filippi]_.
Parameters
----------
k: int
Number of nearest neighbors for local covariance
calculation.
scaling: float
Scaling factor for the local covariance matrices.
k_fraction: float, optional
Calculate number of nearest neighbors to use according to
``k = k_fraction * population_size`` (and rounds it).
Attributes
----------
EPS: float
Scaling of the identity matrix to be added to the covariance
in case the covariances are not invertible.
.. [Filippi] Filippi, Sarah, Chris P. Barnes, Julien Cornebise,
and Michael P.H. Stumpf. βOn Optimality of Kernels
for Approximate Bayesian Computation Using Sequential
Monte Carlo.β Statistical Applications in Genetics and
Molecular Biology 12, no. 1 (2013):
87β107. doi:10.1515/sagmb-2012-0069.
"""
EPS = 1e-3
MIN_K = 10
[docs]
def __init__(self, k=None, k_fraction=1 / 4, scaling=1):
if k_fraction is not None:
self.k_fraction = k_fraction
self._k = None
else:
self.k_fraction = None
self._k = k
self.scaling = scaling
@property
def k(self):
if self.k_fraction is not None:
if self.w is None:
k_ = 0
else:
k_ = int(self.k_fraction * len(self.w))
else:
k_ = self._k
try:
dim = self.X_arr.shape[1]
except AttributeError:
dim = 0
return max([k_, self.MIN_K, dim])
[docs]
def fit(self, X, w):
if len(X) == 0:
raise NotEnoughParticles("Fitting not possible.")
self.X_arr = X.values
ctree = cKDTree(self.X_arr)
_, indices = ctree.query(self.X_arr, k=min(self.k + 1, X.shape[0]))
covs, inv_covs, dets = list(
zip(*[self._cov_and_inv(n, indices) for n in range(X.shape[0])])
)
self.covs = np.array(covs)
self.inv_covs = np.array(inv_covs)
self.determinants = np.array(dets)
self.normalization = np.sqrt(
(2 * np.pi) ** self.X_arr.shape[1] * self.determinants
)
if not np.isreal(self.normalization).all():
raise Exception("Normalization not real")
self.normalization = np.real(self.normalization)
[docs]
def pdf(self, x: Union[Parameter, pd.Series, pd.DataFrame]):
# convert to numpy array in correct order
if isinstance(x, (Parameter, pd.Series)):
x = np.array([x[key] for key in self.X.columns])
else:
x = x[self.X.columns].to_numpy()
# compute density
if len(x.shape) == 1:
return self._pdf_single(x)
else:
return np.array([self._pdf_single(xi) for xi in x])
def _pdf_single(self, x: np.ndarray):
distance = self.X_arr - x
cov_distance = np.einsum(
"ij,ijk,ik->i", distance, self.inv_covs, distance
)
return float(
np.average(
np.exp(-0.5 * cov_distance) / self.normalization,
weights=self.w,
)
)
def _cov_and_inv(self, n, indices):
"""
Calculate covariance around local support vector
and also the inverse
"""
cov = self._cov(indices, n)
det = la.det(cov)
while det <= 0:
cov += np.identity(cov.shape[0]) * self.EPS
det = la.det(cov)
try:
inv_cov = la.inv(cov)
except np.linalg.LinAlgError:
inv_cov = np.linalg.pinv(cov) # Use pseudo-inverse as a fallback
return cov, inv_cov, det
def _cov(self, indices, n):
if len(indices) > 1:
surrounding_indices = indices[n, 1:]
nearest_vector_deltas = (
self.X_arr[surrounding_indices] - self.X_arr[n]
)
local_weights = self.w[surrounding_indices]
else:
nearest_vector_deltas = np.absolute(self.X_arr)
local_weights = np.array([1])
cov = smart_cov(
nearest_vector_deltas, local_weights / local_weights.sum()
)
if np.absolute(cov.sum()) == 0:
for k in range(cov.shape[0]):
cov[k, k] = np.absolute(self.X_arr[0, k])
return cov * self.scaling
[docs]
def rvs_single(self) -> Parameter:
support_index = np.random.choice(self.w.shape[0], p=self.w)
sample = np.random.multivariate_normal(
self.X_arr[support_index], self.covs[support_index]
)
return Parameter(dict(zip(self.X.columns, sample)))