Submitting lanes to Spark

Console script

sparklanes comes with a console script to package and submit a YAML lane to Spark:

usage: lane-submit [-h] -y YAML -p PACKAGE [-r REQUIREMENTS]
                   [-e [EXTRA_DATA [EXTRA_DATA ...]]] [-m MAIN]
                   [-d SPARK_HOME] [-s [SPARK_ARGS [SPARK_ARGS ...]]]
                   [--silent]

Submitting a lane to spark.

optional arguments:
  -h, --help            show this help message and exit
  -y YAML, --yaml YAML  Path to the yaml definition file.
  -p PACKAGE, --package PACKAGE
                        Path to the python package containing your tasks.
  -r REQUIREMENTS, --requirements REQUIREMENTS
                        Path to a `requirements.txt` specifying any additional
                        dependencies of your tasks.
  -e [EXTRA_DATA [EXTRA_DATA ...]], --extra-data [EXTRA_DATA [EXTRA_DATA ...]]
                        Path to any additional files or directories that
                        should be packaged and sent to Spark.
  -m MAIN, --main MAIN  Path to a custom main python file
  -d SPARK_HOME, --spark-home SPARK_HOME
                        Custom path to the directory containing your Spark
                        installation. If none is given, sparklanes will try to
                        use the `spark-submit` command from your PATH
  -s [SPARK_ARGS [SPARK_ARGS ...]], --spark-args [SPARK_ARGS [SPARK_ARGS ...]]
                        Any additional arguments that should be sent to Spark
                        via spark-submit. (e.g. `--spark-args executor-
                        memory=20G total-executor-cores=100`)
  --silent              If set, no output will be sent to console
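
For example, a typical invocation could look like this (all file and directory names are hypothetical):

lane-submit -y lane.yml -p tasks -r requirements.txt -s executor-memory=2G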

Packaging

When submitting a YAML lane configuration file to Spark, the Python package containing the tasks (i.e. the data processors) has to be specified with -p/--package. While Python 3.3+ no longer strictly requires packages to contain an __init__.py (namespace packages, PEP 420), sparklanes still requires one.

For example, if a YAML file contains tasks like:

tasks:
- class: tasks.extract.LoadFromS3
- class: tasks.extract.LoadFromMySQL
- class: tasks.transform.NormalizeColumns
- class: tasks.transform.EngineerFeatures
- class: tasks.load.DumpToFTP
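
Task entries may also carry constructor arguments. A sketch, assuming the args/kwargs keys of the lane definition format (the S3 path is a placeholder):

tasks:
- class: tasks.extract.LoadFromS3
  kwargs:
    bucket_url: s3://example-bucket/raw.csv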

Then the directory structure of the Python package specified using -p should look something like this:

tasks/
  __init__.py
  extract.py  # Contains LoadFromS3 and LoadFromMySQL classes
  transform.py  # Contains NormalizeColumns and EngineerFeatures classes
  load.py  # Contains DumpToFTP class
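
For illustration, extract.py could then define its task classes along these lines (a sketch: the bucket URL is a placeholder, and the Spark session is obtained via plain pyspark rather than a sparklanes helper):

# extract.py
from pyspark.sql import SparkSession

from sparklanes import Task


@Task('extract')  # 'extract' is the entry method sparklanes calls when the task runs
class LoadFromS3(object):
    def __init__(self, bucket_url):
        self.bucket_url = bucket_url

    def extract(self):
        spark = SparkSession.builder.getOrCreate()
        df = spark.read.csv(self.bucket_url, header=True)
        self.cache('df', df)  # share the DataFrame with downstream tasks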

Extra Data

If any additional data needs to be accessible locally from within the Spark cluster, it can be specified using -e/--extra-data. Both files and directories are supported, and they will be made accessible relative to the application’s root directory.

For example, a single file, as in -e example.csv, will be made accessible from Spark at ./example.csv, regardless of the original directory structure. If a directory is specified, e.g. -e extra/data, that folder will be accessible from Spark at ./data.
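
For instance, both forms could be combined in a single invocation (file and directory names hypothetical):

lane-submit -y lane.yml -p tasks -e example.csv extra/data

Task code can then read the files through their relative paths, i.e. ./example.csv and ./data/.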

Spark Configuration

Any flags and configuration arguments accepted by spark-submit can also be passed via lane-submit.

For example, spark-submit configuration arguments could look like:

spark-submit --properties-file ./spark.conf --executor-memory 20G --supervise [...]

Then the same arguments can be passed through lane-submit by dropping the leading dashes and joining each argument with its value using an equals sign:

lane-submit -s properties-file=./spark.conf executor-memory=20G supervise [...]

Custom main

The default main Python file is a simple script that loads and executes the lane:

"""A simple 'main' file, that will be submitted to spark. It will execute the lane as defined in
the YAML lane definition file."""
# pylint: disable=missing-docstring
from argparse import ArgumentParser

from sparklanes import build_lane_from_yaml


def main():
    args = parse_args()
    build_lane_from_yaml(args['lane']).run()


def parse_args():
    parser = ArgumentParser()
    parser.add_argument('-l', '--lane',
                        help='Relative or absolute path to the lane definition YAML file',
                        type=str,
                        required=True)

    return parser.parse_args().__dict__


if __name__ == '__main__':
    main()

If this is not sufficient, the script can be extended and the new Python script specified as a custom main file, as in -m new_main.py.
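
For instance, a custom main adding basic logging could look like the following sketch. The --log-level option is hypothetical; the -l/--lane argument should presumably be kept, since that is how the path to the lane definition file is passed to the main script:

"""A custom 'main' file that configures logging before executing the lane."""
import logging
from argparse import ArgumentParser

from sparklanes import build_lane_from_yaml


def main():
    parser = ArgumentParser()
    parser.add_argument('-l', '--lane',
                        help='Relative or absolute path to the lane definition YAML file',
                        type=str,
                        required=True)
    parser.add_argument('--log-level',
                        help='Hypothetical extra option: a python logging level name',
                        type=str,
                        default='INFO')
    args = parser.parse_args()

    logging.basicConfig(level=getattr(logging, args.log_level.upper(), logging.INFO))
    build_lane_from_yaml(args.lane).run()


if __name__ == '__main__':
    main()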