sparklanes._framework package

sparklanes._framework.env module

Environment configuration variables that can be passed when executing a lane.

sparklanes._framework.env.INIT_SPARK_ON_IMPORT = False

Specifies whether a default SparkContext/SparkSession should be instantiated upon import of sparklanes.

sparklanes._framework.env.INTERNAL_LOGGER_NAME = 'SPARKLANES'

The logger’s name under which internal events will be logged.

sparklanes._framework.env.SPARK_APP_NAME = 'sparklanes.app'

The app name with which the default SparkContext/SparkSession will be instantiated.

sparklanes._framework.env.UNNAMED_BRANCH_NAME = 'UnnamedBranch'

Default name of a branch, if no custom name is specified.

sparklanes._framework.env.UNNAMED_LANE_NAME = 'UnnamedLane'

Default name of a lane, if no custom name is specified.

sparklanes._framework.env.VERBOSE_TESTING = False

Specifies whether output should be printed to the console when running tests.

sparklanes._framework.errors module

Exceptions

exception sparklanes._framework.errors.CacheError[source]

Bases: AttributeError

Should be thrown whenever task-cache access fails.

exception sparklanes._framework.errors.LaneExecutionError[source]

Bases: Exception

Should be thrown whenever execution of a lane fails.

exception sparklanes._framework.errors.LaneImportError[source]

Bases: Exception

Should be thrown when a module or class in a YAML definition file cannot be imported.

exception sparklanes._framework.errors.LaneSchemaError(*args, **kwargs)[source]

Bases: schema.SchemaError

Should be thrown when a YAML definition does not match the required schema.

code

Shows the specific error from the superclass.

exception sparklanes._framework.errors.TaskInitializationError[source]

Bases: Exception

Should be thrown whenever transformation of a class into a task fails (during decoration).
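The exception hierarchy above can be reproduced as a standalone sketch (LaneSchemaError is omitted here, since its base, schema.SchemaError, comes from the third-party schema package):

```python
class CacheError(AttributeError):
    """Raised whenever task-cache access fails."""

class LaneExecutionError(Exception):
    """Raised whenever execution of a lane fails."""

class LaneImportError(Exception):
    """Raised when a module or class in a YAML definition file cannot be imported."""

class TaskInitializationError(Exception):
    """Raised whenever transformation of a class into a task fails."""
```

Deriving CacheError from AttributeError means callers who catch generic attribute failures will also catch failed cache lookups.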

sparklanes._framework.lane module

Lane and Branch classes. TODO: Better logging

sparklanes._framework.log module

Module handling logging. TODO: improve logging. Allow configuration, etc.

sparklanes._framework.log.make_default_logger(name='SPARKLANES', level=20, fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')[source]

Create a logger with the default configuration
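A minimal sketch of such a factory using the standard logging module, matching the documented defaults (the handler setup is an assumption; only the signature is taken from the docs):

```python
import logging

def make_default_logger(name='SPARKLANES', level=logging.INFO,
                        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s'):
    """Create and return a logger with a stream handler and the given format."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(handler)
    return logger
```

Note that level=20 in the documented signature is simply the numeric value of logging.INFO.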

sparklanes._framework.spark module

Used to allow sharing of SparkContext and SparkSession, to avoid having to “getOrCreate” them again and again for each task. This way, they can just be imported and used right away.
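The pattern described, create once and then import and reuse, can be sketched without pyspark as a lazily initialized module-level singleton (a placeholder class stands in for SparkContext; this is not the library's code):

```python
class _FakeSparkContext:
    """Placeholder standing in for pyspark.SparkContext in this sketch."""
    def __init__(self, app_name):
        self.app_name = app_name

_context = None

def get_or_create_context(app_name='sparklanes.app'):
    """Return the shared context, creating it on first access."""
    global _context
    if _context is None:
        _context = _FakeSparkContext(app_name)
    return _context
```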

sparklanes._framework.task module

Includes the Task decorator, the parent class LaneTask from which all tasks will inherit, as well as the _TaskCache, which is used to share attributes between Task objects.

class sparklanes._framework.task.LaneTask[source]

Bases: object

The superclass of each task, from which all tasks inherit when decorated with sparklanes.Task.

cache(name, val, overwrite=True)[source]

Assigns an attribute reference to all subsequent tasks. For example, if a task caches a DataFrame df using self.cache('some_df', df), all tasks that follow can access the DataFrame using self.some_df. Note that manually assigned attributes sharing the same name take precedence over cached attributes.

Parameters:
  • name (str) – Name of the attribute
  • val – Attribute value
  • overwrite (bool) – Indicates whether an existing cached attribute of the same name shall be overwritten (if False, and a cached attribute with the given name already exists, sparklanes.errors.CacheError will be thrown).
clear_cache()[source]

Clears the entire cache

uncache(name)[source]

Removes an attribute from the cache, i.e. it will be deleted and become unavailable to all subsequent tasks.

Parameters:name (str) – Name of the cached attribute to be deleted
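The documented cache semantics — shared attributes, instance attributes taking precedence, and CacheError on a refused overwrite — can be sketched in standalone form (implementation details beyond what the docs state are assumptions):

```python
class CacheError(AttributeError):
    """Raised when task-cache access fails."""

class _TaskCache:
    cached = {}  # shared across all task instances

class LaneTask:
    def cache(self, name, val, overwrite=True):
        if not overwrite and name in _TaskCache.cached:
            raise CacheError('Cached attribute %r already exists' % name)
        _TaskCache.cached[name] = val

    def uncache(self, name):
        del _TaskCache.cached[name]

    def clear_cache(self):
        _TaskCache.cached.clear()

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails, so manually
        # assigned attributes automatically take precedence over the cache.
        try:
            return _TaskCache.cached[name]
        except KeyError:
            raise AttributeError(name)
```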
class sparklanes._framework.task.LaneTaskThread(task)[source]

Bases: threading.Thread

Used to spawn tasks as threads to be run in parallel.

join(timeout=None)[source]

Overrides threading.Thread.join, to allow handling of exceptions thrown by threads from within the main app.

run()[source]

Overrides threading.Thread.run, to allow handling of exceptions thrown by threads from within the main app.
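The override pattern — capture an exception inside run() and re-raise it when the main thread calls join() — can be sketched generically (this is a sketch of the technique, not the library's code):

```python
import threading

class ExceptionAwareThread(threading.Thread):
    """Thread that stores an exception raised in run() and re-raises it on join()."""

    def __init__(self, target, *args, **kwargs):
        super().__init__()
        self._target_fn = target
        self._args = args
        self._kwargs = kwargs
        self._exc = None

    def run(self):
        try:
            self._target_fn(*self._args, **self._kwargs)
        except BaseException as exc:  # captured here, re-raised in the caller
            self._exc = exc

    def join(self, timeout=None):
        super().join(timeout)
        if self._exc is not None:
            raise self._exc
```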

class sparklanes._framework.task.TaskCache[source]

Bases: object

Serves as the attribute cache of tasks, which is accessed using the tasks’ __getattr__ method.

cached = {}
static get(name)[source]

Retrieves an object from the cache.

Parameters:name (str) – Name of the object to be retrieved
Returns:The cached object
Return type:object

sparklanes._framework.validation module

Contains helper functions used for class and schema validation.

sparklanes._framework.validation.arg_spec(cls, mtd_name)[source]

Cross-version argument signature inspection

Parameters:
  • cls (class) – Class to which the method belongs
  • mtd_name (str) – Name of the method to be inspected
Returns:

  • required_params (list of str) – List of required, positional parameters
  • optional_params (list of str) – List of optional parameters, i.e. parameters with a default value
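For Python 3, the required/optional split can be sketched with inspect.signature (the real function is cross-version and may differ in detail):

```python
import inspect

def arg_spec(cls, mtd_name):
    """Split a method's parameters into required and optional (defaulted) names."""
    sig = inspect.signature(getattr(cls, mtd_name))
    required, optional = [], []
    for name, param in sig.parameters.items():
        if name == 'self':
            continue
        if param.kind in (inspect.Parameter.VAR_POSITIONAL,
                          inspect.Parameter.VAR_KEYWORD):
            continue  # *args/**kwargs are neither required nor defaulted
        if param.default is inspect.Parameter.empty:
            required.append(name)
        else:
            optional.append(name)
    return required, optional
```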

sparklanes._framework.validation.validate_params(cls, mtd_name, *args, **kwargs)[source]

Validates whether the given args/kwargs match the method signature. Checks that:
  • at least all required args/kwargs are given
  • no redundant args/kwargs are given

Parameters:
  • cls (Class) – Class to which the method belongs
  • mtd_name (str) – Name of the method whose parameters shall be validated
  • args (list) – Positional arguments
  • kwargs (dict) – Dict of keyword arguments
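Both checks can be sketched in Python 3 with inspect.signature.bind, which raises TypeError for missing required arguments as well as for redundant ones (the real implementation may differ):

```python
import inspect

def validate_params(cls, mtd_name, *args, **kwargs):
    """Raise TypeError if the given args/kwargs do not match the method signature."""
    sig = inspect.signature(getattr(cls, mtd_name))
    # bind() raises TypeError on missing required or unexpected arguments.
    sig.bind(None, *args, **kwargs)  # None stands in for `self`
```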
sparklanes._framework.validation.validate_schema(yaml_def, branch=False)[source]

Validates the schema of a dict

Parameters:
  • yaml_def (dict) – dict whose schema shall be validated
  • branch (bool) – Indicates whether yaml_def is a dict of a top-level lane, or of a branch inside a lane (needed for recursion)
Returns:

True if validation was successful

Return type:

bool
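A heavily simplified standalone sketch of recursive lane/branch validation (the actual library validates against a schema.Schema; the key names `tasks`, `class`, and `branch` here are assumptions for illustration):

```python
def validate_schema(yaml_def, branch=False):
    """Recursively validate a simplified, assumed lane/branch dict schema."""
    if not isinstance(yaml_def, dict):
        raise ValueError('Definition must be a dict')
    tasks = yaml_def.get('tasks')
    if not isinstance(tasks, list) or not tasks:
        raise ValueError('`tasks` must be a non-empty list')
    for task in tasks:
        if 'branch' in task:
            # Recurse into nested branches; `branch=True` marks the nesting level.
            validate_schema(task['branch'], branch=True)
        elif 'class' not in task:
            raise ValueError('Each task needs a `class` or a `branch`')
    return True
```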