Source code for pyabc.sampler.redis_eps.server_starter

import os
import socket
import tempfile
import time
from multiprocessing import Process
from subprocess import Popen, TimeoutExpired  # noqa: S404
from time import sleep

import psutil

from .cli import _manage, work
from .sampler import RedisEvalParallelSampler
from .sampler_static import RedisStaticSampler


class RedisServerStarter:
    """Start a local redis-server and worker processes, and stop them again.

    On construction, a ``redis-server`` subprocess is launched on a free
    port of localhost and ``workers`` worker processes running ``work``
    are started against it. Call :meth:`shutdown` to stop the workers and
    the server.

    Parameters
    ----------
    password:
        Password for the redis server. If not None, a temporary config
        file containing a ``requirepass`` directive is written and handed
        to redis-server, and the password is forwarded to the workers.
    workers:
        Number of worker processes to start.
    processes_per_worker:
        Forwarded to each worker as ``--processes``.
    daemon:
        Forwarded to each worker as ``--daemon`` (omitted if None).
    catch:
        Forwarded to each worker as ``--catch``.

    Attributes
    ----------
    port:
        Port the redis server was started on.
    password:
        The password passed in.
    redis_server:
        The ``Popen`` handle of the server; None after :meth:`shutdown`.
    workers:
        The started worker ``Process`` objects.

    Raises
    ------
    RuntimeError
        If redis-server has already exited after the startup wait.
    """

    # seconds to wait for redis-server to exit after SIGTERM before
    # falling back to SIGKILL
    _TERMINATE_TIMEOUT = 5

    def __init__(
        self,
        password: str = None,
        workers: int = 2,
        processes_per_worker: int = 1,
        daemon: bool = True,
        catch: bool = True,
    ):
        self.port = self._find_free_port()
        self.password = password
        self.workers = []
        self.redis_server = None
        self._conf_file = None

        # create config file
        maybe_redis_conf = []
        if password is not None:
            self._conf_file = self._write_redis_conf(password)
            maybe_redis_conf = [self._conf_file]

        # start server
        try:
            self.redis_server = Popen(  # noqa: S607,S603
                ["redis-server", *maybe_redis_conf, "--port", str(self.port)]
            )
        except OSError:
            # e.g. redis-server is not installed: do not leave the
            # password file behind
            self._remove_conf_file()
            raise

        # give redis-server time to start
        # TODO this can be improved (also below) by checking whether the
        #  respective processes are in their expected states
        sleep(1)

        # fail early instead of starting workers against a dead server
        if self.redis_server.poll() is not None:
            returncode = self.redis_server.returncode
            self.redis_server = None
            self._remove_conf_file()
            raise RuntimeError(
                f"redis-server exited during startup on port {self.port} "
                f"(return code {returncode})."
            )

        # initiate worker processes
        maybe_password = [] if password is None else ["--password", password]
        maybe_daemon = [] if daemon is None else ["--daemon", str(daemon)]
        self.workers = [
            Process(
                target=work,
                args=(
                    [
                        "--host",
                        "localhost",
                        "--port",
                        str(self.port),
                        *maybe_password,
                        "--processes",
                        str(processes_per_worker),
                        *maybe_daemon,
                        "--catch",
                        str(catch),
                    ],
                ),
                daemon=False,
            )
            for _ in range(workers)
        ]

        # start workers
        for p in self.workers:
            p.start()

        # sleep a short amount of time to make sure everything is set up
        time.sleep(0.5)

    @staticmethod
    def _find_free_port() -> int:
        """Return a port number the redis server can be started on.

        Keeps the strategy of using one above the highest local port
        currently in use, but only if that is a valid unprivileged port.
        Otherwise (no connections visible, psutil not permitted to list
        them, or the highest port is at the top of the range) the
        operating system is asked for a free port.

        The port is not reserved, so another process could grab it before
        redis-server binds to it.
        """
        try:
            used = [c.laddr[1] for c in psutil.net_connections() if c.laddr]
        except psutil.Error:
            # e.g. AccessDenied when not allowed to list connections
            used = []

        candidate = max(used, default=0) + 1
        if 1024 <= candidate <= 65535:
            return candidate

        # binding to port 0 lets the OS pick a currently free port
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("localhost", 0))
            return sock.getsockname()[1]

    @staticmethod
    def _write_redis_conf(password: str) -> str:
        """Write a temporary redis config requiring `password`.

        Returns the path of the file. The file handle is closed on
        return; the file itself stays on disk until it is removed.
        """
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
            f.write(f"requirepass {password}\n")
            return f.name

    def _remove_conf_file(self):
        """Delete the temporary redis config file, if one was written."""
        conf_file = getattr(self, "_conf_file", None)
        if conf_file is None:
            return
        self._conf_file = None
        try:
            os.remove(conf_file)
        except FileNotFoundError:
            # already gone, nothing left to clean up
            pass

    def shutdown(self):
        """Cleanup workers and server."""
        if not self.redis_server:
            # no server: shutdown() was likely called already, skip
            return

        try:
            # send stop signal to workers
            _manage("stop", port=self.port, password=self.password)
            for p in self.workers:
                # wait for workers to join
                p.join()
        finally:
            # stop the server even if stopping the workers failed
            server = self.redis_server
            self.redis_server = None

            # ask the server to terminate and reap the child process
            server.terminate()
            try:
                server.wait(timeout=self._TERMINATE_TIMEOUT)
            except TimeoutExpired:
                # make sure it's gone
                server.kill()
                server.wait()

            self._remove_conf_file()


class RedisEvalParallelSamplerServerStarter(RedisEvalParallelSampler):
    """
    Dynamic redis sampler that brings its own local redis-server.

    A :class:`RedisServerStarter` is created first; the sampler is then
    initialized against ``localhost`` on the port of that server.
    For the arguments see the base class.
    """

    def __init__(
        self,
        password: str = None,
        batch_size: int = 1,
        wait_for_all_samples: bool = False,
        look_ahead: bool = False,
        look_ahead_delay_evaluation: bool = True,
        max_n_eval_look_ahead_factor: float = 10.0,
        adapt_look_ahead_proposal: bool = False,
        workers: int = 2,
        processes_per_worker: int = 1,
        daemon: bool = True,
        catch: bool = True,
        log_file: str = None,
    ):
        # the server and its workers must exist before the sampler
        # is initialized, since the sampler needs the server's port
        starter = RedisServerStarter(
            password=password,
            workers=workers,
            processes_per_worker=processes_per_worker,
            daemon=daemon,
            catch=catch,
        )
        self.server_starter = starter

        super().__init__(
            host="localhost",
            port=starter.port,
            password=starter.password,
            batch_size=batch_size,
            wait_for_all_samples=wait_for_all_samples,
            look_ahead=look_ahead,
            look_ahead_delay_evaluation=look_ahead_delay_evaluation,
            max_n_eval_look_ahead_factor=max_n_eval_look_ahead_factor,
            adapt_look_ahead_proposal=adapt_look_ahead_proposal,
            log_file=log_file,
        )

    def shutdown(self):
        """Shut down the server starter created by this sampler."""
        self.server_starter.shutdown()


class RedisStaticSamplerServerStarter(RedisStaticSampler):
    """
    Static redis sampler that brings its own local redis-server.

    A :class:`RedisServerStarter` is created first; the sampler is then
    initialized against ``localhost`` on the port of that server.
    For the arguments see the base class.
    """

    def __init__(
        self,
        password: str = None,
        workers: int = 2,
        processes_per_worker: int = 1,
        daemon: bool = True,
        catch: bool = True,
        log_file: str = None,
    ):
        # the server and its workers must exist before the sampler
        # is initialized, since the sampler needs the server's port
        starter = RedisServerStarter(
            password=password,
            workers=workers,
            processes_per_worker=processes_per_worker,
            daemon=daemon,
            catch=catch,
        )
        self.server_starter = starter

        super().__init__(
            host="localhost",
            port=starter.port,
            password=starter.password,
            log_file=log_file,
        )

    def shutdown(self):
        """Shut down the server starter created by this sampler."""
        self.server_starter.shutdown()