Source code for pyabc.sumstat.subset

"""Identification of sample subsets for model training."""

import logging
from abc import ABC, abstractmethod
from typing import Tuple

import numpy as np

from ..weighted_statistics import weighted_mean

try:
    import sklearn.mixture as skl_mx
except ImportError:
    skl_mx = None


logger = logging.getLogger("ABC.Sumstat")


class Subsetter(ABC):
    """Select a localized sample subset for model training.

    E.g. in the :class:`pyabc.PredictorSumstat` class, we employ predictor
    models `y -> p` from data to parameters. These models should be local,
    i.e. trained on samples from a high-density region, because the inverse
    `y -> p` of the mapping `p -> y` does in general not exist globally,
    e.g. due to parameter non-identifiability, multiple modes, and model
    stochasticity. Therefore, it is important to train the models on a
    sample set on which a functional form roughly holds.

    This class allows subsetting a given sample to generate a localized
    sample.
    """
    @abstractmethod
    def select(
        self,
        x: np.ndarray,
        y: np.ndarray,
        w: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Select samples for model training. This is the main method.

        Parameters
        ----------
        x:
            Samples, shape (n_sample, n_feature).
        y:
            Targets, shape (n_sample, n_out).
        w:
            Weights, shape (n_sample,).

        Returns
        -------
        A tuple x_, y_, w_ of the subsetted samples, targets and weights,
        with n_sample -> n_sample_used <= n_sample.
        """
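# Illustrative sketch, not part of the pyabc API: a custom subsetter only
# needs to derive from `Subsetter` and implement `select`. The hypothetical
# class below keeps a fixed fraction of the highest-weight samples.


class _TopWeightSubsetter(Subsetter):
    """Hypothetical example: keep the highest-weight samples."""

    def __init__(self, fraction: float = 0.5):
        self.fraction: float = fraction

    def select(
        self,
        x: np.ndarray,
        y: np.ndarray,
        w: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Keep the `fraction` of samples with the largest weights."""
        n_keep = max(1, int(self.fraction * len(w)))
        # indices of the n_keep largest weights
        ixs = np.argsort(w)[::-1][:n_keep]
        return x[ixs], y[ixs], w[ixs]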
class IdSubsetter(Subsetter):
    """Identity subset mapping."""

    def select(
        self,
        x: np.ndarray,
        y: np.ndarray,
        w: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Just return x, y, w unchanged."""
        return x, y, w
class GMMSubsetter(Subsetter):
    """Use a Gaussian mixture model for subset identification.

    Performs model selection over Gaussian mixture models with between
    `n_components_min` and `n_components_max` components and returns all
    samples belonging to the same cluster as the posterior mean.
    Optionally, this set is augmented by the nearest neighbors to reach a
    fraction `min_fraction` of the original sample size.

    Parameters
    ----------
    n_components_min:
        Minimum candidate number of clusters.
    n_components_max:
        Maximum candidate number of clusters.
    min_fraction:
        Minimum fraction of samples in the result. If the obtained cluster
        contains fewer samples, it is augmented by nearby samples.
    normalize_labels:
        Whether to z-score normalize labels internally prior to cluster
        analysis.
    gmm_args:
        Keyword arguments that are passed on to the sklearn
        `GaussianMixture`, see
        https://scikit-learn.org/stable/modules/generated/sklearn.mixture.GaussianMixture.html.  # noqa

    Attributes
    ----------
    gmm:
        The best fitted Gaussian mixture model.
    n_components:
        The corresponding number of components.
    bics:
        All BIC values used in model selection.
    """
    def __init__(
        self,
        n_components_min: int = 1,
        n_components_max: int = 5,
        min_fraction: float = 0.3,
        normalize_labels: bool = True,
        gmm_args: dict = None,
    ):
        if skl_mx is None:
            raise ImportError(
                "This class requires an installation of scikit-learn. "
                "Install e.g. via `pip install pyabc[scikit-learn]`"
            )
        self.n_components_min: int = n_components_min
        self.n_components_max: int = n_components_max
        self.min_fraction: float = min_fraction
        self.normalize_labels: bool = normalize_labels

        # merge user-specified arguments into the defaults
        if gmm_args is None:
            gmm_args = {}
        default_gmm_args = {'max_iter': 500, 'n_init': 5}
        default_gmm_args.update(gmm_args)
        self.gmm_args: dict = default_gmm_args

        # status variables, filled in `select()`
        self.gmm = None
        self.n_components = None
        self.bics = None
    def select(
        self,
        x: np.ndarray,
        y: np.ndarray,
        w: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Select based on GMM clusters."""
        # normalize
        if self.normalize_labels:
            y_norm = (y - np.mean(y, axis=0)) / np.std(y, axis=0)
        else:
            y_norm = y

        # find most suitable number of clusters
        gmm, n_components, bics = self._find_optimal_model(y=y_norm)
        self.gmm = gmm
        self.n_components = n_components
        self.bics = bics

        # identify cluster of posterior mean
        mean: np.ndarray = weighted_mean(y_norm, w / w.sum()).reshape(1, -1)
        cluster_id: np.ndarray = gmm.predict(mean)

        # find cluster associations of data points
        cluster_ids: np.ndarray = gmm.predict(y_norm)
        in_cluster: np.ndarray = cluster_ids == cluster_id

        # augment subset
        selected = get_augmented_subset(
            y=y_norm,
            ref=mean,
            in_cluster=in_cluster,
            min_fraction=self.min_fraction,
        )

        # reduce sample set
        x_new, y_new, w_new = x[selected], y[selected], w[selected]

        logger.info(
            f"Subsetting: #clusters: {n_components}, "
            f"target cluster points: {sum(in_cluster)}, using {len(y_new)} "
            f"(BICs: {bics})",
        )

        return x_new, y_new, w_new
    def _find_optimal_model(self, y: np.ndarray):
        """Find the optimal GMM model.

        Parameters
        ----------
        y:
            The data to fit the model on (usually parameters).

        Returns
        -------
        best_gmm, best_n_components, bics:
            The best fitted GMM, the number of components, and all BIC
            values.
        """
        # iterate over numbers of clusters
        bics = {}
        best_gmm = None
        best_n_components = None
        for n_components in range(
            self.n_components_min, self.n_components_max + 1
        ):
            # fit a Gaussian mixture model
            gmm = skl_mx.GaussianMixture(
                n_components=n_components, **self.gmm_args
            )
            gmm.fit(y)

            # evaluate BIC as model selection criterion
            bic = gmm.bic(y)
            if len(bics) == 0 or bic < min(bics.values()):
                best_gmm = gmm
                best_n_components = n_components
            bics[n_components] = bic

        return best_gmm, best_n_components, bics
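# Hypothetical usage sketch of `GMMSubsetter` (synthetic data; sample sizes
# and dimensions are assumptions for illustration). Note how `gmm_args`
# entries are merged into the defaults `max_iter=500, n_init=5`.


def _example_gmm_subsetter():
    """Run the GMM subsetter on a synthetic bimodal parameter sample."""
    rng = np.random.default_rng(0)
    # two parameter modes; `select` keeps the cluster containing the
    # weighted mean of y, augmented up to `min_fraction` of all samples
    y = np.concatenate(
        [
            rng.normal(0.0, 1.0, size=(150, 2)),
            rng.normal(5.0, 1.0, size=(50, 2)),
        ]
    )
    x = rng.normal(size=(200, 3))
    w = np.ones(200) / 200

    subsetter = GMMSubsetter(
        n_components_max=3,
        min_fraction=0.3,
        gmm_args={'n_init': 10},  # overrides the default n_init=5
    )
    x_, y_, w_ = subsetter.select(x=x, y=y, w=w)
    print(len(y_), subsetter.n_components, subsetter.bics)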
def get_augmented_subset(
    y: np.ndarray,
    ref: np.ndarray,
    in_cluster: np.ndarray,
    min_fraction: float,
) -> np.ndarray:
    """Select indices from clustering and potentially augment to match minimum.

    Parameters
    ----------
    y:
        The original y.
    ref:
        Reference value (e.g. posterior mean).
    in_cluster:
        Boolean indicators whether an entry is in the cluster.
    min_fraction:
        Minimum required fraction of particles, in [0, 1].

    Returns
    -------
    in_cluster:
        Boolean indicators whether an entry is in the augmented cluster.
    """
    # copy as we modify
    in_cluster = in_cluster.copy()

    # calculate how many are required
    desired: int = int(min_fraction * len(y))
    ixs_in_cluster: np.ndarray = np.flatnonzero(in_cluster)
    required: int = desired - len(ixs_in_cluster)
    if required <= 0:
        return in_cluster

    # sort remaining values by distance to reference point
    y_left = y[~in_cluster]
    distances: np.ndarray = np.linalg.norm(y_left - ref, ord=2, axis=1)

    # indices of the required closest parameters
    # (guard: np.argpartition requires kth < len(distances))
    if required >= len(distances):
        ixs_nearest = np.arange(len(distances))
    else:
        ixs_nearest = np.argpartition(distances, required)[:required]
    ixs_not_in_cluster: np.ndarray = np.flatnonzero(~in_cluster)
    in_cluster[ixs_not_in_cluster[ixs_nearest]] = True

    if sum(in_cluster) != desired:
        raise AssertionError("Unexpected number of entries.")

    return in_cluster
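# Hypothetical usage sketch of `get_augmented_subset` (arrays are assumptions
# for illustration): a two-point cluster in a ten-point sample is padded with
# the points nearest to the reference until half the samples are included.


def _example_augmented_subset():
    """Augment a two-point cluster to half of a ten-point sample."""
    y = np.arange(10, dtype=float).reshape(-1, 1)
    ref = np.array([[0.0]])
    in_cluster = np.zeros(10, dtype=bool)
    in_cluster[:2] = True  # only points 0 and 1 start in the cluster

    selected = get_augmented_subset(
        y=y, ref=ref, in_cluster=in_cluster, min_fraction=0.5
    )
    # desired = int(0.5 * 10) = 5, so points 2, 3, 4 (nearest to ref) are added
    print(np.flatnonzero(selected))  # [0 1 2 3 4]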