pyabc.sge

SGE

Parallel job execution on SGE like environments.

The functions and classes in the pyabc.sge package can be used for at least two purposes:

  1. The SGE.map method can be used together with the MappingSampler to parallelize ABC-SMC in a SGE/UGE infrastructure.

  2. SGE.map can be used in a standalone mode to execute jobs on a SGE/UGE cluster. This is completely independent of ABC-SMC inference.

class pyabc.sge.DefaultContext(tmp_path, job_nr)[source]

Bases: ExecutionContextMixin

Does nothing special.

class pyabc.sge.NamedPrinter(tmp_path, job_nr)[source]

Bases: ExecutionContextMixin

Context with appends the job name and number to anything printed by that job.

__init__(tmp_path, job_nr)[source]
class pyabc.sge.ProfilingContext(tmp_path, job_nr)[source]

Bases: ExecutionContextMixin

Profiles the running jobs and stores the profiles in the temporary job folder in the subdirectory “profiling”.

Useful for debugging. Do not use in production.

__init__(tmp_path, job_nr)[source]
class pyabc.sge.SGE(tmp_directory: str = None, memory: str = '3G', time_h: int = 100, python_executable_path: str = None, sge_error_file: str = None, sge_output_file: str = None, parallel_environment=None, name='map', queue=None, priority=None, num_threads: int = 1, execution_context=<class 'pyabc.sge.execution_contexts.DefaultContext'>, chunk_size=1)[source]

Bases: object

Map a function to be executed on an SGE cluster environment.

Reads a config file (if it exists) in you home directory which should look as the default in sge.config.

The mapper reads commonly used parameters from a configuration file stored in ~/.parallel An example configuration file could look as follows:

#~/.parallel
[DIRECTORIES]
TMP=/tmp

[BROKER]
# The value of TYPE can be SQLITE or REDIS
TYPE=REDIS

[SGE]
QUEUE=p.openmp
PARALLEL_ENVIRONMENT=openmp
PRIORITY=-500

[REDIS]
HOST=127.0.0.1
Parameters:
  • tmp_directory (str or None) – Directory where temporary job pickle files are stored If set to None a tmp directory is read from the ‘’~/.parallel’’ configuration file. It this file does not exist a tmp directory within the user home directory is created.

  • memory (str, optional (default = '3G')) – Ram requested by each job, e.g. ‘10G’.

  • time_h (int (default = 100)) – Job run time in hours.

  • python_executable_path (str or None) – The python interpreter which executes the jobs. If set to None, the currently executing interpreter is used as returned by sys.executable.

  • sge_error_file (str or None) – File to which stderr messages from workers are stored. If set to None, a file within the tmp_directory is used.

  • sge_output_file (str or None) – File to which stdout messages from workers are stored If set to None, a file within the tmp_directory is used.

  • parallel_environment (str, optional (default = 'map')) – The SGE environment. (This is what is passed to the -pe option in the qsub script).

  • name (str) – A name for the job.

  • queue (str) – The SGE queue.

  • priority (int, optional.) – SGE job priority. A value between -1000 and 0. Note that a priority of 0 automatically enables the reservation flag.

  • num_threads (int, optional (default = 1)) – Number of threads for each worker. This also sets the environment variable MKL_NUM_THREADS, OMP_NUM_THREADS to the specified number to handle jobs which use OpenMP etc. correctly.

  • execution_context (DefaultContext, ProfilingContext, NamedPrinter) – Any context manager can be passed here. The __enter__ method is called before evaluating the function on the cluster. The __exit__ method directly after the function run finished.

  • chunk_size (int, optional (default = 1)) –

    nr of tasks executed within one job.

    Warning

    If chunk_size is larger than 1, this can have side effects as all the jobs within one chunk are executed within the python process.

Returns:

sge – The configured sge mapper.

Return type:

SGE

__init__(tmp_directory: str = None, memory: str = '3G', time_h: int = 100, python_executable_path: str = None, sge_error_file: str = None, sge_output_file: str = None, parallel_environment=None, name='map', queue=None, priority=None, num_threads: int = 1, execution_context=<class 'pyabc.sge.execution_contexts.DefaultContext'>, chunk_size=1)[source]
map(function, array)[source]

Do what map(function, array) would do, but do it via a array job on the SGE by pickling objects, storing them in a temporary folder, submitting them to SGE and then reading and returning the results.

Parameters:
  • function (Callable) –

  • array (iterable) –

Returns:

result_list – List of results of function application. This list can also contain Exception objects.

Return type:

list

pyabc.sge.nr_cores_available() int[source]

Determine the number of available cores in a manner which is safer for cluster environments than counting the number of CPUs the machine has. The CPU count might not be adequate as a job on a cluster might not have access to all the cores present on the cluster node on which it executes due to resource restrictions, such as for example done by SGE, SLURM etc.

The followin heuristic scheme is used to get the available number of cores:

  1. Tries to determin cores form the SGE environment variable NSLOTS

  2. From the environment variable OMP_NUM_THREADS

  3. From the environment variable MKL_NUM_THREADS

  4. from Python’s os.cpu_count

Returns:

nr_cores – The number of cores available.

Return type:

int

pyabc.sge.sge_available()[source]

Makes a simple heuristic test to check if the SGE is available on the machine. It tries to execute the qstat command. In case it is found, it is assumed that the SGE is available.

Returns:

available – Whether SGE is available or not.

Return type:

bool