"""ABC-SMC"""
import copy
import logging
from datetime import datetime, timedelta
from typing import Callable, List, Tuple, TypeVar, Union
import numpy as np
from ..acceptor import (
Acceptor,
FunctionAcceptor,
StochasticAcceptor,
UniformAcceptor,
)
from ..distance import (
Distance,
FunctionDistance,
PNormDistance,
StochasticKernel,
)
from ..epsilon import Epsilon, MedianEpsilon, TemperatureBase
from ..inference_util import (
AnalysisVars,
create_analysis_id,
create_prior_pdf,
create_simulate_from_prior_function,
create_simulate_function,
create_transition_pdf,
eps_from_hist,
termination_criteria_fulfilled,
)
from ..model import FunctionModel, Model
from ..platform_factory import DefaultSampler
from ..population import Population, Sample
from ..populationstrategy import ConstantPopulationSize, PopulationStrategy
from ..random_variables import RV, Distribution
from ..sampler import Sampler
from ..storage import History
from ..transition import (
ModelPerturbationKernel,
MultivariateNormalTransition,
Transition,
)
from ..weighted_statistics import effective_sample_size
logger = logging.getLogger("ABC")
model_output = TypeVar("model_output")
def identity(x):
return x
def run_cleanup(run):
"""Wrapper: Run and in any case clean up afterwards."""
def wrapped_run(self: "ABCSMC", *args, **kwargs):
try:
# the actual run
ret = run(self, *args, **kwargs)
finally:
# close session and store end time
self.history.done(end_time=datetime.now())
# tell samplers to stop any ongoing processes
self.sampler.stop()
return ret
return wrapped_run
[docs]
class ABCSMC:
"""
Approximate Bayesian Computation - Sequential Monte Carlo (ABCSMC).
This is an implementation of an ABCSMC algorithm similar to
[#tonistumpf]_.
The ABCSMC class is the most central class of the pyABC package.
Most of the other classes serve to configure it (i.e. the other classes
implement a strategy pattern).
Parameters
----------
models:
Can be a list of models, a single model, a list of functions, or a
single function.
* If models is a function, then the function should have a single
parameter, which is of dictionary type, and should return a single
dictionary, which contains the simulated data.
* If models is a list of functions, then the first point applies to
each function.
* Models can also be a list of Model instances or a single
Model instance.
This model's output is passed to the summary statistics calculation.
Per default, the model is assumed to already return the calculated
summary statistics. Accordingly, the default summary_statistics
function is just the identity. Note that the sampling and evaluation of
particles happens in the model's methods, so overriding these offers a
great deal of flexibility, in particular the freedom to use or ignore
the distance_function, summary_statistics, and eps parameters here.
parameter_priors:
A list of prior distributions for the models' parameters.
Each list entry is the prior distribution for the corresponding model.
distance_function:
Measures the distance of the tentatively sampled particle to the
measured data.
population_size:
Specify the size of the population.
If ``population_specification`` is an ``int``, then the size is
constant. Adaptive population sizes are also possible by passing a
:class:`pyabc.populationstrategy.PopulationStrategy` object.
The default is 100 particles per population.
summary_statistics:
A function which takes the raw model output as returned by
any ot the models and calculates the corresponding summary
statistics. Note that the default value is just the identity
function. I.e. the model is assumed to already calculate
the summary statistics. However, in a model selection setting
it can make sense to have the model produce some kind or raw output
and then take the same summary statistics function for all the models.
model_prior:
A random variable giving the prior weights of the model classes.
The default is a uniform prior over the model classes,
``RV("randint", 0, len(models))``.
model_perturbation_kernel:
Kernel which governs with which probability to switch from one
model to another model for a given sample while generating proposals
for the subsequent population from the current population.
transitions:
A list of :class:`pyabc.transition.Transition` objects
or a single :class:`pyabc.transition.Transition` in case
of a single model. Defaults to multivariate normal transitions for
every model.
eps:
Accepts any :class:`pyabc.epsilon.Epsilon` subclass.
The default is the :class:`pyabc.epsilon.MedianEpsilon` which adapts
automatically. The object passed here determines how the acceptance
threshold scheduling is performed.
sampler:
In some cases, a mapper implementation will require initialization
to run properly, e.g. database connection, grid setup, etc..
The sampler is an object that encapsulates this information.
The default sampler :class:`pyabc.sampler.MulticoreEvalParallelSampler`
will parallelize across the cores of a single
machine only.
acceptor:
Takes a distance function, summary statistics and an epsilon threshold
to decide about acceptance of a particle. Argument accepts any subclass
of :class:`pyabc.acceptor.Acceptor`, or a function convertible to an
acceptor. Defaults to a :class:`pyabc.acceptor.UniformAcceptor`.
stop_if_only_single_model_alive:
Defaults to False. Set this to true if you want to stop ABCSMC
automatically as soon as only a single model has survived.
max_nr_recorded_particles:
Defaults to inf. Set this to the maximum number of accepted and
rejected particles that methods like the AdaptivePNormDistance
function use to update themselves each iteration.
.. [#tonistumpf] Toni, Tina, and Michael P. H. Stumpf.
“Simulation-Based Model Selection for Dynamical Systems in Systems and
Population Biology”.
Bioinformatics 26, no. 1, 104–10, 2010.
https://doi.org/10.1093/bioinformatics/btp619
"""
[docs]
def __init__(
self,
models: Union[List[Model], Model, Callable],
parameter_priors: Union[List[Distribution], Distribution, Callable],
distance_function: Union[Distance, Callable] = None,
population_size: Union[PopulationStrategy, int] = 100,
summary_statistics: Callable[[model_output], dict] = identity,
model_prior: RV = None,
model_perturbation_kernel: ModelPerturbationKernel = None,
transitions: Union[List[Transition], Transition] = None,
eps: Epsilon = None,
sampler: Sampler = None,
acceptor: Acceptor = None,
stop_if_only_single_model_alive: bool = False,
max_nr_recorded_particles: int = np.inf,
):
if not isinstance(models, list):
models = [models]
models = list(map(FunctionModel.to_model, models))
self.models = models
if not isinstance(parameter_priors, list):
parameter_priors = [parameter_priors]
self.parameter_priors = parameter_priors
# sanity checks
if len(self.models) != len(self.parameter_priors):
raise AssertionError(
"Number models and number parameter priors have to agree."
)
if distance_function is None:
distance_function = PNormDistance()
self.distance_function = FunctionDistance.to_distance(
distance_function,
)
self.summary_statistics = summary_statistics
if model_prior is None:
model_prior = RV("randint", 0, len(self.models))
self.model_prior = model_prior
if model_perturbation_kernel is None:
model_perturbation_kernel = ModelPerturbationKernel(
len(self.models), probability_to_stay=0.7
)
self.model_perturbation_kernel = model_perturbation_kernel
if transitions is None:
transitions = [MultivariateNormalTransition() for _ in self.models]
if not isinstance(transitions, list):
transitions = [transitions]
self.transitions: List[Transition] = transitions
if eps is None:
eps = MedianEpsilon(median_multiplier=1)
self.eps = eps
if isinstance(population_size, int):
population_size = ConstantPopulationSize(population_size)
self.population_size = population_size
if sampler is None:
sampler = DefaultSampler()
self.sampler = sampler
if acceptor is None:
acceptor = UniformAcceptor()
self.acceptor = FunctionAcceptor.to_acceptor(acceptor)
self.stop_if_only_single_model_alive = stop_if_only_single_model_alive
self.max_nr_recorded_particles = max_nr_recorded_particles
# will be set later
self.x_0 = None
self.history = None
self._initial_population = None
self.minimum_epsilon = None
self.max_nr_populations = None
self.min_acceptance_rate = None
self.max_t = None
self.max_total_nr_simulations = None
self.max_walltime = None
self.min_eps_diff = None
self.init_walltime = None
self.analysis_id = None
self._sanity_check()
def _sanity_check(self):
# check stochastic setting
stochastics = [
isinstance(self.acceptor, StochasticAcceptor),
isinstance(self.eps, TemperatureBase),
isinstance(self.distance_function, StochasticKernel),
]
# check if usage is consistent
if not all(stochastics) and any(stochastics):
raise ValueError(
"Please only use acceptor.StochasticAcceptor, "
"epsilon.TemperatureBase and distance.StochasticKernel "
"together."
)
# check sampler
self.sampler.check_analysis_variables(
distance_function=self.distance_function,
eps=self.eps,
acceptor=self.acceptor,
)
def __getstate__(self):
state_red_dict = self.__dict__.copy()
del state_red_dict['sampler']
return state_red_dict
[docs]
def new(
self,
db: str,
observed_sum_stat: dict = None,
*,
gt_model: int = None,
gt_par: dict = None,
meta_info=None,
) -> History:
"""
Make a new ABCSMC run.
Parameters
----------
db: str
Has to be a valid SQLAlchemy database identifier.
This indicates the database to be used (and created if necessary
and possible) for the ABC-SMC run.
To use an in-memory database pass "sqlite://".
Note that in-memory databases are only available on the master
mode. If workers are started on different nodes they won't be
able to access the database. This should not be a problem
in most scenarios. The in-memory option is mainly useful for
benchmarking (and maybe) for testing.
observed_sum_stat : dict, optional
This is the really important parameter here. It is of the
form ``{'statistic_1': val_1, 'statistic_2': val_2, ... }``.
The dictionary provided here represents the measured data.
Particle during ABCSMC sampling are compared against the
summary statistics provided here.
The summary statistics' values can be integers, floats,
strings and everything which is a numpy array or can be
converted to one (e.g. lists).
In addition, pandas.DataFrames can also be used as summary
statistics.
**Note that storage of pandas DataFrames in pyABC's database
is still considered experimental.**
This parameter is optional, as the distance function might
implement comparison to the observed data on its own.
Not giving this parameter is equivalent to passing an empty
dictionary ``{}``.
gt_model: int, optional
This is only meta data stored to the database, but not actually
used for the ABCSMC algorithm. If you want to predict your ABCSMC
procedure against synthetic samples, you can use
this parameter to indicate the ground truth model number.
This helps with further analysis. If you use actually measured data
(and don't know the ground truth) you don't have to set this.
gt_par: dict, optional
Similar to ``ground_truth_model``, this is only for recording
purposes in the database, but not used in the ABCSMC algorithm.
This stores the parameters of the ground truth model
if it was synthetically obtained.
Don't give this parameter if you don't know the ground truth.
meta_info: dict, optional
Can contain an arbitrary number of keys, only for recording
purposes. Store arbitrary
meta information in this dictionary. Can be used for really
anything.
This dictionary is stored in the database.
Returns
-------
history: History
The history, with set history.id, which is the id under which the
generated ABCSMC run entry in the database can be identified.
"""
# record observed summary statistics
if observed_sum_stat is None:
observed_sum_stat = {}
self.x_0 = observed_sum_stat
# initialize history object
self.history = History(db)
if gt_par is None:
gt_par = {}
# save configuration data to database
model_names = [model.name for model in self.models]
self.history.store_initial_data(
ground_truth_model=gt_model,
options=meta_info,
observed_summary_statistics=observed_sum_stat,
ground_truth_parameter=gt_par,
model_names=model_names,
distance_function_json_str=self.distance_function.to_json(),
eps_function_json_str=self.eps.to_json(),
population_strategy_json_str=self.population_size.to_json(),
start_time=datetime.now(),
)
# return history
# contains id generated in store_initial_data
return self.history
[docs]
def load(
self,
db: str,
abc_id: int = 1,
observed_sum_stat: dict = None,
) -> History:
"""
Load an ABC-SMC run for continuation.
Parameters
----------
db: str
A SQLAlchemy database identifier pointing to the database from
which to continue a run.
abc_id: int, optional
The id of the ABC-SMC run in the database which is to be continued.
The default is 1. If more than one ABC-SMC run is stored, use
the ``abc_id`` parameter to indicate which one to continue.
observed_sum_stat: dict, optional
The observed summary statistics. This field should be used only if
the summary statistics cannot be reproduced exactly from the
database (in particular when they are no numpy or pandas objects,
e.g. when they were generated in R). If None, then the summary
statistics are read from the history.
"""
self.history = History(db)
self.history.id = abc_id
# extract observed sum stats from input or history
if observed_sum_stat is None:
observed_sum_stat = self.history.observed_sum_stat()
self.x_0 = observed_sum_stat
# just return the history
return self.history
def _initialize_dist_eps_acc(self, t: int):
"""
Called once at the start of run(). This function either, if available,
takes the last population from the history, or generates a
sample population from the prior. Then, it calls the initialize()
functions of the distance, epsilon, and acceptor.
Note that a calibration sample is only taken if required by any of
the tools.
Parameters
----------
t: int
Time point for which to initialize (i.e. the time point at which
to do the first population). Usually 0 or history.max_t + 1.
"""
def get_initial_sample():
"""Create sample from population, with only accepted particles."""
population = self._get_initial_population(t - 1)
return Sample.from_population(population)
def _get_initial_population_with_distances():
population = self._get_initial_population(t - 1)
def distance_to_ground_truth(x, par):
return self.distance_function(x, self.x_0, t, par)
population.update_distances(distance_to_ground_truth)
return population
def get_initial_weighted_distances():
population = _get_initial_population_with_distances()
weighted_distances = population.get_weighted_distances()
return weighted_distances
# initialize dist, eps, acc (order important)
self.distance_function.initialize(
t=t,
get_sample=get_initial_sample,
x_0=self.x_0,
total_sims=self.history.total_nr_simulations,
)
self.acceptor.initialize(
t=t,
get_weighted_distances=get_initial_weighted_distances,
distance_function=self.distance_function,
x_0=self.x_0,
)
def get_initial_records():
population = _get_initial_population_with_distances()
records = []
for particle in population.particles:
# we use dummy densities here, since only the quotient
# is of interest
records.append(
{
'distance': particle.distance,
'transition_pd_prev': 1.0,
'transition_pd': 1.0,
'accepted': True,
}
)
return records
self.eps.initialize(
t=t,
get_weighted_distances=get_initial_weighted_distances,
get_all_records=get_initial_records,
max_nr_populations=self.max_nr_populations,
acceptor_config=self.acceptor.get_epsilon_config(t),
)
def _get_initial_population(
self, t: int
) -> Tuple[List[float], List[dict]]:
"""
Get initial samples, either from the last population stored in history,
or via sampling sum stats from the prior. This can be used to calibrate
the distance function or the epsilon.
The history must have been initialized already. This function fills the
private property _initial_population.
.. warning::
The sample is cached. Thus, the function can be called repeatedly
without further computational overhead.
Parameters
----------
t: The time for which to draw samples, i.e. one before the target time.
"""
if self._initial_population is None:
if self.history.n_populations > 0:
# extract latest population from database
population = self.history.get_population(t=t)
else:
# sample
population = self._sample_from_prior(t)
self._initial_population = population
return self._initial_population
def _create_simulate_from_prior_function(self):
"""
Similar to _create_simulate_function, apart here we sample from the
prior and accept all.
"""
return create_simulate_from_prior_function(
model_prior=self.model_prior,
parameter_priors=self.parameter_priors,
models=self.models,
summary_statistics=self.summary_statistics,
)
def _sample_from_prior(self, t: int) -> Population:
"""
Only sample from prior and return results without changing
the history of the distance function or the epsilon.
"""
# create simulate function
simulate_one = self._create_simulate_from_prior_function()
logger.info(f"Calibration sample t = {t}.")
# call sampler
sample = self.sampler.sample_until_n_accepted(
n=self.population_size(-1),
simulate_one=simulate_one,
t=t,
max_eval=np.inf,
all_accepted=True,
ana_vars=self._vars(t=t),
)
# normalize accepted population weight to 1
sample.normalize_weights()
# extract accepted population
population = sample.get_accepted_population()
# update information saved in history about calibration
self.history.update_after_calibration(
nr_samples=self.sampler.nr_evaluations_, end_time=datetime.now()
)
return population
def _create_simulate_function(self, t: int):
"""
Create a simulation function which performs the sampling of parameters,
simulation of data and acceptance checking, and which is then passed
to the sampler.
Parameters
----------
t: int
Time index
Returns
-------
simulate_one: callable
Function that samples parameters, simulates data, and checks
acceptance.
.. note::
For some of the samplers, the sampling function needs to be
serialized in order to be transported to where the sampling
happens. Therefore, the returned function should be light, and
in particular not contain references to the ABCSMC class.
"""
return create_simulate_function(
t=t,
model_probabilities=self.history.get_model_probabilities(t - 1),
model_perturbation_kernel=self.model_perturbation_kernel,
transitions=self.transitions,
model_prior=self.model_prior,
parameter_priors=self.parameter_priors,
models=self.models,
summary_statistics=self.summary_statistics,
x_0=self.x_0,
distance_function=self.distance_function,
eps=self.eps,
acceptor=self.acceptor,
)
def _create_transition_pdf(self, t: int, transitions):
"""Create transition probability density function for time `t>=0`."""
if t == 0:
return create_prior_pdf(
model_prior=self.model_prior,
parameter_priors=self.parameter_priors,
)
return create_transition_pdf(
transitions=transitions,
model_probabilities=self.history.get_model_probabilities(t - 1),
model_perturbation_kernel=self.model_perturbation_kernel,
)
@run_cleanup
def run(
self,
minimum_epsilon: float = None,
max_nr_populations: int = np.inf,
min_acceptance_rate: float = 0.0,
max_total_nr_simulations: int = np.inf,
max_walltime: timedelta = None,
min_eps_diff: float = 0.0,
) -> History:
"""
Run the ABCSMC model selection until either of the stopping
criteria is met.
Parameters
----------
minimum_epsilon:
Stop if epsilon is smaller than minimum epsilon specified here.
Defaults in general to 0.0, and to 1.0 for a Temperature epsilon.
max_nr_populations:
Maximum allowed number of populations.
Stop if this number is reached.
min_acceptance_rate:
Minimal allowed acceptance rate. Sampling stops if a population
has a lower rate.
max_total_nr_simulations:
Maximum allowed total number of evaluations.
max_walltime:
Maximum allowed walltime since start of the run() method,
e.g.
>>> from datetime import timedelta
>>> max_walltime = timedelta(hours=2, minutes=30)
min_eps_diff:
Minimum epsilon difference in two sequential generations.
Population after population is sampled and particles which are close
enough to the observed data are accepted and added to the next
population.
If an adaptive Epsilon is specified (this is the default), then
the acceptance threshold decreases from population to population
automatically in a data dependent way.
Sampling of further populations is stopped, when either of the three
stopping criteria is met:
* the maximum number of populations ``max_nr_populations``
is reached,
* the acceptance threshold for the last sampled population was
smaller than ``minimum_epsilon``,
* or the acceptance rate dropped below ``acceptance_rate``.
The value of ``minimum_epsilon`` determines the quality of the ABCSMC
approximation. The smaller the better. But sampling time also increases
with decreasing ``minimum_epsilon``.
This method can be called repeatedly to sample further populations
after sampling was stopped once.
"""
t0: int = self.initialize_components_before_run(
minimum_epsilon=minimum_epsilon,
max_nr_populations=max_nr_populations,
min_acceptance_rate=min_acceptance_rate,
max_total_nr_simulations=max_total_nr_simulations,
max_walltime=max_walltime,
min_eps_diff=min_eps_diff,
)
# run loop over time points
t = t0
while True:
# perform one generation
ret = self.run_generation(t=t)
# check whether to discontinue
if not ret["successful"] or self.check_terminate(
t=t,
acceptance_rate=ret["acceptance_rate"],
):
break
# increment t
t += 1
# return used history object
return self.history
[docs]
def initialize_components_before_run(
self,
minimum_epsilon: float,
max_nr_populations: int,
min_acceptance_rate: float,
max_total_nr_simulations: int,
max_walltime: timedelta,
min_eps_diff: float,
) -> int:
"""Initialize everything before starting a run.
In particular sets variables corresponding to arguments, and
performs sampling based initialization for e.g. distance and epsilon,
if these are adaptive.
All parameters are as for `run()`.
Returns
-------
t0: The initial time point from which to start the next generation.
"""
# handle arguments
if minimum_epsilon is None:
if isinstance(self.eps, TemperatureBase):
minimum_epsilon = 1.0
else:
minimum_epsilon = 0.0
self.minimum_epsilon = minimum_epsilon
self.max_nr_populations = max_nr_populations
self.min_acceptance_rate = min_acceptance_rate
self.max_total_nr_simulations = max_total_nr_simulations
self.max_walltime = max_walltime
self.min_eps_diff = min_eps_diff
# for recording the overall time
self.init_walltime = datetime.now()
# initial time index
t0 = self.history.max_t + 1
# last time point index
self.max_t = t0 + max_nr_populations - 1
# assign an analysis id
self.analysis_id = create_analysis_id()
# let the sampler know
self.sampler.set_analysis_id(self.analysis_id)
# initialize transitions
self._fit_transitions(t0)
# initialize population size
self._adapt_population_size(t0)
# sample from prior to calibrate distance, epsilon, and acceptor
self._initialize_dist_eps_acc(t0)
# configure recording of rejected particles
self.distance_function.configure_sampler(self.sampler)
# TODO update configuration in each iteration
self.eps.configure_sampler(self.sampler)
return t0
[docs]
def run_generation(
self,
t: int,
) -> dict:
"""Run a single generation.
Parameters
----------
t: Generation time index to run for.
Returns
-------
ret:
Dictionary with entries "successful" indicating whether the
generation terminated successfully,
and potentially "acceptance_rate".
"""
# get epsilon for generation t
current_eps = self.eps(t)
if current_eps is None or np.isnan(current_eps):
raise ValueError(
f"The epsilon threshold {current_eps} is invalid."
)
logger.info(f"t: {t}, eps: {current_eps:.8e}.")
# create simulate function
simulate_one = self._create_simulate_function(t)
# population size and maximum number of evaluations
pop_size = self.population_size(t)
max_eval = (
np.inf
if self.min_acceptance_rate == 0.0
else pop_size / self.min_acceptance_rate
)
# perform the sampling
logger.debug(f"Submitting population {t}.")
sample = self.sampler.sample_until_n_accepted(
n=pop_size,
simulate_one=simulate_one,
t=t,
max_eval=max_eval,
ana_vars=self._vars(t=t),
)
# check sample health
if not sample.ok:
logger.info("Stopping: sample not ok.")
return {
"successful": False,
}
# normalize accepted population weight to 1
sample.normalize_weights()
# retrieve accepted population
population = sample.get_accepted_population()
logger.debug(f"Population {t} done.")
# save to database
n_sim = self.sampler.nr_evaluations_
model_names = [model.name for model in self.models]
self.history.append_population(
t, current_eps, population, n_sim, model_names
)
logger.debug(
f"Total samples up to t = {t}: "
f"{self.history.total_nr_simulations}."
)
# acceptance rate and ess
pop_size = len(population)
acceptance_rate = pop_size / n_sim
ess = effective_sample_size(population.get_weighted_distances()['w'])
logger.info(
f"Accepted: {pop_size} / {n_sim} = "
f"{acceptance_rate:.4e}, ESS: {ess:.4e}."
)
# prepare next iteration
self._prepare_next_iteration(
t=t + 1,
sample=sample,
population=population,
acceptance_rate=acceptance_rate,
)
return {
"successful": True,
"acceptance_rate": acceptance_rate,
}
[docs]
def check_terminate(
self,
t: int,
acceptance_rate: float,
) -> bool:
"""Check whether any termination criterion is met.
Parameters
----------
t: Current time point.
acceptance_rate: Acceptance rate in current generation.
Returns
-------
terminate: Whether to terminate (True) or not (False).
"""
# check termination criteria
if termination_criteria_fulfilled(
current_eps=self.eps(t=t),
min_eps=self.minimum_epsilon,
prev_eps=eps_from_hist(history=self.history, t=t - 1),
min_eps_diff=self.min_eps_diff,
stop_if_single_model_alive=self.stop_if_only_single_model_alive,
# noqa: E251
nr_of_models_alive=self.history.nr_of_models_alive(),
acceptance_rate=acceptance_rate,
min_acceptance_rate=self.min_acceptance_rate,
total_nr_simulations=self.history.total_nr_simulations,
max_total_nr_simulations=self.max_total_nr_simulations,
walltime=datetime.now() - self.init_walltime,
max_walltime=self.max_walltime,
t=t,
max_t=self.max_t,
):
return True
return False
def _prepare_next_iteration(
self,
t: int,
sample: Sample,
population: Population,
acceptance_rate: float,
):
"""Update actors for the upcoming iteration.
Be aware: The current (finished) iteration is t-1, the next t.
Parameters
----------
t: int
The upcoming iteration time index to prepare for.
sample: pyabc.Sample
The current iteration's sample object.
population: pyabc.Population
The current iteration's population object.
acceptance_rate: float
The current iteration's acceptance rate.
"""
# make a copy
prev_transitions = copy.deepcopy(self.transitions)
# update transitions
self._fit_transitions(t)
# update population size
self._adapt_population_size(t)
def get_sample():
return sample
# update distance
df_updated = self.distance_function.update(
t=t,
get_sample=get_sample,
total_sims=self.history.total_nr_simulations,
)
# compute distances with the new distance measure
def get_weighted_distances():
if df_updated:
def distance_to_ground_truth(x, par):
return self.distance_function(x, self.x_0, t, par)
population.update_distances(distance_to_ground_truth)
return population.get_weighted_distances()
# update acceptor
self.acceptor.update(
t=t,
get_weighted_distances=get_weighted_distances,
prev_temp=self.eps(t - 1),
acceptance_rate=acceptance_rate,
)
def get_all_records():
recorded_particles = sample.all_particles
# create list of all records
records = []
# get transition functions
transition_pdf_prev = self._create_transition_pdf(
t - 1, prev_transitions
)
transition_pdf = self._create_transition_pdf(t, self.transitions)
# iterate over all particles
for particle in recorded_particles:
# evaluate previous and current transition density
transition_pd_prev = transition_pdf_prev(
particle.m, particle.parameter
)
transition_pd = transition_pdf(particle.m, particle.parameter)
records.append(
{
'distance': particle.distance,
'transition_pd_prev': transition_pd_prev,
'transition_pd': transition_pd,
'accepted': particle.accepted,
}
)
return records
# update epsilon
self.eps.update(
t=t,
get_weighted_distances=get_weighted_distances,
get_all_records=get_all_records,
acceptance_rate=acceptance_rate,
acceptor_config=self.acceptor.get_epsilon_config(t),
)
def _adapt_population_size(self, t):
"""
Adapt population size based on the employed population strategy.
Parameters
----------
t: int
Time for which to adapt the population size.
"""
if t == 0: # we need a particle population to do the fitting
return
# model probabilities
w = self.history.get_model_probabilities(self.history.max_t)[
"p"
].values
# make a copy in case the population strategy messes with
# the transitions
# WARNING: the deepcopy also copies the random states of scipy.stats
# distributions
copied_transitions = copy.deepcopy(self.transitions)
# update the population size
self.population_size.update(
transitions=copied_transitions, model_weights=w, t=t
)
def _fit_transitions(self, t):
"""
Fit the density estimator.
Parameters
----------
t: int
Time for which to update the kernel density estimator.
"""
if t == 0: # we need a particle population to do the fitting
return
for m in self.history.alive_models(t - 1):
particles, w = self.history.get_distribution(m, t - 1)
self.transitions[m].fit(particles, w)
def _vars(self, t: int) -> AnalysisVars:
"""Create a dictionary of analysis variables of interest.
These variables are passed to the sampler, as some need to create
simulation settings themselves.
"""
return AnalysisVars(
model_prior=self.model_prior,
parameter_priors=self.parameter_priors,
model_perturbation_kernel=self.model_perturbation_kernel,
transitions=self.transitions,
models=self.models,
summary_statistics=self.summary_statistics,
x_0=self.x_0,
distance_function=self.distance_function,
eps=self.eps,
acceptor=self.acceptor,
min_acceptance_rate=self.min_acceptance_rate,
min_eps=self.minimum_epsilon,
stop_if_single_model_alive=self.stop_if_only_single_model_alive,
max_t=self.max_t,
max_total_nr_simulations=self.max_total_nr_simulations,
prev_total_nr_simulations=self.history.total_nr_simulations,
max_walltime=self.max_walltime,
init_walltime=self.init_walltime,
min_eps_diff=self.min_eps_diff,
prev_eps=eps_from_hist(history=self.history, t=t - 1),
)