"""Module that allows submitting lanes to spark using YAML definitions"""
import argparse
import logging
import os
import re
import shutil
import sys
import tempfile
from subprocess import call, STDOUT

SPARK_SUBMIT_FLAGS = ['verbose', 'supervised']
MY_ENV = os.environ.copy()

[docs]def submit_to_spark(): """Console-script entry point""" _package_and_submit(sys.argv[1:])
def _package_and_submit(args): """ Packages and submits a job, which is defined in a YAML file, to Spark. Parameters ---------- args (List): Command-line arguments """ args = _parse_and_validate_args(args) logging.debug(args) dist = __make_tmp_dir() try: __package_dependencies(dist_dir=dist, additional_reqs=args['requirements'], silent=args['silent']) __package_app(tasks_pkg=args['package'], dist_dir=dist, custom_main=args['main'], extra_data=args['extra_data']) __run_spark_submit(lane_yaml=args['yaml'], dist_dir=dist, spark_home=args['spark_home'], spark_args=args['spark_args'], silent=args['silent']) except Exception as exc: __clean_up(dist) raise exc __clean_up(dist) def _parse_and_validate_args(args): """ Parse and validate arguments. During validation, it is checked whether the given files/directories exist, while also converting relative paths to absolute ones. Parameters ---------- args (List): Command-line arguments """ class ExtendAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): if getattr(namespace, self.dest, None) is None: setattr(namespace, self.dest, []) getattr(namespace, self.dest).extend(values) parser = argparse.ArgumentParser(description='Submitting a lane to spark.') parser.add_argument('-y', '--yaml', type=str, required=True, help='Path to the yaml definition file.') parser.add_argument('-p', '--package', type=str, required=True, help='Path to the python package containing your tasks.') parser.add_argument('-r', '--requirements', type=str, required=False, help='Path to a `requirements.txt` specifying any additional dependencies ' 'of your tasks.') parser.add_argument('-e', '--extra-data', nargs='*', required=False, action=ExtendAction, help='Path to any additional files or directories that should be packaged ' 'and sent to Spark.') parser.add_argument('-m', '--main', type=str, required=False, help='Path to a custom main python file') parser.add_argument('-d', '--spark-home', type=str, required=False, help='Custom path to the directory containing your Spark installation. If ' 'none is given, sparklanes will try to use the `spark-submit` command ' 'from your PATH') parser.add_argument('-s', '--spark-args', nargs='*', required=False, help='Any additional arguments that should be sent to Spark via ' 'spark-submit. ' '(e.g. `--spark-args executor-memory=20G total-executor-cores=100`)') parser.add_argument('--silent', help='If set, no output will be sent to console', action='store_true') args = parser.parse_args(args).__dict__ # Check/fix files/dirs for param in ('package', 'spark_home'): args[param] = __validate_and_fix_path(args[param], check_dir=True) for param in ('yaml', 'requirements', 'main'): args[param] = __validate_and_fix_path(args[param], check_file=True) if args['extra_data']: for i in range(len(args['extra_data'])): args['extra_data'][i] = __validate_and_fix_path(args['extra_data'][i], check_file=True, check_dir=True) # Check if python package if not os.path.isfile(os.path.join(args['package'], '')): raise SystemExit('Could not confirm `%s` is a python package. Make sure it contains an ' '``.') # Check/fix spark args if args['spark_args']: args['spark_args'] = __validate_and_fix_spark_args(args['spark_args']) return args def __validate_and_fix_path(path, check_file=False, check_dir=False): """Check if a file/directory exists and converts relative paths to absolute ones""" # pylint: disable=superfluous-parens if path is None: return path else: if not (os.path.isfile(path) if check_file else False) \ and not (os.path.isdir(path) if check_dir else False): raise SystemExit('Path `%s` does not exist' % path) if not os.path.isabs(path): path = os.path.abspath(os.path.join(os.path.abspath(os.curdir), path)) return path def __validate_and_fix_spark_args(spark_args): """ Prepares spark arguments. In the command-line script, they are passed as for example `-s master=local[4] deploy-mode=client verbose`, which would be passed to spark-submit as `--master local[4] --deploy-mode client --verbose` Parameters ---------- spark_args (List): List of spark arguments Returns ------- fixed_args (List): List of fixed and validated spark arguments """ pattern = re.compile(r'[\w\-_]+=.+') fixed_args = [] for arg in spark_args: if arg not in SPARK_SUBMIT_FLAGS: if not pattern.match(arg): raise SystemExit('Spark argument `%s` does not seem to be in the correct format ' '`ARG_NAME=ARG_VAL`, and is also not recognized to be one of the' 'valid spark-submit flags (%s).' % (arg, str(SPARK_SUBMIT_FLAGS))) eq_pos = arg.find('=') fixed_args.append('--' + arg[:eq_pos]) fixed_args.append(arg[eq_pos + 1:]) else: fixed_args.append('--' + arg) return fixed_args def __make_tmp_dir(): """ Create a temporary directory where the packaged files will be located Returns ------- tmp_dir (str): Absolute path to temporary directory """ tmp_dir = tempfile.mkdtemp() logging.debug('Created temporary dir: `%s`', tmp_dir) return tmp_dir def __package_dependencies(dist_dir, additional_reqs, silent): """ Installs the app's dependencies from pip and packages them (as zip), to be submitted to spark. Parameters ---------- dist_dir (str): Path to directory where the packaged libs shall be located additional_reqs (str): Path to a requirements.txt, containing any of the app's additional requirements silent (bool): Flag indicating whether pip output should be printed to console """'Packaging dependencies') libs_dir = os.path.join(dist_dir, 'libs') if not os.path.isdir(libs_dir): os.mkdir(libs_dir) # Get requirements req_txt = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'requirements-submit.txt') with open(req_txt, 'r') as req: requirements = if additional_reqs: with open(additional_reqs, 'r') as req: for row in req: requirements.append(row) # Remove duplicates requirements = list(set(requirements)) # Install devnull = open(os.devnull, 'w') outp = {'stderr': STDOUT, 'stdout': devnull} if silent else {} for pkg in requirements: cmd = ['pip', 'install', pkg, '-t', libs_dir] logging.debug('Calling `%s`', str(cmd)) call(cmd, **outp) devnull.close() # Package shutil.make_archive(libs_dir, 'zip', libs_dir, './') def __package_app(tasks_pkg, dist_dir, custom_main=None, extra_data=None): """ Packages the `tasks_pkg` (as zip) to `dist_dir`. Also copies the 'main' python file to `dist_dir`, to be submitted to spark. Same for `extra_data`. Parameters ---------- tasks_pkg (str): Path to the python package containing tasks dist_dir (str): Path to the directory where the packaged code should be stored custom_main (str): Path to a custom 'main' python file. extra_data (List[str]): List containing paths to files/directories that should also be packaged and submitted to spark """'Packaging application') # Package tasks tasks_dir_splits = os.path.split(os.path.realpath(tasks_pkg)) shutil.make_archive(os.path.join(dist_dir, 'tasks'), 'zip', tasks_dir_splits[0], tasks_dir_splits[1]) # Package if custom_main is None: from . import _main main_path = _main.__file__ if main_path[-3:] == 'pyc': main_path = main_path[:-1] shutil.copy(os.path.realpath(main_path), os.path.join(dist_dir, '')) else: shutil.copy(os.path.realpath(custom_main), os.path.join(dist_dir, '')) # Package _framework shutil.make_archive(os.path.join(dist_dir, '_framework'), 'zip', os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..'), './sparklanes/') # Package extra data if extra_data: for dat in extra_data: real_path = os.path.realpath(dat) target = os.path.join(dist_dir, os.path.split(real_path)[1]) if os.path.isfile(real_path): shutil.copy(real_path, target) elif os.path.isdir(real_path): shutil.copytree(real_path, target) else: raise IOError('File `%s` not found at `%s`.' % (dat, real_path)) def __run_spark_submit(lane_yaml, dist_dir, spark_home, spark_args, silent): """ Submits the packaged application to spark using a `spark-submit` subprocess Parameters ---------- lane_yaml (str): Path to the YAML lane definition file dist_dir (str): Path to the directory where the packaged code is located spark_args (str): String of any additional spark config args to be passed when submitting silent (bool): Flag indicating whether job output should be printed to console """ # spark-submit binary cmd = ['spark-submit' if spark_home is None else os.path.join(spark_home, 'bin/spark-submit')] # Supplied spark arguments if spark_args: cmd += spark_args # Packaged App & lane cmd += ['--py-files', ',,', ''] cmd += ['--lane', lane_yaml]'Submitting to Spark') logging.debug(str(cmd)) # Submit devnull = open(os.devnull, 'w') outp = {'stderr': STDOUT, 'stdout': devnull} if silent else {} call(cmd, cwd=dist_dir, env=MY_ENV, **outp) devnull.close() def __clean_up(dist_dir): """Delete packaged app""" shutil.rmtree(dist_dir)