Example: Simple ETL lane

Note: it is recommended to take a look at the `official documentation <https://sparklanes.readthedocs.io>`__ first.

In this example, we’ll build a simple ETL pipeline using sparklanes and the popular iris dataset. First we’ll load the dataset from a CSV file, then apply a few transformations to it, and finally dump the result to disk as JSON.

Let’s start by importing all the libraries we need:

[6]:
from pyspark.sql.functions import monotonically_increasing_id

from sparklanes import Task, Lane, conn

Next, we write our data processors, or Tasks:

[6]:
@Task('extract_data')
class ExtractIrisCSVData(object):
    """Load the iris data set from a CSV file"""
    def __init__(self, iris_csv_path):
        self.iris_csv_path = iris_csv_path

    def extract_data(self):
        # Read the csv
        iris_df = conn.spark.read.csv(path=self.iris_csv_path,
                                      sep=',',
                                      header=True,
                                      inferSchema=True)

        # Make it available to tasks that follow
        self.cache('iris_df', iris_df)


@Task('add_index')
class AddRowIndex(object):
    """Add a index to each row in the data set"""
    def add_index(self):
        # Add id column
        self.iris_df = self.iris_df.withColumn('id', monotonically_increasing_id())

        # Update cache
        self.cache('iris_df', self.iris_df)


@Task('normalize')
class NormalizeColumns(object):
    """Normalize all numerical columns"""
    def normalize(self):
        # Add normalized columns
        columns = self.iris_df.columns
        columns.remove('species')
        for col in columns:
            col_min = float(self.iris_df.agg({col: "min"}).collect()[0]['min(%s)' % col])
            col_max = float(self.iris_df.agg({col: "max"}).collect()[0]['max(%s)' % col])
            self.iris_df = self.iris_df.withColumn(
                col + '_norm', (self.iris_df[col] - col_min) / (col_max - col_min)
            )

        # Update Cache
        self.cache('iris_df', self.iris_df)


@Task('write_to_json')
class SaveAsJSON(object):
    """Dump the data set as JSON to disk"""
    def __init__(self, output_folder):
        self.output_folder = output_folder

    def write_to_json(self):
        self.iris_df.write.format('json').save(self.output_folder)

        # Clear cache
        self.uncache('iris_df')

Note how, in the extractor class, we cache our DataFrame using self.cache, which makes it available as an attribute on all tasks that follow.
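
For illustration, here is a minimal sketch of an additional Task that consumes the cached DataFrame and caches a derived object of its own. The class, its name, and the species filter are hypothetical, made up for this example, and not part of the lane built below:

@Task('filter_species')
class FilterSetosa(object):
    """Hypothetical task: keep only the setosa rows (illustration only)"""
    def filter_species(self):
        # `self.iris_df` exists here because an earlier task cached it
        setosa_df = self.iris_df.filter(self.iris_df['species'] == 'setosa')

        # Cached objects become attributes on all subsequent tasks, too
        self.cache('setosa_df', setosa_df)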

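A side note on NormalizeColumns: calling agg and collect twice per column launches a separate Spark job for every statistic. On larger data, a single aggregation that gathers all minima and maxima at once is cheaper. Below is a sketch of such a variant of the normalize method, using only standard pyspark.sql.functions (plain PySpark; nothing sparklanes-specific changes):

from pyspark.sql import functions as F

def normalize(self):
    columns = [c for c in self.iris_df.columns if c != 'species']

    # One Spark job for all statistics instead of two collect()s per column
    aggs = [F.min(c).alias(c + '_min') for c in columns]
    aggs += [F.max(c).alias(c + '_max') for c in columns]
    stats = self.iris_df.agg(*aggs).collect()[0]

    for c in columns:
        col_min = float(stats[c + '_min'])
        col_max = float(stats[c + '_max'])
        self.iris_df = self.iris_df.withColumn(
            c + '_norm', (self.iris_df[c] - col_min) / (col_max - col_min)
        )

    # Update cache
    self.cache('iris_df', self.iris_df)
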
With our Tasks defined, we can now build the lane:

[11]:
lane = (Lane(name='IrisExampleLane', run_parallel=False)
        .add(ExtractIrisCSVData, iris_csv_path='data/iris.csv')
        .add(AddRowIndex)
        .add(NormalizeColumns)
        .add(SaveAsJSON, 'out'))

And run it:

[12]:
lane.run()
2018-06-07 11:52:01,466 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing `IrisExampleLane`
--------------------------------------------------------------------------------

2018-06-07 11:52:01,468 - SPARKLANES - INFO -
================================================================================
        IrisExampleLane
         >Task_ExtractIrisCSVData
         >Task_AddRowIndex
         >Task_NormalizeColumns
         >Task_SaveAsJSON
================================================================================
2018-06-07 11:52:01,469 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `ExtractIrisCSVData.extract_data`
--------------------------------------------------------------------------------
2018-06-07 11:52:01,668 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `ExtractIrisCSVData.extract_data`. Execution time: 0:00:00.198397
--------------------------------------------------------------------------------
2018-06-07 11:52:01,669 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `AddRowIndex.add_index`
--------------------------------------------------------------------------------
2018-06-07 11:52:01,675 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `AddRowIndex.add_index`. Execution time: 0:00:00.004853
--------------------------------------------------------------------------------
2018-06-07 11:52:01,676 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `NormalizeColumns.normalize`
--------------------------------------------------------------------------------
2018-06-07 11:52:02,848 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `NormalizeColumns.normalize`. Execution time: 0:00:01.170293
--------------------------------------------------------------------------------
2018-06-07 11:52:02,849 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `SaveAsJSON.write_to_json`
--------------------------------------------------------------------------------
2018-06-07 11:52:03,065 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `SaveAsJSON.write_to_json`. Execution time: 0:00:00.215517
--------------------------------------------------------------------------------
2018-06-07 11:52:03,066 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing `IrisExampleLane`
--------------------------------------------------------------------------------
[12]:
<sparklanes._framework.lane.Lane at 0x1034bf5f8>

That works, but what makes sparklanes more useful is the ability to define processing lanes in YAML configuration files and then submit them to a Spark cluster.

We can define the same lane as above like this:

lane:
  name: IrisExampleLane
  run_parallel: false
  tasks:
    - class: tasks.iris.ExtractIrisCSVData
      kwargs:
        iris_csv_path: data/iris.csv
    - class: tasks.iris.AddRowIndex
    - class: tasks.iris.NormalizeColumns
    - class: tasks.iris.SaveAsJSON
      args:
        - out

With the file saved as iris.yml, our directory structure looks like this:

data/
  iris.csv
tasks/
  __init__.py  # Required for the directory to be recognized as a Python package
  iris.py  # Contains our processor classes (Tasks)
iris.yml
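
To see how the YAML definition maps onto the programmatic API used earlier, here is a rough sketch of what a loader could do. This is purely illustrative, using PyYAML and importlib; it is not sparklanes’ own loading code:

import importlib

import yaml  # PyYAML, assumed to be installed for this sketch

from sparklanes import Lane

with open('iris.yml') as f:
    cfg = yaml.safe_load(f)['lane']

# Resolve each `class` path to the actual Task class and add it to the lane
lane = Lane(name=cfg['name'], run_parallel=cfg.get('run_parallel', False))
for task in cfg['tasks']:
    module_path, cls_name = task['class'].rsplit('.', 1)
    cls = getattr(importlib.import_module(module_path), cls_name)
    lane.add(cls, *task.get('args', []), **task.get('kwargs', {}))

# The resulting lane is equivalent to the one built manually above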

To run our pipeline, we submit it to Spark using the lane-submit command-line script, pointing it at the lane definition (-y), the package containing our Tasks (-p), extra data to ship along (-e), and any Spark parameters (-s):

[10]:
%%bash
lane-submit -y iris.yml -p tasks -e data -s master=local[2]
2018-06-07 11:50:09,219 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing `IrisExampleLane`
--------------------------------------------------------------------------------

2018-06-07 11:50:09,219 - SPARKLANES - INFO -
================================================================================
        IrisExampleLane
         >Task_ExtractIrisCSVData
         >Task_AddRowIndex
         >Task_NormalizeColumns
         >Task_SaveAsJSON
================================================================================
2018-06-07 11:50:09,219 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `ExtractIrisCSVData.extract_data`
--------------------------------------------------------------------------------
2018-06-07 11:50:13,784 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `ExtractIrisCSVData.extract_data`. Execution time: 0:00:04.564362
--------------------------------------------------------------------------------
2018-06-07 11:50:13,784 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `AddRowIndex.add_index`
--------------------------------------------------------------------------------
2018-06-07 11:50:13,818 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `AddRowIndex.add_index`. Execution time: 0:00:00.033710
--------------------------------------------------------------------------------
2018-06-07 11:50:13,818 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `NormalizeColumns.normalize`
--------------------------------------------------------------------------------
2018-06-07 11:50:16,216 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `NormalizeColumns.normalize`. Execution time: 0:00:02.398010
--------------------------------------------------------------------------------
2018-06-07 11:50:16,216 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `SaveAsJSON.write_to_json`
--------------------------------------------------------------------------------
2018-06-07 11:50:16,833 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `SaveAsJSON.write_to_json`. Execution time: 0:00:00.616633
--------------------------------------------------------------------------------
2018-06-07 11:50:16,833 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing `IrisExampleLane`
--------------------------------------------------------------------------------
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

That’s it. Check out the documentation for explanations of how more complex processing lanes can be built.