import random
from ctypes import c_longlong
from multiprocessing import Process, Queue, Value

import cloudpickle as pickle
import numpy as np
from jabbar import jabbar

from .multicorebase import MultiCoreSampler, get_if_worker_healthy
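
# sentinel a worker puts on the result queue to signal that it has finished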
DONE = "Done"


def work(
    simulate_one,
    queue,
    n_eval: Value,
    n_acc: Value,
    n: int,
    check_max_eval: bool,
    max_eval: int,
    all_accepted: bool,
    sample_factory,
):
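    """Worker loop, run in each child process.

    Repeatedly calls ``simulate_one`` and collects the results in a sample;
    whenever a particle is accepted, the sample is put on ``queue`` as a
    ``(particle_id, sample)`` tuple and a fresh sample is started.
    A ``DONE`` sentinel is put on the queue when the worker terminates.
    """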
    # unwrap arguments
    if isinstance(simulate_one, bytes):
        simulate_one = pickle.loads(simulate_one)

    # re-seed the RNGs: forked workers would otherwise inherit identical
    # random states from the parent process
    random.seed()
    np.random.seed()
    sample = sample_factory()

    while (
        n_acc.value < n
        and (not all_accepted or n_eval.value < n)
        and (not check_max_eval or n_eval.value < max_eval)
    ):
        with n_eval.get_lock():
            particle_id = n_eval.value
            n_eval.value += 1
        new_sim = simulate_one()
        sample.append(new_sim)

        if new_sim.accepted:
            # increase number of accepted particles
            with n_acc.get_lock():
                n_acc.value += 1
            # put into queue
            queue.put((particle_id, sample))
            # create empty sample and record until next accepted
            sample = sample_factory()

    # indicate worker finished
    queue.put(DONE)


class MulticoreEvalParallelSampler(MultiCoreSampler):
    """
    Multicore Evaluation parallel sampler.

    Implements the same strategy as
    :class:`pyabc.sampler.RedisEvalParallelSampler`
    or
    :class:`pyabc.sampler.DaskDistributedSampler`.
    However, parallelization is restricted to a single machine with multiple
    processes.
    This sampler has very low communication overhead and is thus suitable
    for short-running model evaluations.

    Requires no pickling of the ``sample_one``,
    ``simulate_one`` and ``accept_one`` functions.
    This is achieved using fork on Linux (see :class:`Sampler`).

    The simulation results are still pickled as they are transmitted
    from the worker processes back to the parent process.
    Depending on the kind of summary statistics this can be fast or slow.
    If your summary statistics are only a dict with a couple of numbers,
    the overhead should not be substantial.
    However, if your summary statistics are large numpy arrays
    or similar, this could cause noticeable overhead.

    Parameters
    ----------
    n_procs: int, optional
        If set to None, the number of cores is determined according to
        :func:`pyabc.sge.nr_cores_available`.
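
    Examples
    --------
    A minimal usage sketch; ``model``, ``prior`` and ``distance`` are
    placeholders for a user-defined model, prior and distance function,
    and the sampler is handed to :class:`pyabc.ABCSMC` like any other
    sampler::

        import pyabc

        sampler = pyabc.sampler.MulticoreEvalParallelSampler(n_procs=4)
        abc = pyabc.ABCSMC(model, prior, distance, sampler=sampler)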
"""
def sample_until_n_accepted(
self,
n,
simulate_one,
t,
*,
max_eval=np.inf,
all_accepted=False,
ana_vars=None,
):
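        # shared counters: n_eval counts simulation calls across all workers
        # (its current value is also used as the particle id), n_acc counts
        # the number of accepted particles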
        n_eval = Value(c_longlong)
        n_eval.value = 0
        n_acc = Value(c_longlong)
        n_acc.value = 0
        queue = Queue()

        # wrap arguments
        if self.pickle:
            simulate_one = pickle.dumps(simulate_one)
        args = (
            simulate_one,
            queue,
            n_eval,
            n_acc,
            n,
            self.check_max_eval,
            max_eval,
            all_accepted,
            self._create_empty_sample,
        )
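        # start self.n_procs worker processes, each running the `work` loop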
        processes = [
            Process(target=work, args=args, daemon=self.daemon)
            for _ in range(self.n_procs)
        ]

        for proc in processes:
            proc.start()
        id_results = []

        # make sure all results are collected
        # and the queue is emptied to prevent deadlocks
        n_done = 0
        with jabbar(total=n, enable=self.show_progress, keep=False) as bar:
            while n_done < len(processes):
                val = get_if_worker_healthy(processes, queue)
                if val == DONE:
                    n_done += 1
                else:
                    id_results.append(val)
                    bar.inc()

        for proc in processes:
            proc.join()
        # sort by particle id (i.e. evaluation start order) and keep only the
        # first n results, to avoid a bias toward short-running evaluations
        id_results.sort(key=lambda x: x[0])
        id_results = id_results[: min(len(id_results), n)]

        self.nr_evaluations_ = n_eval.value

        results = [res[1] for res in id_results]

        # create 1 to-be-returned sample from results
        sample = self._create_empty_sample()
        for result in results:
            sample += result

        if sample.n_accepted < n:
            sample.ok = False

        return sample