# Source code for hpctestlib.data_analytics.spark.spark_checks

# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
# SPDX-License-Identifier: BSD-3-Clause

import math

import reframe as rfm
import reframe.utility.sanity as sn

from reframe.core.backends import getlauncher

@rfm.simple_test
class compute_pi_check(rfm.RunOnlyRegressionTest, pin_prefix=True):
    '''Test Apache Spark by computing PI.

    Apache Spark is a unified analytics engine for large-scale data
    processing. It provides high-level APIs in Java, Scala, Python
    and R, and an optimized engine that supports general execution
    graphs. It also supports a rich set of higher-level tools including
    Spark SQL for SQL and structured data processing, MLlib for machine
    learning, GraphX for graph processing, and Structured Streaming for
    incremental computation and stream processing (see
    `spark.apache.org <https://spark.apache.org/>`__).

    This test checks that Spark is functioning correctly. To do this, it is
    necessary to define the tolerance of acceptable deviation. The tolerance
    is used to check that the computations are executed correctly, by
    comparing the value of pi calculated to the one obtained from the math
    library. The default assumption is that Spark is already installed on the
    system under test.
    '''

    #: Parameter encoding the variant of the test.
    #:
    #: :type: :class:`str`
    #: :values: ``['spark', 'pyspark']``
    variant = parameter(['spark', 'pyspark'])

    #: The absolute tolerance of the computed value of PI
    #:
    #: :type: :class:`float`
    #: :required: No
    #: :default: `0.01`
    tolerance = variable(float, value=0.01)

    #: The Spark installation prefix path
    #:
    #: :type: :class:`str`
    #: :required: Yes
    spark_prefix = variable(str)

    #: The local directories used by Spark
    #:
    #: :type: :class:`str`
    #: :required: No
    #: :default: `'/tmp'`
    spark_local_dirs = variable(str, value='/tmp')

    #: Amount of memory to use per executor process, following the JVM memory
    #: strings convention, i.e a number with a size unit suffix
    #: ("k", "m", "g" or "t") (e.g. 512m, 2g)
    #:
    #: :type: :class:`str`
    #: :required: Yes
    executor_memory = variable(str)

    #: The number of Spark workers per node
    #:
    #: :type: :class:`int`
    #: :required: No
    #: :default: `1`
    num_workers = variable(int, value=1)

    #: The number of cores per each Spark executor
    #:
    #: :type: :class:`int`
    #: :required: No
    #: :default: `1`
    exec_cores = variable(int, value=1)

    num_tasks = 3
    num_tasks_per_node = 1

    # Start the Spark standalone cluster before `spark-submit` runs and shut
    # it down afterwards; an empty command here would leave nothing listening
    # behind `$SPARKURL`.
    # NOTE(review): these are Spark's standard cluster launch scripts and are
    # assumed to be reachable through PATH -- confirm for the target system.
    prerun_cmds = ['start-all.sh']
    postrun_cmds = ['stop-all.sh']
    executable = 'spark-submit'

    # The actual options are assembled in `prepare_run()`.
    executable_opts = required
    tags = {'data-science', 'big-data'}

    @run_after('init')
    def set_description(self):
        '''Record a human-readable description naming the test variant.'''

        # NOTE(review): the text is stored in `mydescr`, not in ReFrame's
        # standard `descr` attribute -- confirm that this is intentional.
        self.mydescr = f'Simple calculation of pi with {self.variant}'

    @run_before('run')
    def set_job_launcher(self):
        '''Replace the job's parallel launcher with the local one.'''

        # The job launcher has to be changed since the `spark-submit`
        # script is not used with srun.
        self.job.launcher = getlauncher('local')()

    @run_before('run')
    def prepare_run(self):
        '''Set the Spark environment and the `spark-submit` command line.

        The options common to both variants configure the parallelism, the
        executor cores and memory and the master URL. The variant then
        selects the application that is submitted: the ``SparkPi`` class
        from the Spark examples jar for ``spark`` or a Python script for
        ``pyspark``.
        '''

        self.variables = {
            'SPARK_WORKER_CORES': str(self.num_workers),
            'SPARK_LOCAL_DIRS': self.spark_local_dirs,
        }
        self.executable_opts = [
            f'--conf spark.default.parallelism={self.num_workers}',
            f'--conf spark.executor.cores={self.exec_cores}',
            f'--conf spark.executor.memory={self.executor_memory}',
            # `$SPARKURL` is left for the shell to expand in the job script
            '--master $SPARKURL'
        ]
        if self.variant == 'spark':
            self.executable_opts += [
                '--class org.apache.spark.examples.SparkPi',
                f'{self.spark_prefix}/examples/jars/spark-examples*.jar 10000'
            ]
        elif self.variant == 'pyspark':
            # `spark-submit` needs an application to run; an empty argument
            # would submit nothing.
            # NOTE(review): the script is expected to be shipped with the
            # test's resources -- confirm that it is staged with the test.
            self.executable_opts += ['spark_pi.py']

    @sanity_function
    def assert_pi_readout(self):
        '''Assert that the obtained pi value meets the specified tolerances.

        The value is read from the ``Pi is roughly <value>`` line of the
        standard output and its absolute difference from :obj:`math.pi` must
        be lower than :attr:`tolerance`.
        '''

        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
                                    self.stdout, 'pi', float)
        return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance)