Source code for pyabc.sampler.mapping

import functools
import random

import cloudpickle as pickle
import numpy as np

from .base import Sampler


class MappingSampler(Sampler):
    """
    Parallelize via a map operation.

    This sampler can be applied in a multi-core or in a distributed
    setting.

    Parameters
    ----------
    map_: map-like function
        A function which works like the built-in `map`.
        The map can be any generic map operation. Possible candidates
        include:

        * multiprocessing.pool.map (see
          https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool)
        * :class:`pyabc.sge.SGE`'s map method. This mapper is useful in
          SGE-like environments where you don't want to start workers
          which run forever.
        * Dask's distributed `distributed.Client`'s map (see
          https://distributed.readthedocs.io/en/latest/api.html#client)
        * IPython parallel's map (see
          http://ipyparallel.readthedocs.io/en/latest/task.html#quick-and-easy-parallelism)

        and many other implementations.

        Each of the mapped function calls samples until it gets one
        accepted particle. This can have a performance impact if one of
        the sample tasks runs very long while all the other tasks are
        already finished: the sampler then has to wait until the last
        sample task is finished.

    mapper_pickles: bool, optional
        Whether the mapper handles the pickling itself or the
        MappingSampler class should handle serialization.

        The default is `False`. While this setting is compatible with a
        larger range of map functions, its performance can be
        suboptimal, as possibly too much serialization and
        deserialization is done, which can limit the overall performance
        if the model evaluations are comparatively fast. The passed map
        function might implement a more efficient serialization. For
        example, for the :class:`pyabc.sge.SGE` mapper, this option
        should be set to `True` for better performance.
    """
    def __init__(self, map_=map, mapper_pickles: bool = False):
        super().__init__()
        self.map_ = map_
        # If the mapper pickles itself, pass objects through unchanged;
        # otherwise serialize via cloudpickle.
        self.pickle, self.unpickle = (
            (identity, identity)
            if mapper_pickles
            else (pickle.dumps, pickle.loads)
        )
    def __getstate__(self):
        return (
            self.pickle,
            self.unpickle,
            self.nr_evaluations_,
            self.sample_factory,
        )

    def __setstate__(self, state):
        (
            self.pickle,
            self.unpickle,
            self.nr_evaluations_,
            self.sample_factory,
        ) = state

    def map_function(self, simulate_one, _):
        simulate_one = self.unpickle(simulate_one)

        # re-seed the random number generators so that workers do not
        # all produce identical samples
        np.random.seed()
        random.seed()

        nr_simulations = 0
        sample = self._create_empty_sample()

        # sample until one accepted particle is obtained
        while True:
            new_sim = simulate_one()
            nr_simulations += 1
            sample.append(new_sim)
            if new_sim.accepted:
                break

        return sample, nr_simulations

    def sample_until_n_accepted(
        self,
        n,
        simulate_one,
        t,
        *,
        max_eval=np.inf,
        all_accepted=False,
        ana_vars=None,
    ):
        # Pickle the simulation function once as a whole instead of
        # pickling its parts individually. This should save time and
        # make better use of shared references. Correct usage of shared
        # references might even be necessary to ensure correct behavior,
        # depending on the details of the model implementations.
        sample_simulate_accept = self.pickle(simulate_one)
        map_function = functools.partial(
            self.map_function, sample_simulate_accept
        )

        counted_results = list(self.map_(map_function, [None] * n))
        counted_results = filter(
            lambda x: not isinstance(x, Exception), counted_results
        )
        results, evals = zip(*counted_results)

        # count all evaluations
        self.nr_evaluations_ = sum(evals)

        # aggregate all results into one to-be-returned sample
        sample = self._create_empty_sample()
        for result in results:
            sample += result

        return sample
def identity(x):
    return x
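

A minimal usage sketch, not part of the module above: it illustrates how the sampler could be wired to the map implementations named in the docstring. The pool size, the commented-out SGE variant, and the final hand-off to an ABC analysis are assumptions; the surrounding model, prior, and distance setup is omitted.

from multiprocessing import Pool

from pyabc.sampler import MappingSampler

# Multi-core machine: hand the sampler a multiprocessing pool's map.
# (Assumed pool size; the sampler pickles the simulation function itself.)
pool = Pool(4)
sampler = MappingSampler(map_=pool.map)

# SGE-like cluster: the pyabc.sge.SGE mapper pickles its payload itself,
# so let it handle serialization by setting mapper_pickles=True.
# from pyabc.sge import SGE
# sampler = MappingSampler(map_=SGE().map, mapper_pickles=True)

# The sampler would then be passed to pyabc.ABCSMC via its `sampler`
# argument (ABC setup omitted here).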