# Source code for reframe.core.schedulers

#
# Scheduler implementations
#

import abc

import reframe.core.debug as debug
import reframe.utility.os as os_ext
import reframe.core.fields as fields

from reframe.core.exceptions import ConfigError, JobNotStartedError
from reframe.core.launchers import JobLauncher
from reframe.core.logging import getlogger
from reframe.core.shell import BashScriptBuilder


class JobState:
    """The state of a job, wrapping a raw state value.

    Two :class:`JobState` objects compare equal when their wrapped state
    values are equal. Comparison against objects of other types returns
    ``NotImplemented``, so Python falls back to its default handling.
    Instances are hashable consistently with equality, so they can be used
    in sets and as dictionary keys.
    """

    def __init__(self, state):
        # :meth:`__str__` returns this value unchanged, so it is expected to
        # be a string.
        self._state = state

    def __repr__(self):
        return debug.repr(self)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented

        return self._state == other._state

    def __hash__(self):
        # Defining __eq__ on its own makes Python set __hash__ to None,
        # which leaves instances unhashable. Hash the same value that
        # __eq__ compares, so that equal states hash equally.
        return hash(self._state)

    def __str__(self):
        return self._state


class Job(abc.ABC):
    """A job descriptor.

    .. note::
       This is an abstract class.
       Users may not create jobs directly.
    """

    #: Options to be passed to the backend job scheduler.
    #:
    #: :type: :class:`list` of :class:`str`
    #: :default: ``[]``
    options = fields.TypedListField('options', str)

    #: List of shell commands to execute before launching this job.
    #:
    #: :type: :class:`list` of :class:`str`
    #: :default: ``[]``
    #:
    #: .. note::
    #:    .. deprecated:: 2.10
    #:       Please use the :attr:`reframe.core.pipeline.RegressionTest.pre_run`
    #:       field instead.
    pre_run = fields.DeprecatedField(
        fields.TypedListField('_pre_run', str),
        'Use of the pre_run field of Job is deprecated. '
        'Please use the pre_run field of RegressionTest instead.')

    #: List of shell commands to execute after launching this job.
    #:
    #: :type: :class:`list` of :class:`str`
    #: :default: ``[]``
    #:
    #: .. note::
    #:    .. deprecated:: 2.10
    #:       Please use the :attr:`reframe.core.pipeline.RegressionTest.post_run`
    #:       field instead.
    post_run = fields.DeprecatedField(
        fields.TypedListField('_post_run', str),
        'Use of the post_run field of Job is deprecated. '
        'Please use the post_run field of RegressionTest instead.')

    #: The parallel program launcher that will be used to launch the parallel
    #: executable of this job.
    #:
    #: :type: :class:`reframe.core.launchers.JobLauncher`
    launcher = fields.TypedField('launcher', JobLauncher)

    # Live job information. __init__ initialises each of these to None; they
    # are filled in during the job's lifetime by the scheduler.
    _jobid = fields.IntegerField('_jobid', allow_none=True)
    _exitcode = fields.IntegerField('_exitcode', allow_none=True)
    _state = fields.TypedField('_state', JobState, allow_none=True)

    # The sched_* arguments are exposed also to the frontend
    def __init__(self, name, command, launcher, environs=None, workdir='.',
                 num_tasks=1, num_tasks_per_node=None,
                 num_tasks_per_core=None, num_tasks_per_socket=None,
                 num_cpus_per_task=None, use_smt=None, time_limit=(0, 10, 0),
                 script_filename=None, stdout=None, stderr=None,
                 pre_run=None, post_run=None, sched_account=None,
                 sched_partition=None, sched_reservation=None,
                 sched_nodelist=None, sched_exclude_nodelist=None,
                 sched_exclusive_access=None, sched_options=None):
        """Create a job descriptor.

        The list-valued arguments (``environs``, ``pre_run``, ``post_run``,
        ``sched_options``) are copied, so later changes to the caller's
        lists do not affect the job. Passing ``None`` (the default) means
        an empty list.

        :arg name: the job name. It is also used to derive the default
            script, stdout and stderr file names.
        :arg command: the job command, exposed as :attr:`command`.
        :arg launcher: the parallel launcher. :meth:`prepare` asks it to
            emit the run command into the job script.
        :arg environs: environments whose load instructions are emitted at
            the start of the job script.
        :arg workdir: directory the job script ``cd``s into before the run
            command.
        :arg time_limit: stored as-is and exposed as :attr:`time_limit`.
        :arg script_filename: job script file name; defaults to
            ``'<name>.sh'`` if not given or empty.
        :arg stdout: defaults to ``'<name>.out'`` if not given or empty.
        :arg stderr: defaults to ``'<name>.err'`` if not given or empty.
        :arg pre_run: shell commands emitted before the run command.
        :arg post_run: shell commands emitted after the run command.
        :arg sched_options: initial value of :attr:`options`.

        The remaining ``num_*``, ``use_smt`` and ``sched_*`` arguments are
        stored and exposed as read-only properties of the same name.
        """
        # Mutable fields. A None default is used instead of [] so that no
        # list object is shared across calls.
        self.options = [] if sched_options is None else list(sched_options)

        # Commands to be run before and after the job is launched
        self._pre_run = [] if pre_run is None else list(pre_run)
        self._post_run = [] if post_run is None else list(post_run)
        self.launcher = launcher

        self._name = name
        self._command = command
        self._environs = [] if environs is None else list(environs)
        self._workdir = workdir
        self._num_tasks = num_tasks
        self._num_tasks_per_node = num_tasks_per_node
        self._num_tasks_per_core = num_tasks_per_core
        self._num_tasks_per_socket = num_tasks_per_socket
        self._num_cpus_per_task = num_cpus_per_task
        self._use_smt = use_smt
        self._script_filename = script_filename or '%s.sh' % self._name
        self._stdout = stdout or '%s.out' % self._name
        self._stderr = stderr or '%s.err' % self._name
        self._time_limit = time_limit

        # Backend scheduler related information
        self._sched_nodelist = sched_nodelist
        self._sched_exclude_nodelist = sched_exclude_nodelist
        self._sched_partition = sched_partition
        self._sched_reservation = sched_reservation
        self._sched_account = sched_account
        self._sched_exclusive_access = sched_exclusive_access

        # Live job information; to be filled during job's lifetime by the
        # scheduler
        self._jobid = None
        self._exitcode = None
        self._state = None

    def __repr__(self):
        return debug.repr(self)

    # Read-only properties
    @property
    def exitcode(self):
        """The job's exit code, or ``None`` if it has not been set yet."""
        return self._exitcode

    @property
    def jobid(self):
        """The job id, or ``None`` if the job has not been started."""
        return self._jobid

    @property
    def state(self):
        """The job's :class:`JobState`, or ``None`` if not set yet."""
        return self._state

    @property
    def name(self):
        return self._name

    @property
    def command(self):
        return self._command

    @property
    def workdir(self):
        return self._workdir

    @property
    def environs(self):
        return self._environs

    @property
    def num_tasks(self):
        return self._num_tasks

    @property
    def script_filename(self):
        return self._script_filename

    @property
    def stdout(self):
        return self._stdout

    @property
    def stderr(self):
        return self._stderr

    @property
    def time_limit(self):
        return self._time_limit

    @property
    def num_cpus_per_task(self):
        return self._num_cpus_per_task

    @property
    def num_tasks_per_core(self):
        return self._num_tasks_per_core

    @property
    def num_tasks_per_node(self):
        return self._num_tasks_per_node

    @property
    def num_tasks_per_socket(self):
        return self._num_tasks_per_socket

    @property
    def use_smt(self):
        return self._use_smt

    @property
    def sched_nodelist(self):
        return self._sched_nodelist

    @property
    def sched_exclude_nodelist(self):
        return self._sched_exclude_nodelist

    @property
    def sched_partition(self):
        return self._sched_partition

    @property
    def sched_reservation(self):
        return self._sched_reservation

    @property
    def sched_account(self):
        return self._sched_account

    @property
    def sched_exclusive_access(self):
        return self._sched_exclusive_access

    def emit_preamble(self, builder):
        """Emit environment load instructions, then the pre-run commands.

        :arg builder: the script builder to write into.
        """
        for e in self._environs:
            e.emit_load_instructions(builder)

        for c in self._pre_run:
            builder.verbatim(c)

    def emit_postamble(self, script_builder):
        """Emit the post-run commands into *script_builder*."""
        for c in self._post_run:
            script_builder.verbatim(c)

    def prepare(self, script_builder):
        """Generate the job script and write it to :attr:`script_filename`.

        The script consists of the preamble (environment load instructions
        and pre-run commands), a ``cd`` into :attr:`workdir`, whatever
        :attr:`launcher` emits as the run command, and the postamble
        (post-run commands). The file is overwritten if it exists.
        """
        self.emit_preamble(script_builder)
        script_builder.verbatim('cd %s' % self._workdir)
        self.launcher.emit_run_command(self, script_builder)
        self.emit_postamble(script_builder)
        with open(self.script_filename, 'w') as fp:
            fp.write(script_builder.finalise())

    @abc.abstractmethod
    def submit(self):
        """Submit this job; implemented by concrete scheduler classes."""

    @abc.abstractmethod
    def wait(self):
        """Wait for this job.

        The base implementation only checks that the job has been started.
        Concrete implementations may call it via ``super()``.

        :raises JobNotStartedError: if the job has no job id yet.
        """
        if self._jobid is None:
            raise JobNotStartedError('cannot wait an unstarted job')

    @abc.abstractmethod
    def cancel(self):
        """Cancel this job.

        The base implementation only checks that the job has been started.
        Concrete implementations may call it via ``super()``.

        :raises JobNotStartedError: if the job has no job id yet.
        """
        if self._jobid is None:
            raise JobNotStartedError('cannot cancel an unstarted job')

    @abc.abstractmethod
    def finished(self):
        """Poll whether this job has finished.

        The base implementation only checks that the job has been started.
        Concrete implementations may call it via ``super()``.

        :raises JobNotStartedError: if the job has no job id yet.
        """
        if self._jobid is None:
            raise JobNotStartedError('cannot poll an unstarted job')