Source code for pyabc.transition.multivariatenormal

from typing import Callable, Union

import numpy as np
import pandas as pd
import scipy.stats as st

from ..parameters import Parameter
from .base import Transition
from .exceptions import NotEnoughParticles
from .util import smart_cov

BandwidthSelector = Callable[[float, int], float]


def scott_rule_of_thumb(n_samples, dimension):
    """
    Scott's rule of thumb.

    .. math::

        \\left ( \\frac{1}{n} \\right ) ^ {\\frac{1}{d+4}}

    (see also scipy.stats.kde.gaussian_kde.scotts_factor)
    """
    return n_samples ** (-1.0 / (dimension + 4))


def silverman_rule_of_thumb(n_samples, dimension):
    """
    Silverman's rule of thumb.

    .. math::

        \\left ( \\frac{4}{n (d+2)} \\right ) ^ {\\frac{1}{d + 4}}

    (see also scipy.stats.kde.gaussian_kde.silverman_factor)
    """
    return (4 / n_samples / (dimension + 2)) ** (1 / (dimension + 4))
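

# Illustrative check (not part of the module): for an effective sample
# size of n = 100 in d = 2 dimensions,
#
#     scott_rule_of_thumb(100, 2)      # 100 ** (-1 / 6)            ~ 0.464
#     silverman_rule_of_thumb(100, 2)  # (4 / (100 * 4)) ** (1 / 6) ~ 0.464
#
# For d = 2 the two rules coincide, since then 4 / (d + 2) = 1.

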
class MultivariateNormalTransition(Transition):
    """
    Transition via a multivariate Gaussian KDE estimate.

    Parameters
    ----------
    scaling: float
        Additional factor by which the covariance is multiplied. Since the
        Silverman and Scott rules tend to give too large bandwidths, it
        usually makes sense to choose 0 < scaling <= 1.
    bandwidth_selector: optional
        Defaults to `silverman_rule_of_thumb`.
        The bandwidth selector is a function of the form
        f(n_samples: float, dimension: int), where n_samples denotes the
        (effective) sample size (and is therefore a float), and dimension
        the parameter dimension.
    """

    def __init__(
        self,
        scaling: float = 1,
        bandwidth_selector: BandwidthSelector = silverman_rule_of_thumb,
    ):
        self.scaling: float = scaling
        self.bandwidth_selector: BandwidthSelector = bandwidth_selector
        # base population as an array
        self._X_arr: Union[np.ndarray, None] = None
        # perturbation covariance matrix
        self.cov: Union[np.ndarray, None] = None
        # normal perturbation distribution
        self.normal = None
        # cache a range array
        self._range = None

    def fit(self, X: pd.DataFrame, w: np.ndarray) -> None:
        if len(X) == 0:
            raise NotEnoughParticles("Fitting not possible.")
        self._X_arr = X.values

        sample_cov = smart_cov(self._X_arr, w)
        dim = sample_cov.shape[0]
        eff_sample_size = 1 / (w**2).sum()
        bw_factor = self.bandwidth_selector(eff_sample_size, dim)

        self.cov = sample_cov * bw_factor**2 * self.scaling
        self.normal = st.multivariate_normal(
            cov=self.cov, allow_singular=True
        )

        # cache range array
        self._range = np.arange(len(self.X))
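
    # Note (illustrative): for uniform weights w = np.ones(n) / n, the
    # effective sample size 1 / (w**2).sum() equals n; the more uneven the
    # weights, the closer it drops toward 1, and the larger the resulting
    # bandwidth factor becomes.
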
    def rvs(self, size: int = None) -> Union[Parameter, pd.DataFrame]:
        if size is None:
            return self.rvs_single()
        sample_ind = np.random.choice(
            self._range, size=size, p=self.w, replace=True
        )
        sample = self.X.iloc[sample_ind]
        perturbed = sample + np.random.multivariate_normal(
            np.zeros(self.cov.shape[0]), self.cov, size=size
        )
        return perturbed

    def rvs_single(self) -> Parameter:
        sample_ind = np.random.choice(self._range, p=self.w, replace=True)
        sample = self.X.iloc[sample_ind]
        perturbed = sample + np.random.multivariate_normal(
            np.zeros(self.cov.shape[0]), self.cov
        )
        return Parameter(perturbed)
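
    # Both rvs variants thus sample from the same weighted mixture that
    # pdf evaluates below: pick a particle x_i with probability w_i, then
    # add multivariate normal noise with covariance self.cov.
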
    def pdf(
        self,
        x: Union[Parameter, pd.Series, pd.DataFrame],
    ) -> Union[float, np.ndarray]:
        # convert to numpy array in correct order
        if isinstance(x, (Parameter, pd.Series)):
            x = np.array([x[key] for key in self.X.columns])
        else:
            x = x[self.X.columns].to_numpy()

        # compute density
        if len(x.shape) == 1:
            return self._pdf_single(x)
        else:
            return np.array([self._pdf_single(xi) for xi in x])
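
    # The evaluated density is the weighted Gaussian mixture
    #     p(x) = sum_i w_i * N(x; x_i, self.cov),
    # with one kernel centered at each stored particle x_i.
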
    # alternative (higher memory consumption but broadcast)
    # x = np.atleast_3d(x)  # n_sample x n_par x 1
    # dens = self.normal.pdf(np.swapaxes(x - self._X_arr.T, 1, 2)) * self.w
    # dens = np.atleast_2d(dens).sum(axis=1).squeeze()

    def _pdf_single(self, x: np.ndarray):
        return float((self.normal.pdf(x - self._X_arr) * self.w).sum())
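

# Usage sketch (illustrative, not part of the original module; this file
# uses relative imports, so run the snippet from client code instead).
# The column name "p" and the data below are made up for demonstration:
#
#     import numpy as np
#     import pandas as pd
#     from pyabc.transition import MultivariateNormalTransition
#
#     X = pd.DataFrame({"p": np.random.normal(size=100)})
#     w = np.ones(100) / 100  # importance weights, normalized to sum to 1
#     transition = MultivariateNormalTransition(scaling=1.0)
#     transition.fit(X, w)
#     single = transition.rvs_single()  # one perturbed Parameter
#     batch = transition.rvs(size=10)   # DataFrame of 10 perturbed samples
#     density = transition.pdf(single)  # KDE density at the sampled point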