Writing Data Processing Tasks¶
Creating a Task¶
In sparklanes, data processing tasks exist as decorated classes. Best practice suggests that a task should depend as little as possible on other tasks, in order to allow for lane definitions with an arbitrary processor order (only up to a certain extent, of course, since there will always be some dependence: a task extracting data, for example, will usually come before one transforming it).
    from sparklanes import Task, conn


    @Task('extract_data')
    class ExtractIrisCSVData(object):
        def __init__(self, iris_csv_path):
            self.iris_csv_path = iris_csv_path

        def extract_data(self):
            # Read the csv
            iris_df = conn.spark.read.csv(path=self.iris_csv_path,
                                          sep=',',
                                          header=True,
                                          inferSchema=True)
ExtractIrisCSVData above becomes a Task by being decorated with sparklanes.Task(). Tasks have an entry-method, which is the method that will be run during lane execution, and is specified using the Task decorator’s sole argument (in this case, extract_data).
The entry-method itself should not take any arguments; custom arguments can, however, be passed to the class during instantiation.
Functionality to pass args/kwargs to both the constructor and the entry-method might be added in future versions.
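For illustration, constructor arguments are typically supplied when a task is added to a lane. The following is a minimal sketch that assumes the Lane API described elsewhere in these docs (Lane(...).add(TaskClass, *args, **kwargs)); the lane name and CSV path are placeholders:

    from sparklanes import Lane

    # Hypothetical lane definition: the args/kwargs given to `add` are passed
    # to the task's __init__ when the lane is run.
    lane = (Lane(name='IrisDataProcessingLane')
            .add(ExtractIrisCSVData, iris_csv_path='data/iris.csv'))
    lane.run()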
Accessing the PySpark API from within Tasks¶
To allow for easy access to the PySpark API across tasks, sparklanes offers means to avoid having to “getOrCreate” a SparkSession in each task requiring access to one. A module containing tasks can simply import sparklanes.conn (an alias of sparklanes.SparkContextAndSessionContainer) and have immediate access to a SparkContext and SparkSession object:
    from sparklanes import conn

    conn.sc     # SparkContext instance
    conn.spark  # SparkSession instance
The currently active Context/Session can be changed using the container’s methods.
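As a small illustration of shared access across tasks, a follow-up task can simply import the same container and use it without any setup. This is a minimal sketch; the entry-method name, class name, and file path are placeholders:

    from sparklanes import Task, conn


    @Task('transform')
    class CountIrisRows(object):
        def transform(self):
            # The same shared SparkSession is available here as well
            df = conn.spark.read.csv('data/iris.csv', header=True, inferSchema=True)
            print(df.count())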
If it is preferred to handle SparkContexts/SparkSessions manually, without making use of the shared container, this can be done by setting the corresponding environment variable to 0 when submitting the application to Spark.
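In that case, each task would obtain its own session through the standard PySpark API. A minimal sketch (plain PySpark, not specific to sparklanes; the app name and path are placeholders):

    from pyspark.sql import SparkSession

    # Manually get or create a session instead of relying on the shared container
    spark = SparkSession.builder.appName('iris-lane').getOrCreate()
    df = spark.read.csv('data/iris.csv', header=True, inferSchema=True)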