# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause
import builtins
import collections.abc
import contextlib
import glob as pyglob
import itertools
import os
import re
import sys
import reframe.utility as util
import reframe.core.warnings as warn
from reframe.core.deferrable import (deferrable, _DeferredExpression,
_DeferredPerformanceExpression)
from reframe.core.exceptions import SanityError
def _format(s, *args, **kwargs):
'''Safely format string ``s``.
Returns ``s.format(*args, **kwargs)`` if no exception is thrown, otherwise
``s``.
'''
try:
return s.format(*args, **kwargs)
except (IndexError, KeyError):
return s
@contextlib.contextmanager
def _open(filename, *args, **kwargs):
try:
with open(filename, *args, **kwargs) as fp:
yield fp
except OSError as e:
# Re-raise it as sanity error
raise SanityError(f'{filename}: {e.strerror}')
# Create an alias decorator
def sanity_function(func):
warn.user_deprecation_warning(
'using the @sn.sanity_function decorator from the sn module is '
'deprecated; please use the built-in decorator @deferrable instead.',
from_version='3.8.0'
)
return deferrable(func)
# Deferrable versions of selected builtins
[docs]@deferrable
def abs(x):
'''Replacement for the built-in :func:`abs() <python:abs>` function.'''
return builtins.abs(x)
[docs]@deferrable
def all(iterable):
'''Replacement for the built-in :func:`all() <python:all>` function.'''
return builtins.all(iterable)
[docs]@deferrable
def any(iterable):
'''Replacement for the built-in :func:`any() <python:any>` function.'''
return builtins.any(iterable)
[docs]@deferrable
def chain(*iterables):
'''Replacement for the :func:`itertools.chain() <python:itertools.chain>`
function.'''
return itertools.chain(*iterables)
[docs]@deferrable
def enumerate(iterable, start=0):
'''Replacement for the built-in
:func:`enumerate() <python:enumerate>` function.'''
return builtins.enumerate(iterable, start)
[docs]@deferrable
def filter(function, iterable):
'''Replacement for the built-in
:func:`filter() <python:filter>` function.'''
return builtins.filter(function, iterable)
[docs]@deferrable
def getattr(obj, attr, *args):
'''Replacement for the built-in
:func:`getattr() <python:getattr>` function.'''
return builtins.getattr(obj, attr, *args)
[docs]@deferrable
def hasattr(obj, name):
'''Replacement for the built-in
:func:`hasattr() <python:hasattr>` function.'''
return builtins.hasattr(obj, name)
[docs]@deferrable
def len(s):
'''Replacement for the built-in :func:`len() <python:len>` function.'''
return builtins.len(s)
[docs]@deferrable
def map(function, *iterables):
'''Replacement for the built-in :func:`map() <python:map>` function.'''
return builtins.map(function, *iterables)
[docs]@deferrable
def max(*args):
'''Replacement for the built-in :func:`max() <python:max>` function.'''
return builtins.max(*args)
[docs]@deferrable
def min(*args):
'''Replacement for the built-in :func:`min() <python:min>` function.'''
return builtins.min(*args)
[docs]@deferrable
def print(obj, *, sep=' ', end='\n', file=None, flush=False):
'''Replacement for the built-in :func:`print() <python:print>` function.
The only difference is that this function takes a *single* object argument
and it returns that, so that you can use it transparently inside a complex
sanity expression. For example, you could write the following to print the
matches returned from the :func:`extractall()` function:
.. code:: python
@sanity_function
def my_sanity_fn(self):
return sn.assert_eq(
sn.count(sn.print(sn.extract_all(...))), 10
)
If ``file`` is None, :func:`print` will print its arguments to the
standard output. Unlike the builtin :func:`print() <python:print>`
function, we don't bind the ``file`` argument to :attr:`sys.stdout` by
default. This would capture :attr:`sys.stdout` at the time this function
is defined and would prevent it from seeing changes to :attr:`sys.stdout`,
such as redirects, in the future.
.. versionchanged:: 3.4
This function accepts now a single object argument in contrast to the
built-in :func:`print() <python:print>` function, which accepts
multiple.
'''
if file is None:
file = sys.stdout
builtins.print(obj, sep=sep, end=end, file=file, flush=flush)
return obj
[docs]@deferrable
def reversed(seq):
'''Replacement for the built-in
:func:`reversed() <python:reversed>` function.'''
return builtins.reversed(seq)
[docs]@deferrable
def round(number, *args):
'''Replacement for the built-in
:func:`round() <python:round>` function.'''
return builtins.round(number, *args)
[docs]@deferrable
def setattr(obj, name, value):
'''Replacement for the built-in
:func:`setattr() <python:setattr>` function.'''
builtins.setattr(obj, name, value)
[docs]@deferrable
def sorted(iterable, *args):
'''Replacement for the built-in
:func:`sorted() <python:sorted>` function.'''
return builtins.sorted(iterable, *args)
[docs]@deferrable
def sum(iterable, *args):
'''Replacement for the built-in :func:`sum() <python:sum>` function.'''
return builtins.sum(iterable, *args)
[docs]@deferrable
def zip(*iterables):
'''Replacement for the built-in :func:`zip() <python:zip>` function.'''
return builtins.zip(*iterables)
# Alternatives for non-overridable operators
[docs]@deferrable
def and_(a, b):
'''Deferrable version of the :keyword:`and` operator.
:returns: ``a and b``.'''
return builtins.all([a, b])
[docs]@deferrable
def or_(a, b):
'''Deferrable version of the :keyword:`or` operator.
:returns: ``a or b``.'''
return builtins.any([a, b])
[docs]@deferrable
def not_(a):
'''Deferrable version of the :keyword:`not` operator.
:returns: ``not a``.'''
return not a
[docs]@deferrable
def contains(seq, key):
'''Deferrable version of the :keyword:`in` operator.
:returns: ``key in seq``.'''
return key in seq
# Deferrable assert functions
[docs]@deferrable
def assert_true(x, msg=None):
'''Assert that ``x`` is evaluated to ``True``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if builtins.bool(x) is not True:
error_msg = msg or '{0} is not True'
raise SanityError(_format(error_msg, x))
return True
[docs]@deferrable
def assert_false(x, msg=None):
'''Assert that ``x`` is evaluated to ``False``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if builtins.bool(x) is not False:
error_msg = msg or '{0} is not False'
raise SanityError(_format(error_msg, x))
return True
[docs]@deferrable
def assert_eq(a, b, msg=None):
'''Assert that ``a == b``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if a != b:
error_msg = msg or '{0} != {1}'
raise SanityError(_format(error_msg, a, b))
return True
[docs]@deferrable
def assert_ne(a, b, msg=None):
'''Assert that ``a != b``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if a == b:
error_msg = msg or '{0} == {1}'
raise SanityError(_format(error_msg, a, b))
return True
[docs]@deferrable
def assert_in(item, container, msg=None):
'''Assert that ``item`` is in ``container``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if item not in container:
error_msg = msg or '{0} is not in {1}'
raise SanityError(_format(error_msg, item, container))
return True
[docs]@deferrable
def assert_not_in(item, container, msg=None):
'''Assert that ``item`` is not in ``container``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if item in container:
error_msg = msg or '{0} is in {1}'
raise SanityError(_format(error_msg, item, container))
return True
[docs]@deferrable
def assert_gt(a, b, msg=None):
'''Assert that ``a > b``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if a <= b:
error_msg = msg or '{0} <= {1}'
raise SanityError(_format(error_msg, a, b))
return True
[docs]@deferrable
def assert_ge(a, b, msg=None):
'''Assert that ``a >= b``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if a < b:
error_msg = msg or '{0} < {1}'
raise SanityError(_format(error_msg, a, b))
return True
[docs]@deferrable
def assert_lt(a, b, msg=None):
'''Assert that ``a < b``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if a >= b:
error_msg = msg or '{0} >= {1}'
raise SanityError(_format(error_msg, a, b))
return True
[docs]@deferrable
def assert_le(a, b, msg=None):
'''Assert that ``a <= b``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if a > b:
error_msg = msg or '{0} > {1}'
raise SanityError(_format(error_msg, a, b))
return True
[docs]@deferrable
def assert_found(patt, filename, msg=None, encoding='utf-8'):
'''Assert that regex pattern ``patt`` is found in the file ``filename``.
:arg patt: The regex pattern to search.
Any standard Python `regular expression
<https://docs.python.org/3/library/re.html#regular-expression-syntax>`_
is accepted.
The `re.MULTILINE
<https://docs.python.org/3/library/re.html#re.MULTILINE>`_ flag
is set for the pattern search.
:arg filename: The name of the file to examine or a file descriptor as in
:py:func:`open`. Any :class:`OSError` raised while processing the file
will be propagated as a :class:`reframe.core.exceptions.SanityError`.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:arg encoding: The name of the encoding used to decode the file.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
with _open(filename, 'rt', encoding=encoding) as fp:
return assert_found_s(
patt, fp.read(),
msg or f'pattern {patt!r} not found in {filename!r}'
)
[docs]@deferrable
def assert_found_s(patt, string, msg=None):
'''Assert that regex pattern ``patt`` is found in the string ``string``.
:arg patt: as in :func:`assert_found`.
:arg string: The string to examine.
:arg msg: as in :func:`assert_found`. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
.. versionadded:: 3.4.1
'''
num_matches = count(finditer_s(patt, string))
try:
evaluate(assert_true(num_matches))
except SanityError:
error_msg = msg or "pattern `{0}' not found in given string"
raise SanityError(_format(error_msg, patt, string))
else:
return True
[docs]@deferrable
def assert_not_found(patt, filename, msg=None, encoding='utf-8'):
'''Assert that regex pattern ``patt`` is not found in the file
``filename``.
This is the inverse of :func:`assert_found()`.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
with _open(filename, 'rt', encoding=encoding) as fp:
return assert_not_found_s(
patt, fp.read(), msg or f'pattern {patt!r} found in {filename!r}'
)
[docs]@deferrable
def assert_not_found_s(patt, string, msg=None):
'''Assert that regex pattern ``patt`` is not found in ``string``.
This is the inverse of :func:`assert_found_s()`.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
.. versionadded:: 3.4.1
'''
try:
evaluate(assert_found_s(patt, string, msg))
except SanityError:
return True
else:
error_msg = msg or "pattern `{0}' found in the given string"
raise SanityError(_format(error_msg, patt))
[docs]@deferrable
def assert_bounded(val, lower=None, upper=None, msg=None):
'''Assert that ``lower <= val <= upper``.
:arg val: The value to check.
:arg lower: The lower bound. If ``None``, it defaults to ``-inf``.
:arg upper: The upper bound. If ``None``, it defaults to ``inf``.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails.
'''
if lower is None:
lower = builtins.float('-inf')
if upper is None:
upper = builtins.float('inf')
if val >= lower and val <= upper:
return True
error_msg = msg or 'value {0} not within bounds {1}..{2}'
raise SanityError(_format(error_msg, val, lower, upper))
[docs]@deferrable
def assert_reference(val, ref, lower_thres=None, upper_thres=None, msg=None):
'''Assert that value ``val`` respects the reference value ``ref``.
:arg val: The value to check.
:arg ref: The reference value.
:arg lower_thres: The lower threshold value expressed as a negative decimal
fraction of the reference value. Must be in [-1, 0] for ref >= 0.0 and
in [-inf, 0] for ref < 0.0.
If ``None``, no lower thresholds is applied.
:arg upper_thres: The upper threshold value expressed as a decimal fraction
of the reference value. Must be in [0, inf] for ref >= 0.0 and
in [0, 1] for ref < 0.0.
If ``None``, no upper thresholds is applied.
:arg msg: The error message to use if the assertion fails. You may use
``{0}`` ... ``{N}`` as placeholders for the function arguments.
:returns: ``True`` on success.
:raises reframe.core.exceptions.SanityError: if assertion fails or if the
lower and upper thresholds do not have appropriate values.
'''
if lower_thres is not None:
lower_thres_limit = -1 if ref >= 0 else None
try:
evaluate(assert_bounded(lower_thres, lower_thres_limit, 0))
except SanityError:
raise SanityError(
f'invalid low threshold value: {lower_thres}'
) from None
if upper_thres is not None:
upper_thres_limit = None if ref >= 0 else 1
try:
evaluate(assert_bounded(upper_thres, 0, upper_thres_limit))
except SanityError:
raise SanityError(
f'invalid high threshold value: {upper_thres}'
) from None
def calc_bound(thres):
if thres is None:
return None
# Inverse threshold if ref < 0
if ref < 0:
thres = -thres
return ref*(1 + thres)
lower = calc_bound(lower_thres) or float('-inf')
upper = calc_bound(upper_thres) or float('inf')
try:
evaluate(assert_bounded(val, lower, upper))
except SanityError:
error_msg = msg or '{0} is beyond reference value {1} (l={2}, u={3})'
raise SanityError(_format(error_msg, val, ref, lower, upper)) from None
else:
return True
# Pattern matching functions
[docs]@deferrable
def finditer(patt, filename, encoding='utf-8'):
'''Get an iterator over the matches of the regex ``patt`` in ``filename``.
This function is equivalent to :func:`findall()` except that it returns
a generator object instead of a list, which you can use to iterate over
the raw matches.
'''
with _open(filename, 'rt', encoding=encoding) as fp:
yield from re.finditer(patt, fp.read(), re.MULTILINE)
[docs]@deferrable
def finditer_s(patt, string):
'''Get an iterator over the matches of the regex ``patt`` in ``string``.
This function is equivalent to :func:`findall_s()` except that it returns
a generator object instead of a list, which you can use to iterate over
the raw matches.
.. versionadded:: 3.4.1
'''
yield from re.finditer(patt, string, re.MULTILINE)
[docs]@deferrable
def findall(patt, filename, encoding='utf-8'):
'''Get all matches of regex ``patt`` in ``filename``.
:arg patt: The regex pattern to search.
Any standard Python `regular expression
<https://docs.python.org/3/library/re.html#regular-expression-syntax>`_
is accepted.
The `re.MULTILINE
<https://docs.python.org/3/library/re.html#re.MULTILINE>`_ flag
is set for the pattern search.
:arg filename: The name of the file to examine.
:arg encoding: The name of the encoding used to decode the file.
:returns: A list of raw `regex match objects
<https://docs.python.org/3/library/re.html#match-objects>`_.
:raises reframe.core.exceptions.SanityError: In case an :class:`OSError` is
raised while processing ``filename``.
'''
return list(evaluate(x) for x in finditer(patt, filename, encoding))
[docs]@deferrable
def findall_s(patt, string):
'''Get all matches of regex ``patt`` in ``string``.
:arg patt: as in :func:`findall`
:arg string: The string to examine.
:returns: same as :func:`finall`.
.. versionadded:: 3.4.1
'''
return list(evaluate(x) for x in finditer_s(patt, string))
def _callable_name(fn):
fn_name = '<unknown>'
try:
# Assume fn is standard function
fn_name = fn.__name__
except AttributeError:
try:
# Assume fn is callable object
fn_name = fn.__class__.__name__
except AttributeError:
pass
return fn_name
def _extractiter_singletag(patt, string, tag, conv):
if isinstance(conv, collections.abc.Iterable):
raise SanityError(f'multiple conversion functions given for the '
f'single capturing group {tag!r}')
for m in finditer_s(patt, string):
try:
val = m.group(tag)
except (IndexError, KeyError):
raise SanityError(f'no such group in pattern {patt!r}: {tag}')
try:
yield conv(val) if callable(conv) else val
except ValueError:
fn_name = _callable_name(conv)
raise SanityError(
f'could not convert value {val!r} using {fn_name}()'
)
def _extractiter_multitag(patt, string, tags, conv):
for m in finditer_s(patt, string):
val = []
for t in tags:
try:
val.append(m.group(t))
except (IndexError, KeyError):
raise SanityError(f'no such group in pattern {patt!r}: {t}')
converted_vals = []
if not isinstance(conv, collections.abc.Iterable):
conv = [conv] * builtins.len(val)
elif builtins.len(conv) > builtins.len(val):
conv = conv[:builtins.len(val)]
# Use the last function in case we have less conversion functions than
# tags
for v, c in itertools.zip_longest(val, conv, fillvalue=conv[-1]):
try:
converted_vals.append(c(v) if callable(c) else v)
except ValueError:
fn_name = _callable_name(conv)
raise SanityError(
f'could not convert value {v!r} using {fn_name}()'
)
yield tuple(converted_vals)
# Numeric functions
[docs]@deferrable
def avg(iterable):
'''Return the average of all the elements of ``iterable``.'''
# We walk over the iterable manually in case this is a generator
total = 0
num_vals = None
for num_vals, val in builtins.enumerate(iterable, start=1):
total += val
if num_vals is None:
raise SanityError('attempt to get average on an empty container')
return total / num_vals
# Other utility functions
[docs]@deferrable
def allx(iterable):
'''Same as the built-in :func:`all() <python:all>` function, except that it
returns :class:`False` if ``iterable`` is empty.
.. versionadded:: 2.13
'''
return util.allx(iterable)
[docs]@deferrable
def defer(x):
'''Defer the evaluation of variable ``x``.
.. versionadded:: 2.21
'''
return x
[docs]def evaluate(expr, cache=False):
'''Evaluate a deferred expression.
If ``expr`` is not a deferred expression, it will be returned as is.
If ``expr`` is a deferred expression and ``cache`` is ``True``, the
results of the deferred expression will be cached and subsequent calls
to :func:`evaluate` on this deferred expression (when ``cache=False``)
will simply return the previously cached result.
:param expr: The expression to be evaluated.
:param cache: Cache the result of this evaluation.
.. note::
When the ``cache`` argument is passed as ``True``, a deferred
expression will always be evaluated and its results will be re-cached.
This may replace any other results that may have been cached in
previous evaluations.
.. versionadded:: 2.21
.. versionchanged:: 3.8.0
The ``cache`` argument is added.
'''
if isinstance(expr, _DeferredExpression):
return expr.evaluate(cache=cache)
else:
return expr
[docs]@deferrable
def getitem(container, item):
'''Get ``item`` from ``container``.
``container`` may refer to any container that can be indexed.
:raises reframe.core.exceptions.SanityError: In case ``item`` cannot be
retrieved from ``container``.
'''
try:
return container[item]
except KeyError:
raise SanityError(f'key not found: {item}')
except IndexError:
raise SanityError(f'index out of bounds: {item}')
[docs]@deferrable
def count(iterable):
'''Return the element count of ``iterable``.
This is similar to the built-in :func:`len() <python:len>`, except that it
can also handle any argument that supports iteration, including
generators.
'''
try:
return builtins.len(iterable)
except TypeError:
# Try to determine length by iterating over the iterable
ret = 0
for ret, _ in builtins.enumerate(iterable, start=1):
pass
return ret
[docs]@deferrable
def count_uniq(iterable):
'''Return the unique element count of ``iterable``.'''
return builtins.len(builtins.set(iterable))
[docs]@deferrable
def glob(pathname, *, recursive=False):
'''Replacement for the :func:`glob.glob() <python:glob.glob>` function.'''
return pyglob.glob(pathname, recursive=recursive)
[docs]@deferrable
def iglob(pathname, recursive=False):
'''Replacement for the :func:`glob.iglob() <python:glob.iglob>`
function.'''
return pyglob.iglob(pathname, recursive=recursive)
[docs]@deferrable
def path_exists(path):
'''Replacement for the :func:`os.path.exists` function.
.. versionadded:: 3.4
'''
return os.path.exists(path)
[docs]@deferrable
def path_isdir(path):
'''Replacement for the :func:`os.path.isdir` function.
.. versionadded:: 3.4
'''
return os.path.isdir(path)
[docs]@deferrable
def path_isfile(path):
'''Replacement for the :func:`os.path.isfile` function.
.. versionadded:: 3.4
'''
return os.path.isfile(path)
[docs]@deferrable
def path_islink(path):
'''Replacement for the :func:`os.path.islink` function.
.. versionadded:: 3.4
'''
return os.path.islink(path)