Source code for reframe.core.modules

#
# Utilities for manipulating the modules subsystem
#

import abc
import os
import re
from collections import OrderedDict

import reframe.core.fields as fields
import reframe.utility.os_ext as os_ext
from reframe.core.exceptions import (ConfigError, EnvironError,
                                     SpawnedProcessError)


class Module:
    """Internal representation of an environment module.

    A module is described by a name and an optional version, written as
    ``name/version``. Concrete module system implementations are expected
    to exchange instances of this class only.
    """

    def __init__(self, name):
        """Parse ``name`` into a module name and an optional version.

        :arg name: the module specification, e.g., ``'foo'`` or
            ``'foo/1.2'``; surrounding whitespace is ignored.
        :raises TypeError: if ``name`` is not a string.
        :raises ValueError: if ``name`` is empty after stripping.
        """
        if not isinstance(name, str):
            raise TypeError('module name not a string')

        stripped = name.strip()
        if not stripped:
            raise ValueError('module name cannot be empty')

        # Only the first slash separates the name from the version; any
        # further slashes are kept as part of the version.
        base, slash, version = stripped.partition('/')
        self._name = base
        self._version = version if slash else None

    @property
    def name(self):
        """The module name without its version."""
        return self._name

    @property
    def version(self):
        """The module version or ``None`` if no version was given."""
        return self._version

    @property
    def fullname(self):
        """The ``name/version`` string, or just the name if unversioned."""
        if self.version is None:
            return self.name

        return '/'.join((self.name, self.version))

    def __hash__(self):
        # Equal modules must hash equally and an unversioned `foo` compares
        # equal to `foo/1.2`, so the version cannot take part in the hash.
        # Collisions between `foo/1.2` and `foo/1.3` are settled by
        # `__eq__()`.
        return hash(self.name)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented

        if self.name != other.name:
            return False

        # A missing version on either side acts as a wildcard
        if not self.version or not other.version:
            return True

        return self.version == other.version

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%s)' % (cls_name, self.fullname)

    def __str__(self):
        return self.fullname


class ModulesSystem:
    """A modules system abstraction inside ReFrame.

    This class interfaces between the framework internals and the actual
    modules systems implementation. Every operation first resolves the
    requested module name through :attr:`module_map` and then delegates the
    work on each resulting real module to the backend.
    """

    # Maps a module name to the list of module names it stands for
    module_map = fields.AggregateTypeField('module_map',
                                           (dict, (str, (list, str))))

    @classmethod
    def create(cls, modules_kind=None):
        """Create a modules system for the backend named ``modules_kind``.

        :arg modules_kind: one of ``None`` (no-op backend), ``'tmod'``,
            ``'tmod4'`` or ``'lmod'``.
        :raises: :class:`reframe.core.exceptions.ConfigError` if
            ``modules_kind`` is not one of the supported values.
        """
        if modules_kind is None:
            return ModulesSystem(NoModImpl())
        elif modules_kind == 'tmod':
            return ModulesSystem(TModImpl())
        elif modules_kind == 'tmod4':
            return ModulesSystem(TMod4Impl())
        elif modules_kind == 'lmod':
            return ModulesSystem(LModImpl())
        else:
            raise ConfigError('unknown module system: %s' % modules_kind)

    def __init__(self, backend):
        self._backend = backend
        self.module_map = {}

    def resolve_module(self, name):
        """Resolve module ``name`` in the registered module map.

        The mapping is traversed depth-first starting from ``name``; every
        name that has no entry in the module map is considered a real module.

        :returns: the list of real modules names pointed to by ``name``, in
            depth-first order and without duplicates.
        :raises: :class:`reframe.core.exceptions.EnvironError` if the mapping
            contains a cycle.
        """
        ret = []
        visited = set()
        unvisited = [(name, None)]
        path = []
        while unvisited:
            node, parent = unvisited.pop()

            # Adjust the path, so that it ends at the parent of this node
            while path and path[-1] != parent:
                path.pop()

            try:
                # We insert the adjacent nodes in reverse order, so as to
                # preserve the DFS access order
                adjacent = reversed(self.module_map[node])
            except KeyError:
                # We have reached a terminal node
                ret.append(node)
            else:
                path.append(node)
                for m in adjacent:
                    if m in path:
                        raise EnvironError('module cyclic dependency: ' +
                                           '->'.join(path + [m]))

                    if m not in visited:
                        unvisited.append((m, node))

            visited.add(node)

        # A terminal node may be queued more than once before it gets marked
        # as visited (e.g., with the mappings `a: b d` and `b: d`), so drop
        # any repetitions while keeping the first-seen order.
        return list(OrderedDict.fromkeys(ret))

    @property
    def backend(self):
        """The backend implementation this modules system delegates to."""
        return self._backend

    def loaded_modules(self):
        """Return a list of loaded modules.

        This method returns a list of strings.
        """
        return [str(m) for m in self._backend.loaded_modules()]

    def conflicted_modules(self, name):
        """Return the list of the modules conflicting with module ``name``.

        If module ``name`` resolves to multiple real modules, then the
        returned list will be the concatenation of the conflict lists of all
        the real modules.

        This method returns a list of strings.
        """
        ret = []
        for m in self.resolve_module(name):
            ret += self._conflicted_modules(m)

        return ret

    def _conflicted_modules(self, name):
        # Conflicts of a single real module, as reported by the backend
        return [str(m)
                for m in self._backend.conflicted_modules(Module(name))]

    def load_module(self, name, force=False):
        """Load the module ``name``.

        If ``force`` is set, forces the loading, unloading first any
        conflicting modules currently loaded. If module ``name`` refers to
        multiple real modules, all of the target modules will be loaded.

        Returns the list of unloaded modules as strings.
        """
        ret = []
        for m in self.resolve_module(name):
            ret += self._load_module(m, force)

        return ret

    def _load_module(self, name, force=False):
        # Load a single real module and return the names of the modules that
        # had to be unloaded for that
        module = Module(name)
        loaded_modules = self._backend.loaded_modules()
        if module in loaded_modules:
            # Do not try to load the module if it is already present
            return []

        # Get the list of the modules that need to be unloaded
        unload_list = set()
        if force:
            conflict_list = self._backend.conflicted_modules(module)
            unload_list = set(loaded_modules) & set(conflict_list)

        for m in unload_list:
            self._backend.unload_module(m)

        self._backend.load_module(module)
        return [str(m) for m in unload_list]

    def unload_module(self, name):
        """Unload module ``name``.

        If module ``name`` refers to multiple real modules, all the referred
        to modules will be unloaded in reverse order.
        """
        for m in reversed(self.resolve_module(name)):
            self._unload_module(m)

    def _unload_module(self, name):
        self._backend.unload_module(Module(name))

    def is_module_loaded(self, name):
        """Check if module ``name`` is loaded.

        If module ``name`` refers to multiple real modules, this method will
        return :class:`True` only if all the referees are loaded.
        """
        return all(self._is_module_loaded(m)
                   for m in self.resolve_module(name))

    def _is_module_loaded(self, name):
        return self._backend.is_module_loaded(Module(name))

    def load_mapping(self, mapping):
        """Update the internal module mappings using a single mapping.

        Repeated values in the mapping are registered only once, in the
        order of their first appearance.

        :arg mapping: a string specifying the module mapping.
            Example syntax: ``'m0: m1 m2'``.
        :raises: :class:`reframe.core.exceptions.ConfigError` if ``mapping``
            does not contain exactly one ``:``, or if either side of it is
            empty.
        """
        key, *rest = mapping.split(':')
        if len(rest) != 1:
            raise ConfigError('invalid mapping syntax: %s' % mapping)

        key = key.strip()
        values = rest[0].split()
        if not key:
            raise ConfigError('no key found in mapping: %s' % mapping)

        if not values:
            raise ConfigError('no mapping defined for module: %s' % key)

        self.module_map[key] = list(OrderedDict.fromkeys(values))

    def load_mapping_from_file(self, filename):
        """Update the internal module mappings from mappings read from file.

        Each line holds a single mapping in the syntax accepted by
        :func:`load_mapping`. Anything following a ``#`` is ignored and so
        are the lines that are left empty.

        :raises: :class:`reframe.core.exceptions.ConfigError` naming the file
            and the line number, if a line is not a valid mapping.
        """
        with open(filename) as fp:
            for lineno, line in enumerate(fp, start=1):
                line = line.strip().split('#')[0]
                if not line:
                    continue

                try:
                    self.load_mapping(line)
                except ConfigError as e:
                    raise ConfigError('%s:%s' % (filename, lineno)) from e

    @property
    def name(self):
        """Return the name of this module system."""
        return self._backend.name()

    @property
    def version(self):
        """Return the version of this module system."""
        return self._backend.version()

    def unload_all(self):
        """Unload all loaded modules."""
        return self._backend.unload_all()

    @property
    def searchpath(self):
        """The module system search path as a list of directories."""
        return self._backend.searchpath()

    def searchpath_add(self, *dirs):
        """Add ``dirs`` to the module system search path."""
        return self._backend.searchpath_add(*dirs)

    def searchpath_remove(self, *dirs):
        """Remove ``dirs`` from the module system search path."""
        return self._backend.searchpath_remove(*dirs)

    def emit_load_commands(self, name):
        """Return the appropriate shell command for loading module ``name``.

        The returned value is a list with one command per real module that
        ``name`` resolves to.
        """
        return [self._backend.emit_load_instr(Module(m))
                for m in self.resolve_module(name)]

    def emit_unload_commands(self, name):
        """Return the appropriate shell command for unloading module ``name``.

        The returned value is a list with one command per real module that
        ``name`` resolves to, in reverse resolution order.
        """
        return [self._backend.emit_unload_instr(Module(m))
                for m in reversed(self.resolve_module(name))]

    def __str__(self):
        return str(self._backend)
class ModulesSystemImpl(abc.ABC):
    """Abstract base class for module systems."""

    @abc.abstractmethod
    def loaded_modules(self):
        """Return a list of loaded modules.

        This method returns a list of Module instances.
        """

    @abc.abstractmethod
    def conflicted_modules(self, module):
        """Return the list of conflicted modules.

        This method returns a list of Module instances.
        """

    @abc.abstractmethod
    def load_module(self, module):
        """Load module ``module``."""

    @abc.abstractmethod
    def unload_module(self, module):
        """Unload module ``module``."""

    @abc.abstractmethod
    def is_module_loaded(self, module):
        """Check presence of module ``module``."""

    @abc.abstractmethod
    def name(self):
        """Return the name of this module system."""

    @abc.abstractmethod
    def version(self):
        """Return the version of this module system."""

    @abc.abstractmethod
    def unload_all(self):
        """Unload all loaded modules."""

    @abc.abstractmethod
    def searchpath(self):
        """The module system search path as a list of directories."""

    @abc.abstractmethod
    def searchpath_add(self, *dirs):
        """Add ``dirs`` to the module system search path."""

    @abc.abstractmethod
    def searchpath_remove(self, *dirs):
        """Remove ``dirs`` from the module system search path."""

    @abc.abstractmethod
    def emit_load_instr(self, module):
        """Emit the instruction that loads module."""

    @abc.abstractmethod
    def emit_unload_instr(self, module):
        """Emit the instruction that unloads module."""

    def __repr__(self):
        return type(self).__name__ + '()'

    def __str__(self):
        return self.name() + ' ' + self.version()


class TModImpl(ModulesSystemImpl):
    """Module system for TMod (Tcl)."""

    def __init__(self):
        """Probe the ``modulecmd`` command and its Python bindings.

        :raises: :class:`reframe.core.exceptions.ConfigError` if
            ``modulecmd`` cannot be run, if its ``-V`` output does not report
            both ``VERSION`` and ``TCL_VERSION``, or if it rejects the
            ``python`` shell type.
        """
        # Try to figure out if we are indeed using the TCL version
        try:
            completed = os_ext.run_command('modulecmd -V')
        except OSError as e:
            raise ConfigError(
                'could not find a sane Tmod installation: %s' % e) from e

        version_match = re.search(r'^VERSION=(\S+)', completed.stdout,
                                  re.MULTILINE)
        tcl_version_match = re.search(r'^TCL_VERSION=(\S+)',
                                      completed.stdout, re.MULTILINE)
        if version_match is None or tcl_version_match is None:
            raise ConfigError('could not find a sane Tmod installation')

        self._version = version_match.group(1)
        self._command = 'modulecmd python'
        try:
            # Try the Python bindings now
            completed = os_ext.run_command(self._command)
        except OSError as e:
            raise ConfigError(
                'could not get the Python bindings for Tmod: %s' % e) from e

        if re.search(r'Unknown shell type', completed.stderr):
            raise ConfigError(
                'Python is not supported by this Tmod installation')

    def name(self):
        return 'tmod'

    def version(self):
        return self._version

    def _run_module_command(self, *args):
        # Run the module command with `args` and return the completed process
        command = [self._command, *args]
        return os_ext.run_command(' '.join(command))

    def _module_command_failed(self, completed):
        # The command is considered failed if its stderr mentions `ERROR`
        return re.search(r'ERROR', completed.stderr) is not None

    def _exec_module_command(self, *args, msg=None):
        """Run a module command and execute the Python code it prints.

        :arg msg: the error message to use on failure; if ``None``, a
            message containing the failed command is generated.
        :raises: :class:`reframe.core.exceptions.EnvironError` if the
            command failed.
        """
        completed = self._run_module_command(*args)
        if self._module_command_failed(completed):
            if msg is None:
                msg = 'modules system command failed: '
                if isinstance(completed.args, str):
                    msg += completed.args
                else:
                    msg += ' '.join(completed.args)

            raise EnvironError(msg)

        # NOTE(review): this executes whatever the module command printed on
        # its stdout, so the command and the modulefiles it processes must be
        # trusted.
        exec(completed.stdout)

    def loaded_modules(self):
        try:
            # LOADEDMODULES may be defined but empty
            return [Module(m)
                    for m in os.environ['LOADEDMODULES'].split(':') if m]
        except KeyError:
            return []

    def conflicted_modules(self, module):
        # The conflicts are taken from the `conflict` lines that
        # `module show` prints on stderr
        completed = self._run_module_command('show', str(module))
        return [Module(m.group(1))
                for m in re.finditer(r'^conflict\s+(\S+)',
                                     completed.stderr, re.MULTILINE)]

    def is_module_loaded(self, module):
        return module in self.loaded_modules()

    def load_module(self, module):
        self._exec_module_command('load', str(module),
                                  msg='could not load module %s' % module)

    def unload_module(self, module):
        self._exec_module_command('unload', str(module),
                                  msg='could not unload module %s' % module)

    def unload_all(self):
        self._exec_module_command('purge')

    def searchpath(self):
        try:
            return os.environ['MODULEPATH'].split(':')
        except KeyError:
            # An undefined MODULEPATH is treated as an empty search path, in
            # the same way that `loaded_modules()` treats an undefined
            # LOADEDMODULES
            return []

    def searchpath_add(self, *dirs):
        self._exec_module_command('use', *dirs)

    def searchpath_remove(self, *dirs):
        self._exec_module_command('unuse', *dirs)

    def emit_load_instr(self, module):
        return 'module load %s' % module

    def emit_unload_instr(self, module):
        return 'module unload %s' % module


class TMod4Impl(TModImpl):
    """Module system for TMod 4."""

    def __init__(self):
        """Probe the ``modulecmd`` command for a TMod 4 installation.

        :raises: :class:`reframe.core.exceptions.ConfigError` if
            ``modulecmd python -V`` cannot be run or fails, or if its stderr
            does not start with a ``Modules Release`` version line.
        """
        self._command = 'modulecmd python'
        try:
            completed = os_ext.run_command(self._command + ' -V', check=True)
        except OSError as e:
            raise ConfigError(
                'could not find a sane Tmod4 installation') from e
        except SpawnedProcessError as e:
            raise ConfigError(
                'could not get the Python bindings for Tmod4') from e

        version_match = re.match(r'^Modules Release (\S+)\s+',
                                 completed.stderr)
        if not version_match:
            raise ConfigError('could not retrieve the TMod4 version')

        self._version = version_match.group(1)

    def name(self):
        return 'tmod4'

    def _exec_module_command(self, *args, msg=None):
        """Run a module command and execute the Python code it prints.

        The printed code is executed in a separate namespace, from which the
        ``_mlstatus`` variable is read to find out whether the command
        succeeded.

        :arg msg: the error message to use on failure; if ``None``, a
            message containing the failed command is generated.
        :raises: :class:`reframe.core.exceptions.EnvironError` if
            ``_mlstatus`` is false.
        """
        command = ' '.join([self._command, *args])
        completed = os_ext.run_command(command, check=True)
        namespace = {}

        # NOTE(review): this executes whatever the module command printed on
        # its stdout, so the command and the modulefiles it processes must be
        # trusted.
        exec(completed.stdout, {}, namespace)
        if not namespace['_mlstatus']:
            # _mlstatus is set by the TMod4 Python bindings
            if msg is None:
                msg = 'modules system command failed: '
                if isinstance(completed.args, str):
                    msg += completed.args
                else:
                    msg += ' '.join(completed.args)

            raise EnvironError(msg)


class LModImpl(TModImpl):
    """Module system for Lmod (Tcl/Lua)."""

    def __init__(self):
        """Probe the Lmod command pointed to by ``LMOD_CMD``.

        :raises: :class:`reframe.core.exceptions.ConfigError` if the
            ``LMOD_CMD`` environment variable is not defined, if the command
            cannot be run, if its version cannot be retrieved, or if it
            rejects the ``python`` shell type.
        """
        # Try to figure out if we are indeed using LMOD
        lmod_cmd = os.getenv('LMOD_CMD')
        if lmod_cmd is None:
            raise ConfigError('could not find a sane Lmod installation: '
                              'environment variable LMOD_CMD is not defined')

        try:
            completed = os_ext.run_command('%s --version' % lmod_cmd)
        except OSError as e:
            raise ConfigError(
                'could not find a sane Lmod installation: %s' % e) from e

        version_match = re.search(r'.*Version\s*(\S+)', completed.stderr,
                                  re.MULTILINE)
        if version_match is None:
            raise ConfigError('could not retrieve Lmod version')

        self._version = version_match.group(1)
        self._command = '%s python ' % lmod_cmd
        try:
            # Try the Python bindings now
            completed = os_ext.run_command(self._command)
        except OSError as e:
            raise ConfigError(
                'could not get the Python bindings for Lmod: %s' % e) from e

        if re.search(r'Unknown shell type', completed.stderr):
            raise ConfigError('Python is not supported by '
                              'this Lmod installation')

    def name(self):
        return 'lmod'

    def _module_command_failed(self, completed):
        # The command is considered failed if it prints just `false` on its
        # stdout
        return completed.stdout.strip() == 'false'

    def conflicted_modules(self, module):
        completed = self._run_module_command('show', str(module))

        # Lmod accepts both Lua and Tcl syntax
        # The following test allows incorrect syntax, e.g., `conflict
        # ('package"(`, but we expect this to be caught by the Lmod framework
        # in earlier stages.
        ret = []
        for m in re.finditer(r'conflict\s*(\S+)', completed.stderr):
            conflict_arg = m.group(1)
            if conflict_arg.startswith('('):
                # Lua syntax
                ret.append(Module(conflict_arg.strip('\'"()')))
            else:
                # Tmod syntax
                ret.append(Module(conflict_arg))

        return ret

    def unload_all(self):
        # Currently, we don't take any provision for sticky modules in Lmod,
        # so we forcefully unload everything.
        self._exec_module_command('--force', 'purge')


class NoModImpl(ModulesSystemImpl):
    """A convenience class that implements a no-op modules system."""

    def loaded_modules(self):
        return []

    def conflicted_modules(self, module):
        return []

    def load_module(self, module):
        pass

    def unload_module(self, module):
        pass

    def is_module_loaded(self, module):
        #
        # Always return `True`, since this pseudo modules system effectively
        # assumes that everything needed is loaded.
        #
        return True

    def name(self):
        return 'nomod'

    def version(self):
        return '1.0'

    def unload_all(self):
        pass

    def searchpath(self):
        return []

    def searchpath_add(self, *dirs):
        pass

    def searchpath_remove(self, *dirs):
        pass

    def emit_load_instr(self, module):
        return ''

    def emit_unload_instr(self, module):
        return ''