"""Module that allows submitting lanes to spark using YAML definitions"""
import argparse
import logging
import os
import re
import shutil
import sys
import tempfile
from subprocess import call, STDOUT

# spark-submit flags that take no value (passed as e.g. `--verbose`)
SPARK_SUBMIT_FLAGS = ['verbose', 'supervise']
MY_ENV = os.environ.copy()


def submit_to_spark():
"""Console-script entry point"""
_package_and_submit(sys.argv[1:])


def _package_and_submit(args):
"""
Packages and submits a job, which is defined in a YAML file, to Spark.
Parameters
----------
args (List): Command-line arguments
"""
args = _parse_and_validate_args(args)
logging.debug(args)
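    # Everything is staged into a throwaway temp directory; after the three
    # packaging steps below it holds libs.zip, tasks.zip, _framework.zip and main.py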
dist = __make_tmp_dir()
try:
__package_dependencies(dist_dir=dist, additional_reqs=args['requirements'],
silent=args['silent'])
__package_app(tasks_pkg=args['package'],
dist_dir=dist,
custom_main=args['main'],
extra_data=args['extra_data'])
__run_spark_submit(lane_yaml=args['yaml'],
dist_dir=dist,
spark_home=args['spark_home'],
spark_args=args['spark_args'],
silent=args['silent'])
    finally:
        # Remove the temporary directory on success as well as on failure
        __clean_up(dist)


def _parse_and_validate_args(args):
    """
    Parse and validate arguments. Validation checks that the given files/directories
    exist, and converts relative paths to absolute ones.

    Parameters
    ----------
    args (List): Command-line arguments
    """
class ExtendAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
getattr(namespace, self.dest).extend(values)
    parser = argparse.ArgumentParser(description='Submit a lane to Spark.')
parser.add_argument('-y', '--yaml', type=str, required=True,
help='Path to the yaml definition file.')
parser.add_argument('-p', '--package', type=str, required=True,
help='Path to the python package containing your tasks.')
parser.add_argument('-r', '--requirements', type=str, required=False,
help='Path to a `requirements.txt` specifying any additional dependencies '
'of your tasks.')
parser.add_argument('-e', '--extra-data', nargs='*', required=False, action=ExtendAction,
help='Path to any additional files or directories that should be packaged '
'and sent to Spark.')
    parser.add_argument('-m', '--main', type=str, required=False,
                        help='Path to a custom main python file.')
    parser.add_argument('-d', '--spark-home', type=str, required=False,
                        help='Custom path to the directory containing your Spark installation. '
                             'If none is given, sparklanes will try to use the `spark-submit` '
                             'command from your PATH.')
parser.add_argument('-s', '--spark-args', nargs='*', required=False,
help='Any additional arguments that should be sent to Spark via '
'spark-submit. '
'(e.g. `--spark-args executor-memory=20G total-executor-cores=100`)')
parser.add_argument('--silent', help='If set, no output will be sent to console',
action='store_true')
    args = vars(parser.parse_args(args))
# Check/fix files/dirs
for param in ('package', 'spark_home'):
args[param] = __validate_and_fix_path(args[param], check_dir=True)
for param in ('yaml', 'requirements', 'main'):
args[param] = __validate_and_fix_path(args[param], check_file=True)
    if args['extra_data']:
        args['extra_data'] = [__validate_and_fix_path(path, check_file=True, check_dir=True)
                              for path in args['extra_data']]
# Check if python package
    if not os.path.isfile(os.path.join(args['package'], '__init__.py')):
        raise SystemExit('Could not confirm `%s` is a python package. Make sure it contains an '
                         '`__init__.py`.' % args['package'])
# Check/fix spark args
if args['spark_args']:
args['spark_args'] = __validate_and_fix_spark_args(args['spark_args'])
return args


def __validate_and_fix_path(path, check_file=False, check_dir=False):
    """Check if a file/directory exists and convert a relative path to an absolute one"""
    if path is None:
        return path
    if not (check_file and os.path.isfile(path)) \
            and not (check_dir and os.path.isdir(path)):
        raise SystemExit('Path `%s` does not exist' % path)
    return os.path.abspath(path)


def __validate_and_fix_spark_args(spark_args):
    """
    Prepares spark arguments. On the command line they are passed as, for example,
    `-s master=local[4] deploy-mode=client verbose`, which is translated for
    spark-submit into `--master local[4] --deploy-mode client --verbose`.

    Parameters
    ----------
    spark_args (List): List of spark arguments

    Returns
    -------
    fixed_args (List): List of fixed and validated spark arguments
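
    Example
    -------
    The mapping described above, as a doctest:

    >>> __validate_and_fix_spark_args(['master=local[4]', 'deploy-mode=client', 'verbose'])
    ['--master', 'local[4]', '--deploy-mode', 'client', '--verbose']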
"""
    pattern = re.compile(r'[\w\-_]+=.+')  # e.g. `executor-memory=20G`
    fixed_args = []
    for arg in spark_args:
        if arg in SPARK_SUBMIT_FLAGS:
            # Value-less flags are simply prefixed, e.g. `verbose` -> `--verbose`
            fixed_args.append('--' + arg)
        else:
            if not pattern.match(arg):
                raise SystemExit('Spark argument `%s` does not seem to be in the correct format '
                                 '`ARG_NAME=ARG_VAL`, and is also not recognized as one of the '
                                 'valid spark-submit flags (%s).' % (arg, str(SPARK_SUBMIT_FLAGS)))
            # `name=value` pairs become two tokens, e.g. `master=local[4]` -> `--master local[4]`
            name, value = arg.split('=', 1)
            fixed_args.append('--' + name)
            fixed_args.append(value)
    return fixed_args


def __make_tmp_dir():
    """
    Create a temporary directory where the packaged files will be located

    Returns
    -------
    tmp_dir (str): Absolute path to temporary directory
    """
tmp_dir = tempfile.mkdtemp()
logging.debug('Created temporary dir: `%s`', tmp_dir)
return tmp_dir


def __package_dependencies(dist_dir, additional_reqs, silent):
    """
    Installs the app's dependencies from pip and packages them (as zip), to be
    submitted to Spark.

    Parameters
    ----------
    dist_dir (str): Path to directory where the packaged libs shall be located
    additional_reqs (str): Path to a requirements.txt, containing any of the app's
        additional requirements
    silent (bool): Flag indicating whether pip output should be printed to console
    """
logging.info('Packaging dependencies')
libs_dir = os.path.join(dist_dir, 'libs')
if not os.path.isdir(libs_dir):
os.mkdir(libs_dir)
# Get requirements
req_txt = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'requirements-submit.txt')
with open(req_txt, 'r') as req:
requirements = req.read().splitlines()
    if additional_reqs:
        with open(additional_reqs, 'r') as req:
            # Strip newlines so entries compare equal to those read via splitlines() above
            requirements.extend(line.strip() for line in req if line.strip())
# Remove duplicates
requirements = list(set(requirements))
    # Install
    with open(os.devnull, 'w') as devnull:
        outp = {'stderr': STDOUT, 'stdout': devnull} if silent else {}
        for pkg in requirements:
            cmd = ['pip', 'install', pkg, '-t', libs_dir]
            logging.debug('Calling `%s`', str(cmd))
            call(cmd, **outp)
    # Package the installed libs into `<dist_dir>/libs.zip`
    shutil.make_archive(libs_dir, 'zip', libs_dir, './')


def __package_app(tasks_pkg, dist_dir, custom_main=None, extra_data=None):
    """
    Packages the `tasks_pkg` (as zip) to `dist_dir`. Also copies the 'main' python file
    to `dist_dir`, to be submitted to Spark. Same for `extra_data`.

    Parameters
    ----------
    tasks_pkg (str): Path to the python package containing tasks
    dist_dir (str): Path to the directory where the packaged code should be stored
    custom_main (str): Path to a custom 'main' python file
    extra_data (List[str]): List containing paths to files/directories that should also
        be packaged and submitted to Spark
    """
logging.info('Packaging application')
# Package tasks
tasks_dir_splits = os.path.split(os.path.realpath(tasks_pkg))
shutil.make_archive(os.path.join(dist_dir, 'tasks'),
'zip',
tasks_dir_splits[0],
tasks_dir_splits[1])
# Package main.py
    if custom_main is None:
        from . import _main
        main_path = _main.__file__
        # `__file__` may point at the compiled `.pyc`; strip the trailing `c` so the
        # `.py` source is copied instead
        if main_path.endswith('.pyc'):
            main_path = main_path[:-1]
        shutil.copy(os.path.realpath(main_path),
                    os.path.join(dist_dir, 'main.py'))
else:
shutil.copy(os.path.realpath(custom_main),
os.path.join(dist_dir, 'main.py'))
    # Package the framework itself: zip the `sparklanes` package, rooted two
    # directories above this file
shutil.make_archive(os.path.join(dist_dir, '_framework'),
'zip',
os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..'),
'./sparklanes/')
# Package extra data
if extra_data:
for dat in extra_data:
real_path = os.path.realpath(dat)
target = os.path.join(dist_dir, os.path.split(real_path)[1])
if os.path.isfile(real_path):
shutil.copy(real_path, target)
elif os.path.isdir(real_path):
shutil.copytree(real_path, target)
else:
raise IOError('File `%s` not found at `%s`.' % (dat, real_path))


def __run_spark_submit(lane_yaml, dist_dir, spark_home, spark_args, silent):
    """
    Submits the packaged application to Spark using a `spark-submit` subprocess

    Parameters
    ----------
    lane_yaml (str): Path to the YAML lane definition file
    dist_dir (str): Path to the directory where the packaged code is located
    spark_home (str): Path to the directory containing the Spark installation, or None
        to use the `spark-submit` command found on the PATH
    spark_args (List): Any additional spark config arguments to be passed when submitting
    silent (bool): Flag indicating whether job output should be printed to console
    """
# spark-submit binary
cmd = ['spark-submit' if spark_home is None else os.path.join(spark_home, 'bin/spark-submit')]
# Supplied spark arguments
if spark_args:
cmd += spark_args
# Packaged App & lane
cmd += ['--py-files', 'libs.zip,_framework.zip,tasks.zip', 'main.py']
cmd += ['--lane', lane_yaml]
logging.info('Submitting to Spark')
logging.debug(str(cmd))
# Submit
    with open(os.devnull, 'w') as devnull:
        outp = {'stderr': STDOUT, 'stdout': devnull} if silent else {}
        call(cmd, cwd=dist_dir, env=MY_ENV, **outp)


def __clean_up(dist_dir):
    """Delete the packaged app (i.e. the temporary dist directory)"""
shutil.rmtree(dist_dir)