# Copyright 2016-2020 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause
#
# Handling of the current host context
#
import os
import functools
from datetime import datetime
import reframe.core.config as config
import reframe.core.fields as fields
import reframe.utility.osext as osext
from reframe.core.environments import (Environment, snapshot)
from reframe.core.exceptions import ReframeFatalError
from reframe.core.logging import getlogger
from reframe.core.systems import System
[docs]class RuntimeContext:
'''The runtime context of the framework.
There is a single instance of this class globally in the framework.
.. versionadded:: 2.13
'''
def __init__(self, site_config):
self._site_config = site_config
self._system = System.create(site_config)
self._current_run = 0
self._timestamp = datetime.now()
def _makedir(self, *dirs, wipeout=False):
ret = os.path.join(*dirs)
if wipeout:
osext.rmtree(ret, ignore_errors=True)
os.makedirs(ret, exist_ok=True)
return ret
def _format_dirs(self, *dirs):
if not self.get_option('general/0/clean_stagedir'):
# If stagedir is to be reused, no new stage directories will be
# used for retries
return dirs
try:
last = dirs[-1]
except IndexError:
return dirs
current_run = runtime().current_run
if current_run == 0:
return dirs
last += '_retry%s' % current_run
return (*dirs[:-1], last)
def next_run(self):
self._current_run += 1
@property
def current_run(self):
return self._current_run
@property
def site_config(self):
return self._site_config
@property
def system(self):
'''The current host system.
:type: :class:`reframe.core.systems.System`
'''
return self._system
@property
def prefix(self):
return osext.expandvars(
self.site_config.get('systems/0/prefix')
)
@property
def stagedir(self):
return osext.expandvars(
self.site_config.get('systems/0/stagedir')
)
@property
def outputdir(self):
return osext.expandvars(
self.site_config.get('systems/0/outputdir')
)
@property
def perflogdir(self):
# Find the first filelog handler
handlers = self.site_config.get('logging/0/handlers_perflog')
for i, h in enumerate(handlers):
if h['type'] == 'filelog':
break
return osext.expandvars(
self.site_config.get(f'logging/0/handlers_perflog/{i}/basedir')
)
@property
def timestamp(self):
timefmt = self.site_config.get('general/0/timestamp_dirs')
return self._timestamp.strftime(timefmt)
@property
def output_prefix(self):
'''The output directory prefix.
:type: :class:`str`
'''
if self.outputdir:
ret = os.path.join(self.outputdir, self.timestamp)
else:
ret = os.path.join(self.prefix, 'output', self.timestamp)
return os.path.abspath(ret)
@property
def stage_prefix(self):
'''The stage directory prefix.
:type: :class:`str`
'''
if self.stagedir:
ret = os.path.join(self.stagedir, self.timestamp)
else:
ret = os.path.join(self.prefix, 'stage', self.timestamp)
return os.path.abspath(ret)
def make_stagedir(self, *dirs):
wipeout = self.get_option('general/0/clean_stagedir')
ret = self._makedir(self.stage_prefix,
*self._format_dirs(*dirs), wipeout=wipeout)
getlogger().debug(
f'Created stage directory {ret!r} [clean_stagedir: {wipeout}]'
)
return ret
def make_outputdir(self, *dirs):
ret = self._makedir(self.output_prefix,
*self._format_dirs(*dirs), wipeout=True)
getlogger().debug(f'Created output directory {ret!r}')
return ret
@property
def modules_system(self):
'''The environment modules system used in the current host.
:type: :class:`reframe.core.modules.ModulesSystem`.
'''
return self._system.modules_system
[docs] def get_option(self, option):
'''Get a configuration option.
:arg option: The option to be retrieved.
:returns: The value of the option.
'''
return self._site_config.get(option)
# Global resources for the current host
_runtime_context = None
def init_runtime(site_config):
global _runtime_context
if _runtime_context is None:
_runtime_context = RuntimeContext(site_config)
[docs]def runtime():
'''Get the runtime context of the framework.
.. versionadded:: 2.13
:returns: A :class:`reframe.core.runtime.RuntimeContext` object.
'''
if _runtime_context is None:
raise ReframeFatalError('no runtime context is configured')
return _runtime_context
[docs]def loadenv(*environs):
'''Load environments in the current Python context.
:arg environs: A list of environments to load.
:type environs: List[Environment]
:returns: A tuple containing snapshot of the current environment upon
entry to this function and a list of shell commands required to load
the environments.
:rtype: Tuple[_EnvironmentSnapshot, List[str]]
'''
modules_system = runtime().modules_system
env_snapshot = snapshot()
commands = []
for env in environs:
for mod in env.modules_detailed:
load_seq = modules_system.load_module(**mod, force=True)
for m, conflicted in load_seq:
for c in conflicted:
commands += modules_system.emit_unload_commands(c)
commands += modules_system.emit_load_commands(
m, mod['collection']
)
for k, v in env.variables.items():
os.environ[k] = osext.expandvars(v)
commands.append(f'export {k}={v}')
return env_snapshot, commands
def emit_loadenv_commands(*environs):
env_snapshot = snapshot()
try:
_, commands = loadenv(*environs)
finally:
env_snapshot.restore()
return commands
[docs]def is_env_loaded(environ):
'''Check if environment is loaded.
:arg environ: Environment to check for.
:type environ: Environment
:returns: :class:`True` if this environment is loaded, :class:`False`
otherwise.
'''
is_module_loaded = runtime().modules_system.is_module_loaded
return (all(map(is_module_loaded, environ.modules)) and
all(os.environ.get(k, None) == osext.expandvars(v)
for k, v in environ.variables.items()))
[docs]class temp_environment:
'''Context manager to temporarily change the environment.'''
def __init__(self, modules=[], variables=[]):
self._modules = modules
self._variables = variables
def __enter__(self):
new_env = Environment('_rfm_temp_env', self._modules, self._variables)
self._environ_save, _ = loadenv(new_env)
return new_env
def __exit__(self, exc_type, exc_value, traceback):
self._environ_save.restore()
# The following utilities are useful only for the unit tests
class temp_runtime:
'''Context manager to temporarily switch to another runtime.
:meta private:
'''
def __init__(self, config_file, sysname=None, options=None):
global _runtime_context
options = options or {}
self._runtime_save = _runtime_context
if config_file is None:
_runtime_context = None
else:
site_config = config.load_config(config_file)
site_config.select_subconfig(sysname, ignore_resolve_errors=True)
for opt, value in options.items():
site_config.add_sticky_option(opt, value)
_runtime_context = RuntimeContext(site_config)
def __enter__(self):
return _runtime_context
def __exit__(self, exc_type, exc_value, traceback):
global _runtime_context
_runtime_context = self._runtime_save
def switch_runtime(config_file, sysname=None, options=None):
'''Function decorator for temporarily changing the runtime for a
function.
:meta private:
'''
def _runtime_deco(fn):
@functools.wraps(fn)
def _fn(*args, **kwargs):
with temp_runtime(config_file, sysname, options):
ret = fn(*args, **kwargs)
return ret
return _fn
return _runtime_deco
[docs]class module_use:
'''Context manager for temporarily modifying the module path.'''
def __init__(self, *paths):
self._paths = paths
def __enter__(self):
runtime().modules_system.searchpath_add(*self._paths)
return self
def __exit__(self, exc_type, exc_value, traceback):
runtime().modules_system.searchpath_remove(*self._paths)