Source code for pyabc.sampler.multicorebase

import logging
import os
import platform
from multiprocessing import Process, ProcessError, Queue
from queue import Empty
from typing import List

from .base import Sampler

logger = logging.getLogger("ABC.Sampler")


class MultiCoreSampler(Sampler):
    """
    Multi-core sampler base class. This sampler is not functional but provides
    the number of cores selection functionality used by all the multiprocessing
    samplers.

    Parameters
    ----------
    n_procs: int
        Number of processes.
    daemon: bool
        Whether to spawn workers in daemon mode.
    pickle:
        Whether to manually pickle (we employ cloudpickle) certain objects
        that are passed to the parallel processes. This may be necessary on
        some systems, while pickling is usually not necessary at all on Linux.
    check_max_eval: bool
        Whether to check the maximum number of evaluations on the fly.
    """

    def __init__(
        self,
        n_procs: int = None,
        daemon: bool = True,
        pickle: bool = None,
        check_max_eval: bool = False,
    ):
        super().__init__()
        self._n_procs = n_procs
        self.daemon = daemon
        if pickle is None:
            pickle = False
            if platform.system() == "Darwin":  # macos
                pickle = True
        self.pickle = pickle
        self.check_max_eval = check_max_eval

        # inform user about number of cores used
        logger.info(f"Parallelize sampling on {self.n_procs} processes.")

    @property
    def n_procs(self):
        if self._n_procs is not None:
            return self._n_procs
        return nr_cores_available()


[docs] def nr_cores_available(): """ Determine the number of cores available: If set, the environment variable `PYABC_NUM_PROCS` is used, otherwise `os.cpu_count()` is used. Returns ------- nr_cores: int The number of cores available. """ try: return int(os.environ['PYABC_NUM_PROCS']) except KeyError: pass return os.cpu_count()
def healthy(worker): return all(worker.exitcode in [0, None] for worker in worker) def get_if_worker_healthy(workers: List[Process], queue: Queue): """ Parameters ---------- workers: List of worker processes which should be in a healthy state, i.e. either terminated with exit code 0 (success) or are still running (exitcode is None in this case) queue: A multiprocessing queue which is fed by the workers Returns ------- item: An item from the queue """ while True: try: item = queue.get(True, 5) return item except Empty: if not healthy(workers): raise ProcessError("At least one worker is dead.")