Source code for pyabc.distance.aggregate

"""Aggregated distances."""

import logging
from typing import Callable, List, Union

import numpy as np

from ..population import Sample
from import save_dict_to_json
from .base import Distance, FunctionDistance
from .scale import span

logger = logging.getLogger("ABC.Distance")

[docs] class AggregatedDistance(Distance): """ Aggregates a list of distance functions, all of which may work on subparts of the summary statistics. Then computes and returns the weighted sum of the distance values generated by the various distance functions. All class functions are propagated to the children and the obtained results aggregated appropriately. """
[docs] def __init__( self, distances: List[Union[Distance, Callable]], weights: Union[List, dict] = None, factors: Union[List, dict] = None, ): """ Parameters ---------- distances: List The distance functions to apply. weights: Union[List, dict], optional (default = [1,...]) The weights to apply to the distances when taking the sum. Can be a list with entries in the same order as the distances, or a dictionary of lists, with the keys being the single time points (if the weights should be iteration-specific). factors: Union[List, dict], optional (dfault = [1,...]) Scaling factors that the weights are multiplied with. The same structure applies as to weights. If None is passed, a factor of 1 is considered for every summary statistic. Note that in this class, factors are superfluous as everything can be achieved with weights alone, however in subclasses the factors can remain static while weights adapt over time, allowing for greater flexibility. """ super().__init__() if isinstance(distances, (Distance, Callable)): distances = [distances] self.distances: List[Distance] = [ FunctionDistance.to_distance(distance) for distance in distances ] self.weights = weights self.factors = factors
[docs] def requires_calibration(self) -> bool: return any(d.requires_calibration() for d in self.distances)
[docs] def is_adaptive(self) -> bool: return any(d.is_adaptive() for d in self.distances)
[docs] def initialize( self, t: int, get_sample: Callable[[], Sample], x_0: dict, total_sims: int, ): super().initialize( t=t, get_sample=get_sample, x_0=x_0, total_sims=total_sims, ) for distance in self.distances: distance.initialize( t=t, get_sample=get_sample, x_0=x_0, total_sims=total_sims, ) self.format_weights_and_factors(t)
[docs] def configure_sampler( self, sampler, ): """ Note: `configure_sampler` is applied by all distances sequentially, so care must be taken that they perform no contradictory operations on the sampler. """ for distance in self.distances: distance.configure_sampler(sampler)
[docs] def update( self, t: int, get_sample: Callable[[], Sample], total_sims: int, ) -> bool: """ The `sum_stats` are passed on to all distance functions, each of which may then update using these. If any update occurred, a value of True is returned indicating that e.g. the distance may need to be recalculated since the underlying distances changed. """ return any( distance.update( t=t, get_sample=get_sample, total_sims=total_sims, ) for distance in self.distances )
[docs] def __call__( self, x: dict, x_0: dict, t: int = None, par: dict = None, ) -> float: """ Applies all distance functions and computes the weighted sum of all obtained values. """ values = np.array( [distance(x, x_0, t, par) for distance in self.distances] ) self.format_weights_and_factors(t) weights = AggregatedDistance.get_for_t_or_latest(self.weights, t) factors = AggregatedDistance.get_for_t_or_latest(self.factors, t) return float( * factors, values))
[docs] def get_config(self) -> dict: """ Return configuration of the distance. Returns ------- config: dict Dictionary describing the distance. """ config = {} for j, distance in enumerate(self.distances): config[f'Distance_{j}'] = distance.get_config() return config
def format_weights_and_factors(self, t): self.weights = AggregatedDistance.format_dict( self.weights, t, len(self.distances) ) self.factors = AggregatedDistance.format_dict( self.factors, t, len(self.distances) )
[docs] @staticmethod def format_dict(w, t, n_distances, default_val=1.0): """ Normalize weight or factor dictionary to the employed format. """ if w is None: # use default w = {t: default_val * np.ones(n_distances)} elif not isinstance(w, dict): # f is not time-dependent # so just create one for time t w = {t: np.array(w)} return w
[docs] @staticmethod def get_for_t_or_latest(w, t): """ Extract values from dict for given time point. """ # take last time point for which values exist if t not in w: t = max(w) # extract values for time point return w[t]
[docs] class AdaptiveAggregatedDistance(AggregatedDistance): """ Adapt the weights of `AggregatedDistances` automatically over time. Parameters ---------- distances: As in AggregatedDistance. initial_weights: Weights to be used in the initial iteration. List with a weight for each distance function. factors: As in AggregatedDistance. adaptive: True: Adapt weights after each iteration. False: Adapt weights only once at the beginning in initialize(). This corresponds to a pre-calibration. scale_function: Function that takes a np.ndarray of shape (n_sample,), namely the values obtained by applying one of the distances on a set of samples, and returns a single float, namely the weight to apply to this distance function. Default: span. log_file: A log file to store weights for each time point in. Weights are currently not stored in the database. The data are saved in json format and can be retrieved via ``. """
[docs] def __init__( self, distances: List[Distance], initial_weights: List = None, factors: Union[List, dict] = None, adaptive: bool = True, scale_function: Callable = None, log_file: str = None, ): super().__init__(distances=distances) self.initial_weights: List = initial_weights self.factors: Union[List, dict] = factors self.adaptive: bool = adaptive self.x_0: Union[dict, None] = None if scale_function is None: scale_function = span self.scale_function: Callable = scale_function self.log_file: str = log_file
[docs] def requires_calibration(self) -> bool: return self.initial_weights is None or any( d.requires_calibration() for d in self.distances )
[docs] def is_adaptive(self) -> bool: return self.adaptive or any(d.is_adaptive() for d in self.distances)
[docs] def initialize( self, t: int, get_sample: Callable[[], Sample], x_0: dict, total_sims: int, ): """ Initialize weights. """ super().initialize( t=t, get_sample=get_sample, x_0=x_0, total_sims=total_sims, ) self.x_0 = x_0 if self.initial_weights is not None: self.weights[t] = self.initial_weights return # execute function sample = get_sample() # update weights from samples self._update(t, sample)
[docs] def update( self, t: int, get_sample: Callable[[], Sample], total_sims: int, ): """ Update weights based on all simulations. """ super().update(t=t, get_sample=get_sample, total_sims=total_sims) if not self.adaptive: return False # execute function sample = get_sample() self._update(t, sample) return True
def _update( self, t: int, sample: Sample, ): """ Here the real update of weights happens. """ # to-be-filled-and-appended weights dictionary w = [] sum_stats = sample.all_sum_stats for distance in self.distances: # apply distance to all samples current_list = np.array( [distance(sum_stat, self.x_0) for sum_stat in sum_stats] ) # compute scaling scale = self.scale_function(samples=current_list) # compute weight (inverted scale) if np.isclose(scale, 0): # This means that either the summary statistic is not in the # samples, or that all simulations were identical. In either # case, it should be safe to ignore this summary statistic. w.append(0) else: w.append(1 / scale) w = np.array(w) if w.size != len(self.distances): raise AssertionError( f"weights.size={w.size} != " f"len(distances)={len(self.distances)}" ) # add to w attribute, at time t self.weights[t] = np.array(w) # logging self.log(t)
[docs] def configure_sampler(self, sampler) -> None: """ Make the sampler return also rejected particles, because these are needed to get a better estimate of the summary statistic variability, avoiding a bias to accepted ones only. Parameters ---------- sampler: Sampler The sampler employed. """ super().configure_sampler(sampler=sampler) if self.adaptive: sampler.sample_factory.record_rejected()
def log(self, t: int) -> None: logger.debug(f"Weights[{t}] = {self.weights[t]}") if self.log_file: save_dict_to_json(self.weights, self.log_file)