Writing Data Processing Tasks
Creating a Task
In sparklanes, data processing tasks exist as decorated classes. Best practice suggests that a task should depend as little as possible on other tasks, in order to allow for lane definitions with an arbitrary processor order (only up to a certain extent, of course, since there will always be some dependence; e.g. a task extracting data will usually come before one transforming it).
For example:
from sparklanes import Task, conn

@Task('extract_data')
class ExtractIrisCSVData(object):
    def __init__(self, iris_csv_path):
        self.iris_csv_path = iris_csv_path

    def extract_data(self):
        # Read the csv
        iris_df = conn.spark.read.csv(path=self.iris_csv_path,
                                      sep=',',
                                      header=True,
                                      inferSchema=True)

        # Make the DataFrame available to the tasks that follow
        self.cache('iris_df', iris_df)
The class ExtractIrisCSVData above becomes a task by being decorated with sparklanes.Task(). Every task has an entry method, which is the method that will be run during lane execution; it is specified using the Task decorator's sole argument (in this case, extract_data). The entry method itself should not take any arguments, but custom arguments can be passed to the class during instantiation.
Todo
The functionality to pass args/kwargs to both the constructor and the entry method might be added in future versions.
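Constructor arguments are supplied when a task is added to a lane. The following sketch assumes the Lane API described in the lane-building part of the documentation; the CSV path is illustrative:

from sparklanes import Lane

lane = Lane(name='IrisExampleLane')
lane.add(ExtractIrisCSVData, iris_csv_path='data/iris.csv')  # passed to __init__
lane.run()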
Sharing Resources between Tasks
By being decorated, the class becomes a child of the internal sparklanes._framework.task.LaneTask class and inherits the sparklanes._framework.task.LaneTask.cache(), sparklanes._framework.task.LaneTask.uncache() and sparklanes._framework.task.LaneTask.clear_cache() methods, which can be used to add objects to the TaskCache. When an object is cached from within a task (e.g. using self.cache('some_df', df)), it becomes an attribute of all tasks that follow and is accessible from within each task object as self.some_df (that is, until it is uncached).
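As a minimal sketch of this mechanism (the task and attribute names here are illustrative):

from sparklanes import Task, conn

@Task('extract')
class ExtractData(object):
    def extract(self):
        df = conn.spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'label'])
        self.cache('some_df', df)  # 'some_df' becomes an attribute of all following tasks

@Task('transform')
class TransformData(object):
    def transform(self):
        filtered = self.some_df.filter(self.some_df.id > 1)  # access the cached object
        self.uncache('some_df')  # remove it from the cache once it is no longer needed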
Accessing the PySpark API from within Tasks
To allow for easy access to the PySpark API across tasks, sparklanes offers a way to avoid having to “getOrCreate” a SparkContext/SparkSession in each task requiring access to one. A module containing tasks can simply import sparklanes.conn (an alias of sparklanes.SparkContextAndSessionContainer) and have immediate access to a SparkContext and SparkSession object:
from sparklanes import conn
conn.sc # SparkContext instance
conn.spark # SparkSession instance
The currently active Context/Session can be changed using its methods sparklanes.SparkContextAndSessionContainer.set_sc() and sparklanes.SparkContextAndSessionContainer.set_spark().
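For example, to swap in a custom-configured session (a sketch that assumes set_spark() accepts a ready SparkSession instance; consult the API reference for the exact signature):

from pyspark.sql import SparkSession
from sparklanes import conn

custom_spark = SparkSession.builder.appName('custom-session').getOrCreate()  # illustrative config
conn.set_spark(custom_spark)  # tasks now receive this session via conn.spark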
If it is preferred to handle SparkContexts/SparkSessions manually, without making use of the shared container, this can be done by setting the environment variable INIT_SPARK_ON_IMPORT to 0 when submitting the application to Spark.
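As the variable's name suggests, it appears to be evaluated when sparklanes is first imported; assuming that is the case, it can also be set from within the application itself, before the first import:

import os
os.environ['INIT_SPARK_ON_IMPORT'] = '0'  # must be set before sparklanes is imported

import sparklanes  # no SparkContext/SparkSession is created on import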