Example: Simple ETL lane
Note: it is recommended to take a look at the `official documentation <https://sparklanes.readthedocs.io>`__ first.
In this example, we’ll build a simple ETL pipeline using sparklanes and the popular iris dataset. First, we’ll load the dataset from a CSV file, then apply some transformations to it, and finally write it to disk as JSON.
Let’s start by importing all the libs we need:
from pyspark.sql.functions import monotonically_increasing_id
from sparklanes import Task, Lane, conn
Next, we write our data processors, or Tasks:
@Task('extract_data')
class ExtractIrisCSVData(object):
    """Load the iris data set from a CSV file"""

    def __init__(self, iris_csv_path):
        self.iris_csv_path = iris_csv_path

    def extract_data(self):
        # Read the CSV file, using the first row as the header and letting
        # Spark infer the column types
        iris_df = conn.spark.read.csv(path=self.iris_csv_path,
                                      sep=',',
                                      header=True,
                                      inferSchema=True)

        # Make it available to the tasks that follow
        self.cache('iris_df', iris_df)


@Task('add_index')
class AddRowIndex(object):
    """Add an index to each row in the data set"""

    def add_index(self):
        # Add an id column (unique and increasing, but not necessarily consecutive)
        self.iris_df = self.iris_df.withColumn('id', monotonically_increasing_id())

        # Update the cache
        self.cache('iris_df', self.iris_df)


@Task('normalize')
class NormalizeColumns(object):
    """Normalize all numerical columns"""

    def normalize(self):
        # Skip the categorical 'species' column and the 'id' row index
        # added by AddRowIndex, which is not a measurement
        columns = self.iris_df.columns
        columns.remove('species')
        columns.remove('id')
        for col in columns:
            col_min = float(self.iris_df.agg({col: "min"}).collect()[0]['min(%s)' % col])
            col_max = float(self.iris_df.agg({col: "max"}).collect()[0]['max(%s)' % col])
            # Add a min-max normalized copy of the column, scaled to [0, 1]
            self.iris_df = self.iris_df.withColumn(
                col + '_norm', (self.iris_df[col] - col_min) / (col_max - col_min)
            )

        # Update the cache
        self.cache('iris_df', self.iris_df)


@Task('write_to_json')
class SaveAsJSON(object):
    """Dump the data set as JSON to disk"""

    def __init__(self, output_folder):
        self.output_folder = output_folder

    def write_to_json(self):
        self.iris_df.write.format('json').save(self.output_folder)

        # Clear the cache
        self.uncache('iris_df')
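NormalizeColumns applies min-max scaling to each column. Every value x becomes (x - min) / (max - min), so the column's minimum maps to 0 and its maximum maps to 1. The plain-Python sketch below shows the same arithmetic without Spark. The helper name min_max_normalize and the sample values are made up for illustration and are not part of sparklanes.

```python
def min_max_normalize(values):
    """Scale a list of numbers to [0, 1] using min-max normalization.

    Hypothetical helper that mirrors the arithmetic in NormalizeColumns.
    Like the task, it assumes the values are not all equal, because
    otherwise max - min would be zero.
    """
    col_min = min(values)
    col_max = max(values)
    return [(x - col_min) / (col_max - col_min) for x in values]


# Illustrative values only, not taken from the iris data set
print(min_max_normalize([2.0, 4.0, 6.0]))  # [0.0, 0.5, 1.0]
```

The Spark task needs two aggregations per column to find the minimum and maximum, because the data is distributed. Once those two numbers are known, the scaling itself is a simple column expression.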
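Note how, in the extractor class, we cache our DataFrame using self.cache, which makes it available as an attribute (self.iris_df) in all tasks that follow. Once the data is no longer needed, the last task removes it again with self.uncache.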
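sparklanes handles this sharing for you, but the behaviour can be modelled in a few lines. This is a hypothetical sketch, not how sparklanes implements it internally. A made-up base class, CachingTask, keeps one dictionary shared by every task. Its __getattr__ exposes the cached entries as attributes, so a later task can read what an earlier task cached.

```python
class CachingTask(object):
    """Hypothetical base class modelling a cache shared by all tasks."""

    _shared_cache = {}  # a single dict shared by every subclass and instance

    def cache(self, name, obj):
        CachingTask._shared_cache[name] = obj

    def uncache(self, name):
        del CachingTask._shared_cache[name]

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so cached
        # objects show up as attributes on every task instance
        try:
            return CachingTask._shared_cache[name]
        except KeyError:
            raise AttributeError(name) from None


class LoadNumbers(CachingTask):
    def run(self):
        self.cache('numbers', [3, 1, 2])


class SortNumbers(CachingTask):
    def run(self):
        # 'numbers' was cached by an earlier task, not set on this instance
        self.cache('numbers', sorted(self.numbers))


LoadNumbers().run()
SortNumbers().run()
print(SortNumbers().numbers)  # [1, 2, 3]
```

This is also why the tasks above call self.cache again after modifying self.iris_df. Assigning to self.iris_df only changes the current task instance, and re-caching is what passes the updated DataFrame on to the next task.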
With our Tasks defined, we can now build the lane:
lane = (Lane(name='IrisExampleLane', run_parallel=False)
        .add(ExtractIrisCSVData, iris_csv_path='data/iris.csv')
        .add(AddRowIndex)
        .add(NormalizeColumns)
        .add(SaveAsJSON, 'out'))
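Each call to add returns the lane itself, which is what lets the calls be chained. Any extra positional or keyword arguments are passed on to the task's constructor. When the lane runs, the tasks execute in the order they were added, and each one's method named in its @Task decorator is called (for example, ExtractIrisCSVData.extract_data). The sketch below models this sequential behaviour with a hypothetical task decorator and a MiniLane class. Both are made-up names, not sparklanes' API.

```python
def task(entry_method):
    """Hypothetical stand-in for @Task: records the name of the entry method."""
    def decorate(cls):
        cls.entry_method = entry_method
        return cls
    return decorate


class MiniLane(object):
    """Hypothetical sequential lane; not sparklanes' Lane class."""

    def __init__(self, name):
        self.name = name
        self.steps = []

    def add(self, cls, *args, **kwargs):
        # Store the class and its constructor arguments for later
        self.steps.append((cls, args, kwargs))
        return self  # returning self is what makes .add(...).add(...) chaining work

    def run(self):
        # Instantiate each task in insertion order and call its entry method
        for cls, args, kwargs in self.steps:
            instance = cls(*args, **kwargs)
            getattr(instance, cls.entry_method)()
        return self


events = []


@task('greet')
class Greet(object):
    def __init__(self, who):
        self.who = who

    def greet(self):
        events.append('hello ' + self.who)


@task('finish')
class Finish(object):
    def finish(self):
        events.append('done')


MiniLane('DemoLane').add(Greet, who='lane').add(Finish).run()
print(events)  # ['hello lane', 'done']
```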
And run it:
lane.run()
2018-06-07 11:52:01,466 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing `IrisExampleLane`
--------------------------------------------------------------------------------
2018-06-07 11:52:01,468 - SPARKLANES - INFO -
================================================================================
IrisExampleLane
>Task_ExtractIrisCSVData
>Task_AddRowIndex
>Task_NormalizeColumns
>Task_SaveAsJSON
================================================================================
2018-06-07 11:52:01,469 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `ExtractIrisCSVData.extract_data`
--------------------------------------------------------------------------------
2018-06-07 11:52:01,668 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `ExtractIrisCSVData.extract_data`. Execution time: 0:00:00.198397
--------------------------------------------------------------------------------
2018-06-07 11:52:01,669 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `AddRowIndex.add_index`
--------------------------------------------------------------------------------
2018-06-07 11:52:01,675 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `AddRowIndex.add_index`. Execution time: 0:00:00.004853
--------------------------------------------------------------------------------
2018-06-07 11:52:01,676 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `NormalizeColumns.normalize`
--------------------------------------------------------------------------------
2018-06-07 11:52:02,848 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `NormalizeColumns.normalize`. Execution time: 0:00:01.170293
--------------------------------------------------------------------------------
2018-06-07 11:52:02,849 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `SaveAsJSON.write_to_json`
--------------------------------------------------------------------------------
2018-06-07 11:52:03,065 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `SaveAsJSON.write_to_json`. Execution time: 0:00:00.215517
--------------------------------------------------------------------------------
2018-06-07 11:52:03,066 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing `IrisExampleLane`
--------------------------------------------------------------------------------
<sparklanes._framework.lane.Lane at 0x1034bf5f8>
That works, but what makes sparklanes more useful is the ability to define processing lanes in YAML configuration files and then submit them to a Spark cluster.
We can define the same lane as above like this:
lane:
  name: IrisExampleLane
  run_parallel: false
  tasks:
    - class: tasks.iris.ExtractIrisCSVData
      kwargs:
        iris_csv_path: data/iris.csv
    - class: tasks.iris.AddRowIndex
    - class: tasks.iris.NormalizeColumns
    - class: tasks.iris.SaveAsJSON
      args:
        - out
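Each entry under tasks names a task class by its dotted import path. The optional args (positional) and kwargs (keyword) entries correspond to the extra arguments passed to .add() in the Python version. The following is a hypothetical illustration of how such a file could be mapped back to classes and arguments. task_specs and resolve_class are made-up helper names, not sparklanes' loader. The config dict is written out by hand to match what a YAML parser such as PyYAML's yaml.safe_load returns for the file above, so the sketch needs no third-party packages.

```python
import importlib

# Hand-written equivalent of the parsed iris.yml
config = {
    'lane': {
        'name': 'IrisExampleLane',
        'run_parallel': False,
        'tasks': [
            {'class': 'tasks.iris.ExtractIrisCSVData',
             'kwargs': {'iris_csv_path': 'data/iris.csv'}},
            {'class': 'tasks.iris.AddRowIndex'},
            {'class': 'tasks.iris.NormalizeColumns'},
            {'class': 'tasks.iris.SaveAsJSON', 'args': ['out']},
        ],
    }
}


def task_specs(config):
    """Return (class_path, args, kwargs) for each task, in lane order."""
    return [(entry['class'],
             list(entry.get('args', [])),
             dict(entry.get('kwargs', {})))
            for entry in config['lane']['tasks']]


def resolve_class(path):
    """Import 'package.module.ClassName' and return the class object."""
    module_name, _, class_name = path.rpartition('.')
    return getattr(importlib.import_module(module_name), class_name)


for spec in task_specs(config):
    print(spec)

# tasks.iris is only importable when the tasks package is on the Python path,
# so a standard-library class is used to demonstrate resolve_class here
print(resolve_class('collections.OrderedDict'))
```

Because class paths are resolved by import, the tasks folder has to be an importable Python package.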
With the file saved as iris.yml, our directory structure looks like this:
data/
    iris.csv
tasks/
    __init__.py    # Required for tasks/ to be recognized as a Python package
    iris.py        # Contains our processor classes (Tasks)
iris.yml
To run our pipeline, we can submit it to Spark using the lane-submit command-line script:
%%bash
lane-submit -y iris.yml -p tasks -e data -s 'master=local[2]'
2018-06-07 11:50:09,219 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing `IrisExampleLane`
--------------------------------------------------------------------------------
2018-06-07 11:50:09,219 - SPARKLANES - INFO -
================================================================================
IrisExampleLane
>Task_ExtractIrisCSVData
>Task_AddRowIndex
>Task_NormalizeColumns
>Task_SaveAsJSON
================================================================================
2018-06-07 11:50:09,219 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `ExtractIrisCSVData.extract_data`
--------------------------------------------------------------------------------
2018-06-07 11:50:13,784 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `ExtractIrisCSVData.extract_data`. Execution time: 0:00:04.564362
--------------------------------------------------------------------------------
2018-06-07 11:50:13,784 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `AddRowIndex.add_index`
--------------------------------------------------------------------------------
2018-06-07 11:50:13,818 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `AddRowIndex.add_index`. Execution time: 0:00:00.033710
--------------------------------------------------------------------------------
2018-06-07 11:50:13,818 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `NormalizeColumns.normalize`
--------------------------------------------------------------------------------
2018-06-07 11:50:16,216 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `NormalizeColumns.normalize`. Execution time: 0:00:02.398010
--------------------------------------------------------------------------------
2018-06-07 11:50:16,216 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Executing task `SaveAsJSON.write_to_json`
--------------------------------------------------------------------------------
2018-06-07 11:50:16,833 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing task `SaveAsJSON.write_to_json`. Execution time: 0:00:00.616633
--------------------------------------------------------------------------------
2018-06-07 11:50:16,833 - SPARKLANES - INFO -
--------------------------------------------------------------------------------
Finished executing `IrisExampleLane`
--------------------------------------------------------------------------------
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
That’s it. Check out the documentation for explanations of how to build more complex processing lanes.