import logging
import random
from multiprocessing import Process, Queue
import cloudpickle as pickle
import numpy as np
from jabbar import jabbar
from .multicorebase import MultiCoreSampler, get_if_worker_healthy
from .singlecore import SingleCoreSampler
logger = logging.getLogger("ABC.Sampler")
SENTINEL = None
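# Descriptive note (added comment): SENTINEL is put on the feed queue once per
# worker process to signal that no further work tokens will arrive and that
# the worker loop should exit.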
def feed(feed_q, n_jobs, n_proc):
    for _ in range(n_jobs):
        feed_q.put(1)
    for _ in range(n_proc):
        feed_q.put(SENTINEL)
def work(feed_q, result_q, simulate_one, max_eval, single_core_sampler):
    # unwrap arguments
    if isinstance(simulate_one, bytes):
        simulate_one = pickle.loads(simulate_one)

    # reseed the random number generators, otherwise each forked worker
    # would inherit and reproduce the parent's random state
    random.seed()
    np.random.seed()

    while True:
        arg = feed_q.get()
        if arg == SENTINEL:
            break
        res = single_core_sampler.sample_until_n_accepted(
            1, simulate_one, max_eval
        )
        result_q.put((res, single_core_sampler.nr_evaluations_))
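# Descriptive note (added comment): together, ``feed`` and ``work`` form a
# sentinel-terminated producer/consumer pipeline. ``feed`` enqueues one work
# token per requested particle, each worker draws tokens until it sees a
# SENTINEL, and every accepted particle is sent back through ``result_q``
# together with the number of model evaluations it required.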
class MulticoreParticleParallelSampler(MultiCoreSampler):
    """
    Samples on multiple cores using the multiprocessing module.

    This sampler is optimized for low latencies and is efficient, even
    if the individual model evaluations are fast.

    Requires no pickling of the ``sample_one``,
    ``simulate_one`` and ``accept_one`` functions.
    This is achieved using fork on Linux (see :class:`Sampler`).

    The simulation results are still pickled as they are transmitted
    from the worker processes back to the parent process.
    Depending on the kind of summary statistics, this can be fast or slow.
    If your summary statistics are only a dict with a couple of numbers,
    the overhead should not be substantial.
    However, if your summary statistics are large numpy arrays
    or similar, the pickling overhead can be considerable.

    Parameters
    ----------
    n_procs: int, optional
        If set to None, the number of cores is determined according to
        :func:`pyabc.sge.nr_cores_available`.

    .. warning::

        Windows support is *not* tested.
        As there is no fork on Windows, this sampler might not work.
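
    Examples
    --------
    A minimal usage sketch; ``model``, ``prior``, and ``distance`` below are
    illustrative placeholders and not part of this module::

        import pyabc

        # toy model, prior and distance, for illustration only
        def model(parameter):
            return {"y": parameter["theta"] + 1}

        prior = pyabc.Distribution(theta=pyabc.RV("uniform", 0, 10))

        def distance(x, x0):
            return abs(x["y"] - x0["y"])

        # hand this sampler to the ABC-SMC analysis
        sampler = MulticoreParticleParallelSampler(n_procs=4)
        abc = pyabc.ABCSMC(model, prior, distance, sampler=sampler)
        abc.new("sqlite:///test.db", {"y": 5})
        history = abc.run(minimum_epsilon=0.1, max_nr_populations=3)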
"""
    def sample_until_n_accepted(
        self,
        n,
        simulate_one,
        t,
        *,
        max_eval=np.inf,
        all_accepted=False,
        ana_vars=None,
    ):
        # starting more than n jobs
        # does not help in this parallelization scheme
        n_procs = min(n, self.n_procs)
        logger.debug(
            "Start sampling on {} cores ({} requested)".format(
                n_procs, self.n_procs
            )
        )

        feed_q = Queue()
        result_q = Queue()

        feed_process = Process(target=feed, args=(feed_q, n, n_procs))

        single_core_sampler = SingleCoreSampler(
            check_max_eval=self.check_max_eval
        )
        # the max_eval handling in this sampler is certainly not optimal
        single_core_sampler.sample_factory = self.sample_factory

        # wrap arguments
        if self.pickle:
            simulate_one = pickle.dumps(simulate_one)
        args = (feed_q, result_q, simulate_one, max_eval, single_core_sampler)

        worker_processes = [
            Process(target=work, args=args) for _ in range(n_procs)
        ]

        for proc in worker_processes:
            proc.start()

        feed_process.start()

        collected_results = []
        for _ in jabbar(range(n), enable=self.show_progress, keep=False):
            res = get_if_worker_healthy(worker_processes, result_q)
            collected_results.append(res)

        feed_process.join()
        for proc in worker_processes:
            proc.join()

        # Queues get closed automatically on garbage collection
        # No explicit closing necessary.

        results, evaluations = zip(*collected_results)
        self.nr_evaluations_ = sum(evaluations)

        # create 1 to-be-returned sample from results
        sample = self._create_empty_sample()
        for result in results:
            sample += result

        if sample.n_accepted < n:
            sample.ok = False

        return sample