pyabc.sge
SGE
Parallel job execution on SGE-like environments.
The functions and classes in the pyabc.sge package can be used for at least two purposes:
- The SGE.map method can be used together with the MappingSampler to parallelize ABC-SMC in an SGE/UGE infrastructure.
- SGE.map can be used in standalone mode to execute jobs on an SGE/UGE cluster. This is completely independent of ABC-SMC inference.
- class pyabc.sge.DefaultContext(tmp_path, job_nr)[source]
Bases: ExecutionContextMixin
Does nothing special.
- class pyabc.sge.NamedPrinter(tmp_path, job_nr)[source]
Bases: ExecutionContextMixin
Context which appends the job name and number to anything printed by that job.
- class pyabc.sge.ProfilingContext(tmp_path, job_nr)[source]
Bases: ExecutionContextMixin
Profiles the running jobs and stores the profiles in the subdirectory "profiling" of the temporary job folder.
Useful for debugging. Do not use in production.
- class pyabc.sge.SGE(tmp_directory: str = None, memory: str = '3G', time_h: int = 100, python_executable_path: str = None, sge_error_file: str = None, sge_output_file: str = None, parallel_environment=None, name='map', queue=None, priority=None, num_threads: int = 1, execution_context=<class 'pyabc.sge.execution_contexts.DefaultContext'>, chunk_size=1)[source]
Bases:
object
Map a function to be executed on an SGE cluster environment.
Reads a config file (if it exists) in your home directory, which should look like the default in sge.config.
The mapper reads commonly used parameters from a configuration file stored in ~/.parallel.
An example configuration file could look as follows:

    # ~/.parallel
    [DIRECTORIES]
    TMP=/tmp

    [BROKER]
    # The value of TYPE can be SQLITE or REDIS
    TYPE=REDIS

    [SGE]
    QUEUE=p.openmp
    PARALLEL_ENVIRONMENT=openmp
    PRIORITY=-500

    [REDIS]
    HOST=127.0.0.1
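The ~/.parallel file above is an INI-style file, so it can be parsed with Python's standard configparser. A minimal sketch, using the example content shown above (the `config_text` string stands in for reading the actual file):

```python
from configparser import ConfigParser

# INI-style content mirroring the example ~/.parallel file above.
config_text = """
[DIRECTORIES]
TMP=/tmp

[BROKER]
# The value of TYPE can be SQLITE or REDIS
TYPE=REDIS

[SGE]
QUEUE=p.openmp
PARALLEL_ENVIRONMENT=openmp
PRIORITY=-500

[REDIS]
HOST=127.0.0.1
"""

parser = ConfigParser()
parser.read_string(config_text)

print(parser["SGE"]["QUEUE"])            # p.openmp
print(parser.getint("SGE", "PRIORITY"))  # -500
```

In practice one would call `parser.read(os.path.expanduser("~/.parallel"))` instead of `read_string`.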
- Parameters:
tmp_directory (str or None) – Directory where temporary job pickle files are stored. If set to None, a tmp directory is read from the ''~/.parallel'' configuration file. If this file does not exist, a tmp directory within the user's home directory is created.
memory (str, optional (default = '3G')) – Ram requested by each job, e.g. ‘10G’.
time_h (int (default = 100)) – Job run time in hours.
python_executable_path (str or None) – The Python interpreter which executes the jobs. If set to None, the currently executing interpreter is used, as returned by sys.executable.
sge_error_file (str or None) – File to which stderr messages from workers are stored. If set to None, a file within the tmp_directory is used.
sge_output_file (str or None) – File to which stdout messages from workers are stored. If set to None, a file within the tmp_directory is used.
parallel_environment (str, optional) – The SGE parallel environment. (This is what is passed to the -pe option in the qsub script.)
name (str, optional (default = 'map')) – A name for the job.
queue (str) – The SGE queue.
priority (int, optional.) – SGE job priority. A value between -1000 and 0. Note that a priority of 0 automatically enables the reservation flag.
num_threads (int, optional (default = 1)) – Number of threads for each worker. This also sets the environment variable MKL_NUM_THREADS, OMP_NUM_THREADS to the specified number to handle jobs which use OpenMP etc. correctly.
execution_context (DefaultContext, ProfilingContext or NamedPrinter) – Any context manager can be passed here. The __enter__ method is called before evaluating the function on the cluster, the __exit__ method directly after the function run finished.
chunk_size (int, optional (default = 1)) – Number of tasks executed within one job.
Warning
If chunk_size is larger than 1, this can have side effects, as all the tasks within one chunk are executed within the same Python process.
- Returns:
sge – The configured SGE mapper.
- Return type:
SGE
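The effect of chunk_size can be illustrated without a cluster: tasks are grouped into consecutive chunks, and each chunk is submitted as one job. The helper `chunked` below is a hypothetical sketch of that grouping, not part of pyabc:

```python
def chunked(tasks, chunk_size):
    """Split a task list into consecutive chunks of at most chunk_size
    elements -- a sketch of how chunk_size groups tasks into SGE jobs.
    (Hypothetical helper, not the actual pyabc implementation.)"""
    return [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]

# With chunk_size=1 (the default), every task becomes its own job:
print(chunked([1, 2, 3, 4, 5], 1))  # [[1], [2], [3], [4], [5]]

# With chunk_size=2, tasks share a job, and hence a Python process --
# which is the source of the side effects mentioned in the warning above:
print(chunked([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4], [5]]
```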
- __init__(tmp_directory: str = None, memory: str = '3G', time_h: int = 100, python_executable_path: str = None, sge_error_file: str = None, sge_output_file: str = None, parallel_environment=None, name='map', queue=None, priority=None, num_threads: int = 1, execution_context=<class 'pyabc.sge.execution_contexts.DefaultContext'>, chunk_size=1)[source]
- map(function, array)[source]
Do what map(function, array) would do, but do it via an array job on the SGE: the objects are pickled and stored in a temporary folder, the jobs are submitted to SGE, and the results are then read back and returned.
- Parameters:
function (Callable)
array (iterable)
- Returns:
result_list – List of results of the function application. This list can also contain Exception objects.
- Return type:
list
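Semantically, SGE.map behaves like the built-in map over the array, just executed as an SGE array job. A minimal sketch; the memory/time values are illustrative, and the commented-out part requires a working SGE/UGE installation:

```python
def square(x):
    return x * x

# Hypothetical cluster usage (requires qsub and an SGE/UGE environment):
#
#   from pyabc.sge import SGE
#   sge = SGE(memory='1G', time_h=1)
#   results = sge.map(square, [1, 2, 3])
#
# Locally, the result is equivalent to the built-in map:
results = list(map(square, [1, 2, 3]))
print(results)  # [1, 4, 9]
```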
- pyabc.sge.nr_cores_available() int [source]
Determine the number of available cores in a manner that is safer for cluster environments than counting the CPUs of the machine. The raw CPU count may be inadequate, as a job on a cluster might not have access to all the cores of the node it runs on, due to resource restrictions such as those imposed by SGE, SLURM etc.
The following heuristic scheme is used to get the available number of cores:
1. From the SGE environment variable NSLOTS
2. From the environment variable OMP_NUM_THREADS
3. From the environment variable MKL_NUM_THREADS
4. From Python's os.cpu_count
- Returns:
nr_cores – The number of available cores.
- Return type:
int
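The lookup chain above can be sketched as follows. This is a hedged re-implementation of the documented fallback order, not the actual pyabc code; the `environ` parameter exists only to make the sketch testable:

```python
import os

def available_cores(environ=None):
    """Sketch of the documented lookup order: NSLOTS, then
    OMP_NUM_THREADS, then MKL_NUM_THREADS, then os.cpu_count().
    (Hypothetical helper, not the actual pyabc implementation.)"""
    environ = os.environ if environ is None else environ
    for var in ("NSLOTS", "OMP_NUM_THREADS", "MKL_NUM_THREADS"):
        value = environ.get(var)
        if value is not None:
            return int(value)
    return os.cpu_count()

# Inside an SGE job with 8 slots, NSLOTS wins:
print(available_cores({"NSLOTS": "8"}))  # 8
# Outside SGE, an OpenMP thread hint is consulted next:
print(available_cores({"OMP_NUM_THREADS": "4"}))  # 4
```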