Functional Pypelines API

Pipeline Class

class functional_pypelines.Pipeline(transform: ~typing.Callable[[~functional_pypelines.core.A], ~typing.Any] = <function Pipeline.<lambda>>, *, rest: ~functional_pypelines.core.Pipeline[~typing.Any, ~functional_pypelines.core.B] | ~functional_pypelines.core.Identity = <functional_pypelines.core.Identity object>)

Composable function of a single variable.

This class is used to build pipelines of functions that take a single argument and return a single value. It can either be used as a decorator, or in-line to build a pipeline.

Example Usage

>>> from functional_pypelines import Pipeline
>>>
>>> # As a decorator
>>> @Pipeline.step
... def add_one(x: float) -> float:
...     return x + 1
...
>>> @Pipeline.step
... def double(x: float) -> float:
...     return x * 2
...
>>> @Pipeline.step
... def square(x: float) -> float:
...     return x ** 2
...
>>> my_pipeline = add_one >> double >> square
>>> my_pipeline  # Pipeline[float -> float]
>>> my_pipeline(2)  # 36
>>>
>>> # In-line
>>> def add_one(x: float) -> float:
...     return x + 1
...
>>> def double(x: float) -> float:
...     return x * 2
...
>>> def square(x: float) -> float:
...     return x ** 2
...
>>> my_pipeline = Pipeline() >> add_one >> double >> square
>>> my_pipeline  # Pipeline[float -> float]
>>> my_pipeline(2)  # 36
classmethod step(f: Callable[[A], B]) Pipeline[A, B]

Decorator to turn a function into a Pipeline step.

For the base Pipeline class, this is does the same thing as the constructor, but it can be overridden to provide additional functionality.

For example, if you wanted to pass around a dictionary of keyword arguments to maintain state, you could do something like this:

>>> from typing import Callable, Dict, Any
>>> from functional_pypelines import Pipeline
>>>
>>> KwargType = Dict[str, Any]
>>>
>>> class KwargPipeline(Pipeline[KwargType, KwargType]):
...     @classmethod
...     def step(cls, f: Callable[..., KwargType]) -> 'KwargPipeline':
...         def wrapped(kwargs: KwargType) -> KwargType:
...             return {**kwargs, **f(**kwargs)}
...         return cls(wrapped)
>>>
>>> @KwargPipeline.step
... def add(x: int, y: int, **kwargs) -> KwargType:
...     return {'z': x + y}
...
>>> add({'x': 1, 'y': 2})  # {'x': 1, 'y': 2, 'z': 3}
Parameters:

f (Callable[[A], B]) – The function to turn into a Pipeline step.

Returns:

The Pipeline step.

Return type:

Pipeline[A, B]

classmethod default_data() A

Returns the default data for this Pipeline.

This method is called when the Pipeline is called without any arguments. By default, it raises a warning and returns NotImplemented. This method should be overridden in subclasses if you want to be able to call the Pipeline without any arguments.

Returns:

The default data.

Return type:

A

classmethod wrap(*args, **kwargs) A

Wraps the arguments passed to the Pipeline.

This method is called when the Pipeline is called with arguments. By default, it returns the first argument passed to the Pipeline. This method should be overridden in subclasses if you want to be able to call the Pipeline with multiple arguments, or keyword arguments.

Parameters:
  • *args – The arguments passed to the Pipeline.

  • **kwargs – The keyword arguments passed to the Pipeline.

Returns:

The wrapped data.

Return type:

A

classmethod from_json(data: int | float | bool | str | None | Dict[str, int | float | bool | str | None | Dict[str, JSONType] | List[JSONType]] | List[int | float | bool | str | None | Dict[str, JSONType] | List[JSONType]]) A

Converts JSON data into the Pipeline’s input type.

This method is called when the Pipeline is called with JSON data. By default, it just calls wrap() with the JSON data. This method should be overridden in subclasses if you want to be able to call the Pipeline with JSON data.

Parameters:

data (JSONType) – The JSON data to convert.

Returns:

The converted data.

Return type:

A

See also

CLI

property base_validator: Pipeline[ValidatorData, ValidatorData]

Returns the base validator for this Pipeline.

Validators are Pipelines that perform static analysis of a Pipeline and input data to ensure that the Pipeline can be run with the given data. By default, this method returns a Pipeline that just returns whatever data is passed to it. This method should be overridden in subclasses if you want to be able to validate the Pipeline.

When a Pipeline is invoked via the CLI, the CLI will automatically run the Pipeline through its base validator before running it. If the base validator fails, the CLI will print an error message and exit.

Returns:

The base validator.

Return type:

Pipeline[ValidatorData, ValidatorData]

See also

functional_pypelines.validator.Validator, CLI

classmethod create(transform: ~typing.Callable[[~functional_pypelines.core.A], ~functional_pypelines.core.B], *, rest: ~functional_pypelines.core.Pipeline[~functional_pypelines.core.B, ~functional_pypelines.core.C] | ~functional_pypelines.core.Identity = <functional_pypelines.core.Identity object>) Pipeline[A, C]

Creates a new Pipeline. Alias for the constructor.

run(data: A | None = None, *, report: bool = False) B

Runs the Pipeline.

Parameters:
  • data (Optional[A], optional) – The data to pass to the Pipeline, by default None. If None, the Pipeline will be called with the result of default_data().

  • report (bool, optional) – Whether or not to print the name of each step as it is entered, by default False.

Returns:

The result of running the Pipeline.

Return type:

B

bind(nxt: Callable[[B], C]) Pipeline[A, C]

Composes two Pipelines together.

The >> operator is an alias for this method.

Parameters:

nxt (Pipeline[B, C]) – The Pipeline to compose with.

Returns:

The composed Pipeline.

Return type:

Pipeline[A, C]

debug() PipelineDebugger

Returns a PipelineDebugger for this Pipeline.

Returns:

The PipelineDebugger.

Return type:

PipelineDebugger

See also

PipelineDebugger

class functional_pypelines.core.PipelineDebugger(pipeline: Pipeline)

Helper class used for debugging Pipelines.

This class is used to step through the steps of a Pipeline for debugging purposes. It is an iterator that yields the steps of the Pipeline, and can be used to step through the Pipeline one step at a time.

Example Usage

>>> from functional_pypelines import Pipeline
>>>
>>> @Pipeline.step
... def add_one(x: float) -> float:
...     return x + 1
...
>>> @Pipeline.step
... def double(x: float) -> float:
...     return x * 2
...
>>> @Pipeline.step
... def square(x: float) -> float:
...     return x ** 2
...
>>> my_pipeline = add_one >> double >> square
>>> debugger = my_pipeline.debug()
>>> debugger.step(2)  # 3
>>> debugger.step(3)  # 6
>>> debugger.step(6)  # 36
>>> debugger.step(36)  # IndexError

Validators

class functional_pypelines.validator.ValidatorPipeline(transform: ~typing.Callable[[~functional_pypelines.core.A], ~typing.Any] = <function Pipeline.<lambda>>, *, rest: ~functional_pypelines.core.Pipeline[~typing.Any, ~functional_pypelines.core.B] | ~functional_pypelines.core.Identity = <functional_pypelines.core.Identity object>)

A pipeline for validating another pipeline.

ValidatorPipelines are used to validate a pipeline before it is run. They are useful for validating user input before running a pipeline on it. For example, a validator pipeline could be used to validate that a user has provided a valid filepath before running a pipeline that processes the file.

Validator Pipeline steps should take two arguments: a Pipeline and its input data. The step should return a ValidationResult, using either the SUCCESS singleton or FAILURE() function.

classmethod step(f: Callable[[Pipeline[A, Any], A], ValidationResult]) ValidatorPipeline

Decorator for creating a validator step.

Parameters:

f (Callable[[Pipeline[A, Any], A], ValidationResult]) – The function to decorate. Must take a Pipeline and its input data as arguments, and return a ValidationResult.

Returns:

The decorated validator pipeline.

Return type:

ValidatorPipeline

Example Usage

>>> from typing import Any
>>> import os
>>> from functional_pypelines import Pipeline
>>> from functional_pypelines.validator import (
...     ValidatorPipeline, SUCCESS, FAILURE, ValidationResult
... )
>>>
>>> @Pipeline.step
... def read_file(filepath: str) -> str:
...     with open(filepath, 'r') as f:
...         return f.read()
>>>
>>> @ValidatorPipeline.step
... def validate_filepath(
...     pipeline: Pipeline,
...     data: Any
... ) -> ValidationResult:
...    if not isinstance(data, str):
...        return FAILURE('Filepath must be a string.')
...    if not os.path.exists(data):
...        return FAILURE('File does not exist.')
...    if not os.path.isfile(data):
...        return FAILURE('Filepath must point to a file.')
...    return SUCCESS
>>>
validate(pipeline: Pipeline, *args, **kwargs) Tuple[Pipeline, A]

Validate a pipeline and its input data.

Parameters:
  • pipeline (Pipeline) – The pipeline to validate.

  • args (tuple) – The input data to validate.

  • kwargs (dict) – The input data to validate.

Returns:

The validated pipeline and input data.

Return type:

Tuple[Pipeline, A]

Raises:

ValidationError – If the pipeline/data is invalid.

validate_and_run(pipeline: Pipeline[A, B], data: A, *, report: bool = False) B

Validate a pipeline and its input data, then run the pipeline.

Parameters:
  • pipeline (Pipeline) – The pipeline to validate.

  • data (Any) – The input data to validate.

  • report (bool, optional) – Whether to report the pipeline’s progress, by default False

Returns:

The result of running the pipeline.

Return type:

B

Raises:

ValidationError – If the pipeline/data is invalid.

functional_pypelines.validator.SUCCESS

alias of ValidationResult(valid=True, reason=None)

functional_pypelines.validator.FAILURE(reason: str)

Indicates a failed validation.

Parameters:

reason (str) – The reason for the failure.

Returns:

result – The failed validation result.

Return type:

ValidationResult

JSON API

functional_pypelines.run(*args, report: bool = False, **kwargs) B

Run the pipeline defined in the config dictionary.

The config must be a dictionary with the following structure:

{
    "PIPELINE": [
        ...
    ],
    "DATA": ...,
    "VALIDATORS": [
        ...
    ]
}

The “PIPELINE” key must be a list of strings, where each string is a fully-qualified name of a pipeline class. The pipeline classes must be importable from the current working directory.

The “DATA” key (OPTIONAL) must be a JSON object. This object will be passed to the pipeline as the data argument, and will be parsed using the Pipeline class’s functional_pypelines.Pipeline.from_json() method. If the “DATA” key is not present, the pipeline will be passed the result of calling the Pipeline class’s functional_pypelines.Pipeline.default_data() method.

The “VALIDATORS” key (OPTIONAL) must be a list of strings, where each string is a fully-qualified name of a pipeline class. The pipeline classes must be importable from the current working directory. The validator pipelines should be subclasses of functional_pypelines.validator.ValidatorPipeline, and will be run before the main pipeline. If the validator pipeline fails, the main pipeline will not be run. If the main pipeline’s class defines a functional_pypelines.Pipeline.base_validator() attribute, it will be run before the validator pipelines defined here.

Parameters:

config (dict) – Dictionary defining the pipeline to run.

Returns:

The result of running the pipeline.

Return type:

Any

Raises:

ValidationError – If the pipeline fails to validate.

functional_pypelines.api.core.dry_run(*args, report: bool = False, **kwargs) B

Validate that the defined pipeline will run without errors.

CLI