Defining Processing Lanes¶
Lanes (i.e. data processing pipelines) can be defined either by writing YAML definition files or by using sparklanes’ API.
YAML definition files¶
Lane definition files must adhere to the following schema:
lane:
  name: SimpleLane            # str (optional): Name under which the lane is referred to during logging
  run_parallel: false         # bool (optional): Indicates whether tasks should be run in parallel
  tasks:                      # list (required): List of processor classes
    - class: pkg.mdl.Task1    # str (required): Full Python module path to the processor class
      args: [arg1, arg2]      # list (optional): List of arguments passed when instantiating the class
      kwargs:                 # dict (optional): Dict of kwargs passed when instantiating the class
        kwarg1: val1
        kwarg2: val2
    - class: pkg.mdl.Task2
      args: [arg1, arg2]
    ...
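Each field in the schema maps one-to-one onto sparklanes’ API, which is described in the next section. As a rough sketch, the schema above corresponds to a lane built as follows (pkg.mdl.Task1 and pkg.mdl.Task2 stand in for real task classes):

from sparklanes import Lane
from pkg.mdl import Task1, Task2  # placeholder module and task classes from the schema above

# 'name' and 'run_parallel' become constructor arguments...
lane = (Lane(name='SimpleLane', run_parallel=False)
        # ...and each 'class' entry becomes an add() call, with its
        # 'args' and 'kwargs' passed through:
        .add(Task1, 'arg1', 'arg2', kwarg1='val1', kwarg2='val2')
        .add(Task2, 'arg1', 'arg2'))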
Take care to use the correct class path. Say we have the following directory structure:
tasks/
    extract/
        __init__.py
        extractors.py   # Contains class 'SomeExtractorClass'
    load/
        ...
    ...
The exact path to use depends on which folder is packaged and submitted to Spark. To reference SomeExtractorClass, the correct class path is tasks.extract.extractors.SomeExtractorClass if the entire tasks folder is packaged and submitted to Spark, whereas packaging only the extract folder makes the correct class path extract.extractors.SomeExtractorClass (see Submitting lanes to Spark).
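In other words, the class path is exactly what you would write in a Python import from the packaged root. A minimal sketch, assuming the directory structure above:

# Scenario 1: the entire 'tasks' folder is packaged and submitted.
# The class is then importable as:
from tasks.extract.extractors import SomeExtractorClass
# ...so the correct class path is 'tasks.extract.extractors.SomeExtractorClass'.

# Scenario 2: only the 'extract' folder is packaged, so the 'tasks' prefix
# disappears from the import:
#   from extract.extractors import SomeExtractorClass
# ...and the correct class path is 'extract.extractors.SomeExtractorClass'.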
Using the API¶
Lanes can also be defined and executed using sparklanes’ API, for example:
from sparklanes import Lane, Task


@Task('main_mtd')
class Task1(object):
    def main_mtd(self, a, b, c):
        pass


@Task('main_mtd')
class Task2(object):
    def main_mtd(self, a, b):
        pass


# Building the lane
lane = (Lane(name='ExampleLane', run_parallel=False)
        .add(Task1, 1, 2, c=3)
        .add(Task2, a=1, b=2))

# Execute it
lane.run()
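The string passed to the Task decorator ('main_mtd' for both tasks above) names the method that serves as the task’s entry point when the lane is run.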
Refer to the API documentation for sparklanes.Lane.
Branching & Running Tasks in Parallel¶
Lanes can be branched to arbitrary depth, which is especially useful when part of a lane should be executed in parallel. As stated in the Spark documentation:
Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads.
If the parameter run_parallel is true when instantiating a Lane or Branch, a separate thread is spawned for each of the tasks it contains, ensuring that Spark executes them in parallel.
For example, a lane containing branches could look like this:
from sparklanes import Lane, Branch
from pkg.mdl import Task1, Task2, Task3, SubTaskA, SubTaskB1, SubTaskB2, SubTaskC
lane = (Lane(name='BranchedLane', run_parallel=False)
        .add(Task1)
        .add(Task2)
        .add(Branch(name='ExampleBranch', run_parallel=True)
             .add(SubTaskA)
             .add(Branch(name='SubBranch', run_parallel=False)
                  .add(SubTaskB1)
                  .add(SubTaskB2))
             .add(SubTaskC))
        .add(Task3))
Or the same lane defined as YAML:
lane:
  name: BranchedLane
  run_parallel: false
  tasks:
    - class: pkg.mdl.Task1
    - class: pkg.mdl.Task2
    - branch:
        name: ExampleBranch
        run_parallel: true
        tasks:
          - class: pkg.mdl.SubTaskA
          - branch:
              name: SubBranch
              run_parallel: false
              tasks:
                - class: pkg.mdl.SubTaskB1
                - class: pkg.mdl.SubTaskB2
          - class: pkg.mdl.SubTaskC
In this lane, SubTaskA, the branch SubBranch and SubTaskC would be executed in parallel, whereas the tasks within SubBranch would run sequentially. In this way, complex processing pipelines can be built.
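Schematically, given the run_parallel flags above, the intended execution order is:

Task1                   # outer lane: tasks run sequentially
Task2
ExampleBranch           # its three children start in parallel threads:
    SubTaskA
    SubBranch           # inside this branch: SubTaskB1, then SubTaskB2, sequentially
    SubTaskC
Task3                   # runs once the branch has completed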
Refer to the API documentation for sparklanes.Branch.