#
# Utilities for manipulating the modules subsystem
#
import abc
import os
import re
from collections import OrderedDict
import reframe.core.fields as fields
import reframe.utility.os_ext as os_ext
from reframe.core.exceptions import (ConfigError, EnvironError,
SpawnedProcessError)
class Module:
"""Module wrapper.
This class represents internally a module. Concrete module system
implementation should deal only with that.
"""
def __init__(self, name):
if not isinstance(name, str):
raise TypeError('module name not a string')
name = name.strip()
if not name:
raise ValueError('module name cannot be empty')
try:
self._name, self._version = name.split('/', maxsplit=1)
except ValueError:
self._name, self._version = name, None
@property
def name(self):
return self._name
@property
def version(self):
return self._version
@property
def fullname(self):
if self.version is not None:
return '/'.join((self.name, self.version))
else:
return self.name
def __hash__(self):
# Here we hash only over the name of the module, because foo/1.2 and
# simply foo compare equal. In case of hash conflicts (e.g., foo/1.2
# and foo/1.3), the equality operator will resolve it.
return hash(self.name)
def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
if not self.version or not other.version:
return self.name == other.name
else:
return self.name == other.name and self.version == other.version
def __repr__(self):
return '%s(%s)' % (type(self).__name__, self.fullname)
def __str__(self):
return self.fullname
[docs]class ModulesSystem:
"""A modules system abstraction inside ReFrame.
This class interfaces between the framework internals and the actual
modules systems implementation.
"""
module_map = fields.AggregateTypeField('module_map',
(dict, (str, (list, str))))
@classmethod
def create(cls, modules_kind=None):
if modules_kind is None:
return ModulesSystem(NoModImpl())
elif modules_kind == 'tmod':
return ModulesSystem(TModImpl())
elif modules_kind == 'tmod4':
return ModulesSystem(TMod4Impl())
elif modules_kind == 'lmod':
return ModulesSystem(LModImpl())
else:
raise ConfigError('unknown module system: %s' % modules_kind)
def __init__(self, backend):
self._backend = backend
self.module_map = {}
[docs] def resolve_module(self, name):
"""Resolve module ``name`` in the registered module map.
:returns: the list of real modules names pointed to by ``name``.
:raises: :class:`reframe.core.exceptions.ConfigError` if the mapping
contains a cycle.
"""
ret = []
visited = set()
unvisited = [(name, None)]
path = []
while unvisited:
node, parent = unvisited.pop()
# Adjust the path
while path and path[-1] != parent:
path.pop()
try:
# We insert the adjacent nodes in reverse order, so as to
# preserve the DFS access order
adjacent = reversed(self.module_map[node])
except KeyError:
# We have reached a terminal node
ret.append(node)
else:
path.append(node)
for m in adjacent:
if m in path:
raise EnvironError('module cyclic dependency: ' +
'->'.join(path + [m]))
if m not in visited:
unvisited.append((m, node))
visited.add(node)
return ret
@property
def backend(self):
return(self._backend)
[docs] def loaded_modules(self):
"""Return a list of loaded modules.
This method returns a list of strings.
"""
return [str(m) for m in self._backend.loaded_modules()]
[docs] def conflicted_modules(self, name):
"""Return the list of the modules conflicting with module ``name``.
If module ``name`` resolves to multiple real modules, then the returned
list will be the concatenation of the conflict lists of all the real
modules.
This method returns a list of strings.
"""
ret = []
for m in self.resolve_module(name):
ret += self._conflicted_modules(m)
return ret
def _conflicted_modules(self, name):
return [str(m) for m in self._backend.conflicted_modules(Module(name))]
[docs] def load_module(self, name, force=False):
"""Load the module ``name``.
If ``force`` is set, forces the loading, unloading first any
conflicting modules currently loaded. If module ``name`` refers to
multiple real modules, all of the target modules will be loaded.
Returns the list of unloaded modules as strings.
"""
ret = []
for m in self.resolve_module(name):
ret += self._load_module(m, force)
return ret
def _load_module(self, name, force=False):
module = Module(name)
loaded_modules = self._backend.loaded_modules()
if module in loaded_modules:
# Do not try to load the module if it is already present
return []
# Get the list of the modules that need to be unloaded
unload_list = set()
if force:
conflict_list = self._backend.conflicted_modules(module)
unload_list = set(loaded_modules) & set(conflict_list)
for m in unload_list:
self._backend.unload_module(m)
self._backend.load_module(module)
return [str(m) for m in unload_list]
[docs] def unload_module(self, name):
"""Unload module ``name``.
If module ``name`` refers to multiple real modules, all the referred to
modules will be unloaded in reverse order.
"""
for m in reversed(self.resolve_module(name)):
self._unload_module(m)
def _unload_module(self, name):
self._backend.unload_module(Module(name))
[docs] def is_module_loaded(self, name):
"""Check if module ``name`` is loaded.
If module ``name`` refers to multiple real modules, this method will
return :class:`True` only if all the referees are loaded.
"""
return all(self._is_module_loaded(m) for m in self.resolve_module(name))
def _is_module_loaded(self, name):
return self._backend.is_module_loaded(Module(name))
[docs] def load_mapping(self, mapping):
"""Update the internal module mappings using a single mapping.
:arg mapping: a string specifying the module mapping.
Example syntax: ``'m0: m1 m2'``.
"""
key, *rest = mapping.split(':')
if len(rest) != 1:
raise ConfigError('invalid mapping syntax: %s' % mapping)
key = key.strip()
values = rest[0].split()
if not key:
raise ConfigError('no key found in mapping: %s' % mapping)
if not values:
raise ConfigError('no mapping defined for module: %s' % key)
self.module_map[key] = list(OrderedDict.fromkeys(values))
[docs] def load_mapping_from_file(self, filename):
"""Update the internal module mappings from mappings read from file."""
with open(filename) as fp:
for lineno, line in enumerate(fp, start=1):
line = line.strip().split('#')[0]
if not line:
continue
try:
self.load_mapping(line)
except ConfigError as e:
raise ConfigError('%s:%s' % (filename, lineno)) from e
@property
def name(self):
"""Return the name of this module system."""
return self._backend.name()
@property
def version(self):
"""Return the version of this module system."""
return self._backend.version()
[docs] def unload_all(self):
"""Unload all loaded modules."""
return self._backend.unload_all()
@property
def searchpath(self):
"""The module system search path as a list of directories."""
return self._backend.searchpath()
[docs] def searchpath_add(self, *dirs):
"""Add ``dirs`` to the module system search path."""
return self._backend.searchpath_add(*dirs)
[docs] def searchpath_remove(self, *dirs):
"""Remove ``dirs`` from the module system search path."""
return self._backend.searchpath_remove(*dirs)
[docs] def emit_load_commands(self, name):
"""Return the appropriate shell command for loading module ``name``."""
return [self._backend.emit_load_instr(Module(name))
for name in self.resolve_module(name)]
[docs] def emit_unload_commands(self, name):
"""Return the appropriate shell command for unloading module
``name``."""
return [self._backend.emit_unload_instr(Module(name))
for name in reversed(self.resolve_module(name))]
def __str__(self):
return str(self._backend)
class ModulesSystemImpl(abc.ABC):
"""Abstract base class for module systems."""
@abc.abstractmethod
def loaded_modules(self):
"""Return a list of loaded modules.
This method returns a list of Module instances.
"""
@abc.abstractmethod
def conflicted_modules(self, module):
"""Return the list of conflicted modules.
This method returns a list of Module instances.
"""
@abc.abstractmethod
def load_module(self, module):
"""Load the module ``name``.
If ``force`` is set, forces the loading,
unloading first any conflicting modules currently loaded.
Returns the unloaded modules as a list of module instances."""
@abc.abstractmethod
def unload_module(self, module):
"""Unload module ``module``."""
@abc.abstractmethod
def is_module_loaded(self, module):
"""Check presence of module ``module``."""
@abc.abstractmethod
def name(self):
"""Return the name of this module system."""
@abc.abstractmethod
def version(self):
"""Return the version of this module system."""
@abc.abstractmethod
def unload_all(self):
"""Unload all loaded modules."""
@abc.abstractmethod
def searchpath(self):
"""The module system search path as a list of directories."""
@abc.abstractmethod
def searchpath_add(self, *dirs):
"""Add ``dirs`` to the module system search path."""
@abc.abstractmethod
def searchpath_remove(self, *dirs):
"""Remove ``dirs`` from the module system search path."""
@abc.abstractmethod
def emit_load_instr(self, module):
"""Emit the instruction that loads module."""
@abc.abstractmethod
def emit_unload_instr(self, module):
"""Emit the instruction that unloads module."""
def __repr__(self):
return type(self).__name__ + '()'
def __str__(self):
return self.name() + ' ' + self.version()
class TModImpl(ModulesSystemImpl):
"""Module system for TMod (Tcl)."""
def __init__(self):
# Try to figure out if we are indeed using the TCL version
try:
completed = os_ext.run_command('modulecmd -V')
except OSError as e:
raise ConfigError(
'could not find a sane Tmod installation: %s' % e) from e
version_match = re.search(r'^VERSION=(\S+)', completed.stdout,
re.MULTILINE)
tcl_version_match = re.search(r'^TCL_VERSION=(\S+)', completed.stdout,
re.MULTILINE)
if version_match is None or tcl_version_match is None:
raise ConfigError('could not find a sane Tmod installation')
self._version = version_match.group(1)
self._command = 'modulecmd python'
try:
# Try the Python bindings now
completed = os_ext.run_command(self._command)
except OSError as e:
raise ConfigError(
'could not get the Python bindings for Tmod: ' % e) from e
if re.search(r'Unknown shell type', completed.stderr):
raise ConfigError(
'Python is not supported by this Tmod installation')
def name(self):
return 'tmod'
def version(self):
return self._version
def _run_module_command(self, *args):
command = [self._command, *args]
return os_ext.run_command(' '.join(command))
def _module_command_failed(self, completed):
return re.search(r'ERROR', completed.stderr) is not None
def _exec_module_command(self, *args, msg=None):
completed = self._run_module_command(*args)
if self._module_command_failed(completed):
if msg is None:
msg = 'modules system command failed: '
if isinstance(completed.args, str):
msg += completed.args
else:
msg += ' '.join(completed.args)
raise EnvironError(msg)
exec(completed.stdout)
def loaded_modules(self):
try:
# LOADEDMODULES may be defined but empty
return [Module(m)
for m in os.environ['LOADEDMODULES'].split(':') if m]
except KeyError:
return []
def conflicted_modules(self, module):
conflict_list = []
completed = self._run_module_command('show', str(module))
return [Module(m.group(1))
for m in re.finditer(r'^conflict\s+(\S+)',
completed.stderr, re.MULTILINE)]
def is_module_loaded(self, module):
return module in self.loaded_modules()
def load_module(self, module):
self._exec_module_command('load', str(module),
msg='could not load module %s' % module)
def unload_module(self, module):
self._exec_module_command('unload', str(module),
msg='could not unload module %s' % module)
def unload_all(self):
self._exec_module_command('purge')
def searchpath(self):
return os.environ['MODULEPATH'].split(':')
def searchpath_add(self, *dirs):
self._exec_module_command('use', *dirs)
def searchpath_remove(self, *dirs):
self._exec_module_command('unuse', *dirs)
def emit_load_instr(self, module):
return 'module load %s' % module
def emit_unload_instr(self, module):
return 'module unload %s' % module
class TMod4Impl(TModImpl):
"""Module system for TMod 4."""
def __init__(self):
self._command = 'modulecmd python'
try:
completed = os_ext.run_command(self._command + ' -V', check=True)
except OSError as e:
raise ConfigError(
'could not find a sane Tmod4 installation') from e
except SpawnedProcessError as e:
raise ConfigError(
'could not get the Python bindings for Tmod4') from e
version_match = re.match('^Modules Release (\S+)\s+', completed.stderr)
if not version_match:
raise ConfigError('could not retrieve the TMod4 version')
self._version = version_match.group(1)
def name(self):
return 'tmod4'
def _exec_module_command(self, *args, msg=None):
command = ' '.join([self._command, *args])
completed = os_ext.run_command(command, check=True)
namespace = {}
exec(completed.stdout, {}, namespace)
if not namespace['_mlstatus']:
# _mlstatus is set by the TMod4 Python bindings
if msg is None:
msg = 'modules system command failed: '
if isinstance(completed.args, str):
msg += completed.args
else:
msg += ' '.join(completed.args)
raise EnvironError(msg)
class LModImpl(TModImpl):
"""Module system for Lmod (Tcl/Lua)."""
def __init__(self):
# Try to figure out if we are indeed using LMOD
lmod_cmd = os.getenv('LMOD_CMD')
if lmod_cmd is None:
raise ConfigError('could not find a sane Lmod installation: '
'environment variable LMOD_CMD is not defined')
try:
completed = os_ext.run_command('%s --version' % lmod_cmd)
except OSError as e:
raise ConfigError(
'could not find a sane Lmod installation: %s' % e)
version_match = re.search(r'.*Version\s*(\S+)', completed.stderr,
re.MULTILINE)
if version_match is None:
raise ConfigError('could not retrieve Lmod version')
self._version = version_match.group(1)
self._command = '%s python ' % lmod_cmd
try:
# Try the Python bindings now
completed = os_ext.run_command(self._command)
except OSError as e:
raise ConfigError(
'could not get the Python bindings for Lmod: ' % e)
if re.search(r'Unknown shell type', completed.stderr):
raise ConfigError('Python is not supported by '
'this Lmod installation')
def name(self):
return 'lmod'
def _module_command_failed(self, completed):
return completed.stdout.strip() == 'false'
def conflicted_modules(self, module):
conflict_list = []
completed = self._run_module_command('show', str(module))
# Lmod accepts both Lua and and Tcl syntax
# The following test allows incorrect syntax, e.g., `conflict
# ('package"(`, but we expect this to be caught by the Lmod framework
# in earlier stages.
ret = []
for m in re.finditer(r'conflict\s*(\S+)', completed.stderr):
conflict_arg = m.group(1)
if conflict_arg.startswith('('):
# Lua syntax
ret.append(Module(conflict_arg.strip('\'"()')))
else:
# Tmod syntax
ret.append(Module(conflict_arg))
return ret
def unload_all(self):
# Currently, we don't take any provision for sticky modules in Lmod, so
# we forcefully unload everything.
self._exec_module_command('--force', 'purge')
class NoModImpl(ModulesSystemImpl):
"""A convenience class that implements a no-op a modules system."""
def loaded_modules(self):
return []
def conflicted_modules(self, module):
return []
def load_module(self, module):
pass
def unload_module(self, module):
pass
def is_module_loaded(self, module):
#
# Always return `True`, since this pseudo modules system effectively
# assumes that everything needed is loaded.
#
return True
def name(self):
return 'nomod'
def version(self):
return '1.0'
def unload_all(self):
pass
def searchpath(self):
return []
def searchpath_add(self, *dirs):
pass
def searchpath_remove(self, *dirs):
pass
def emit_load_instr(self, module):
return ''
def emit_unload_instr(self, module):
return ''