#
# Scheduler implementations
#
import abc
import reframe.core.debug as debug
import reframe.utility.os as os_ext
import reframe.core.fields as fields
from reframe.core.exceptions import ConfigurationError
from reframe.core.launchers import JobLauncher
from reframe.core.logging import getlogger
from reframe.core.shell import BashScriptBuilder
class JobState:
    """A job state as tracked by a scheduler backend.

    Two :class:`JobState` objects compare equal when they wrap equal state
    values. They also hash consistently with that equality, so states can be
    stored in sets or used as dictionary keys.
    """

    def __init__(self, state):
        # The raw state value; presumably a string, since __str__ returns it
        # directly.
        self._state = state

    def __repr__(self):
        return debug.repr(self)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented

        return self._state == other._state

    def __hash__(self):
        # Defining __eq__ implicitly sets __hash__ to None in Python 3, which
        # would make job states unhashable; hash the same value __eq__ uses.
        return hash(self._state)

    def __str__(self):
        return self._state
class Job(abc.ABC):
    """A job descriptor.

    .. note::
       This is an abstract class.
       Users may not create jobs directly.
    """

    #: Options to be passed to the backend job scheduler.
    #:
    #: :type: :class:`list` of :class:`str`
    #: :default: ``[]``
    options = fields.TypedListField('options', str)

    #: List of shell commands to execute before launching this job.
    #:
    #: These commands do not execute in the context of ReFrame.
    #: Instead, they are emitted in the generated job script just before the
    #: actual job launch command.
    #:
    #: :type: :class:`list` of :class:`str`
    #: :default: ``[]``
    pre_run = fields.TypedListField('pre_run', str)

    #: List of shell commands to execute after launching this job.
    #:
    #: See :attr:`pre_run` for a more detailed description of the semantics.
    #:
    #: :type: :class:`list` of :class:`str`
    #: :default: ``[]``
    post_run = fields.TypedListField('post_run', str)

    #: The parallel program launcher that will be used to launch the parallel
    #: executable of this job.
    #:
    #: :type: :class:`reframe.core.launchers.JobLauncher`
    launcher = fields.TypedField('launcher', JobLauncher)

    # Live job information, filled in during the job's lifetime by the
    # concrete scheduler backend; ``None`` until then.
    _jobid = fields.IntegerField('_jobid', allow_none=True)
    _exitcode = fields.IntegerField('_exitcode', allow_none=True)
    _state = fields.TypedField('_state', JobState, allow_none=True)

    # The sched_* arguments are exposed also to the frontend
    def __init__(self,
                 name,
                 command,
                 launcher,
                 environs=None,
                 workdir='.',
                 num_tasks=1,
                 num_tasks_per_node=None,
                 num_tasks_per_core=None,
                 num_tasks_per_socket=None,
                 num_cpus_per_task=None,
                 use_smt=None,
                 time_limit=(0, 10, 0),
                 script_filename=None,
                 stdout=None,
                 stderr=None,
                 sched_account=None,
                 sched_partition=None,
                 sched_reservation=None,
                 sched_nodelist=None,
                 sched_exclude_nodelist=None,
                 sched_exclusive_access=None,
                 sched_options=None):
        """Create a job descriptor.

        :arg name: the job name; also used to derive the default script,
            stdout and stderr file names.
        :arg command: the command of this job, exposed as :attr:`command`
            (presumably consumed by the launcher when it emits the run
            command -- confirm against the launcher implementations).
        :arg launcher: the :class:`JobLauncher` used by :func:`prepare` to
            emit the run command.
        :arg environs: environments whose load instructions are emitted at
            the top of the job script. The sequence is copied. ``None`` (the
            default) means no environments.
        :arg workdir: directory the job script changes into before launching.
        :arg num_tasks, num_tasks_per_node, num_tasks_per_core,
            num_tasks_per_socket, num_cpus_per_task, use_smt, time_limit:
            resource requests. They are stored as given and exposed through
            read-only properties; this base class does not interpret them.
        :arg script_filename: name of the generated job script; defaults to
            ``<name>.sh``.
        :arg stdout: standard output file name; defaults to ``<name>.out``.
        :arg stderr: standard error file name; defaults to ``<name>.err``.
        :arg sched_account, sched_partition, sched_reservation,
            sched_nodelist, sched_exclude_nodelist, sched_exclusive_access:
            backend scheduler settings, stored as given and exposed through
            read-only properties.
        :arg sched_options: initial scheduler options. They are copied into
            :attr:`options`. ``None`` (the default) means no options.
        """
        # Mutable fields; copy the caller's sequences so that later changes on
        # either side do not leak through. ``None`` defaults avoid sharing a
        # single mutable default object across calls.
        self.options = list(sched_options or [])

        # Commands to be run before and after the job is launched
        self.pre_run = []
        self.post_run = []
        self.launcher = launcher

        self._name = name
        self._command = command
        self._environs = list(environs or [])
        self._workdir = workdir
        self._num_tasks = num_tasks
        self._num_tasks_per_node = num_tasks_per_node
        self._num_tasks_per_core = num_tasks_per_core
        self._num_tasks_per_socket = num_tasks_per_socket
        self._num_cpus_per_task = num_cpus_per_task
        self._use_smt = use_smt
        self._script_filename = script_filename or '%s.sh' % self._name
        self._stdout = stdout or '%s.out' % self._name
        self._stderr = stderr or '%s.err' % self._name
        self._time_limit = time_limit

        # Backend scheduler related information
        self._sched_nodelist = sched_nodelist
        self._sched_exclude_nodelist = sched_exclude_nodelist
        self._sched_partition = sched_partition
        self._sched_reservation = sched_reservation
        self._sched_account = sched_account
        self._sched_exclusive_access = sched_exclusive_access

        # Live job information; to be filled during job's lifetime by the
        # scheduler
        self._jobid = None
        self._exitcode = None
        self._state = None

    def __repr__(self):
        return debug.repr(self)

    # Read-only properties

    @property
    def exitcode(self):
        """Exit code of the job; ``None`` until set by the scheduler."""
        return self._exitcode

    @property
    def jobid(self):
        """Scheduler job ID; ``None`` until set by the scheduler."""
        return self._jobid

    @property
    def state(self):
        """Current :class:`JobState`; ``None`` until set by the scheduler."""
        return self._state

    @property
    def name(self):
        """The job name."""
        return self._name

    @property
    def command(self):
        """The command of this job, as passed to the constructor."""
        return self._command

    @property
    def workdir(self):
        """Directory the job script changes into before launching."""
        return self._workdir

    @property
    def environs(self):
        """The job's environments (the internal list, not a copy)."""
        return self._environs

    @property
    def num_tasks(self):
        """Number of tasks requested (default ``1``)."""
        return self._num_tasks

    @property
    def script_filename(self):
        """Name of the generated job script."""
        return self._script_filename

    @property
    def stdout(self):
        """Standard output file name of the job."""
        return self._stdout

    @property
    def stderr(self):
        """Standard error file name of the job."""
        return self._stderr

    @property
    def time_limit(self):
        """Time limit as passed in (default ``(0, 10, 0)``).

        Presumably ``(hours, minutes, seconds)`` -- confirm with backends.
        """
        return self._time_limit

    @property
    def num_cpus_per_task(self):
        """Requested CPUs per task; ``None`` if not specified."""
        return self._num_cpus_per_task

    @property
    def num_tasks_per_core(self):
        """Requested tasks per core; ``None`` if not specified."""
        return self._num_tasks_per_core

    @property
    def num_tasks_per_node(self):
        """Requested tasks per node; ``None`` if not specified."""
        return self._num_tasks_per_node

    @property
    def num_tasks_per_socket(self):
        """Requested tasks per socket; ``None`` if not specified."""
        return self._num_tasks_per_socket

    @property
    def use_smt(self):
        """Requested SMT setting; ``None`` if not specified."""
        return self._use_smt

    @property
    def sched_nodelist(self):
        """Scheduler node list; ``None`` if not specified."""
        return self._sched_nodelist

    @property
    def sched_exclude_nodelist(self):
        """Scheduler excluded node list; ``None`` if not specified."""
        return self._sched_exclude_nodelist

    @property
    def sched_partition(self):
        """Scheduler partition; ``None`` if not specified."""
        return self._sched_partition

    @property
    def sched_reservation(self):
        """Scheduler reservation; ``None`` if not specified."""
        return self._sched_reservation

    @property
    def sched_account(self):
        """Scheduler account; ``None`` if not specified."""
        return self._sched_account

    @property
    def sched_exclusive_access(self):
        """Scheduler exclusive-access setting; ``None`` if not specified."""
        return self._sched_exclusive_access

    def emit_preamble(self, builder):
        """Emit the environment load instructions and :attr:`pre_run`.

        Each environment's ``emit_load_instructions(builder)`` is called in
        order. Then every :attr:`pre_run` command is emitted verbatim.
        """
        for e in self._environs:
            e.emit_load_instructions(builder)

        for c in self.pre_run:
            builder.verbatim(c)

    def emit_postamble(self, script_builder):
        """Emit every :attr:`post_run` command verbatim."""
        for c in self.post_run:
            script_builder.verbatim(c)

    def prepare(self, script_builder):
        """Generate the job script and write it to :attr:`script_filename`.

        The script is made of four parts, in order:

        - the preamble (see :func:`emit_preamble`);
        - a ``cd`` into :attr:`workdir`;
        - the launcher's run command;
        - the postamble (see :func:`emit_postamble`).

        The result of ``script_builder.finalise()`` is written to
        :attr:`script_filename`, overwriting any existing file.
        """
        self.emit_preamble(script_builder)
        # NOTE(review): workdir is interpolated unquoted; presumably this is
        # intentional so the shell running the script can expand it.
        script_builder.verbatim('cd %s' % self._workdir)
        self.launcher.emit_run_command(self, script_builder)
        self.emit_postamble(script_builder)
        with open(self.script_filename, 'w') as fp:
            fp.write(script_builder.finalise())

    @abc.abstractmethod
    def submit(self):
        """Submit the job; implemented by each concrete scheduler backend."""

    @abc.abstractmethod
    def wait(self):
        """Wait for the job; implemented by each concrete scheduler backend."""

    @abc.abstractmethod
    def cancel(self):
        """Cancel the job; implemented by each concrete scheduler backend."""

    @abc.abstractmethod
    def finished(self):
        """Report job completion; implemented by each concrete backend."""