Functional Pypelines API
Pipeline Class
- class functional_pypelines.Pipeline(transform: Callable[[A], Any] = <function Pipeline.<lambda>>, *, rest: Pipeline[Any, B] | Identity = <Identity object>)
Composable function of a single variable.
This class is used to build pipelines of functions that take a single argument and return a single value. It can either be used as a decorator, or in-line to build a pipeline.
Example Usage
>>> from functional_pypelines import Pipeline
>>>
>>> # As a decorator
>>> @Pipeline.step
... def add_one(x: float) -> float:
...     return x + 1
...
>>> @Pipeline.step
... def double(x: float) -> float:
...     return x * 2
...
>>> @Pipeline.step
... def square(x: float) -> float:
...     return x ** 2
...
>>> my_pipeline = add_one >> double >> square
>>> my_pipeline  # Pipeline[float -> float]
>>> my_pipeline(2)  # 36
>>>
>>> # In-line
>>> def add_one(x: float) -> float:
...     return x + 1
...
>>> def double(x: float) -> float:
...     return x * 2
...
>>> def square(x: float) -> float:
...     return x ** 2
...
>>> my_pipeline = Pipeline() >> add_one >> double >> square
>>> my_pipeline  # Pipeline[float -> float]
>>> my_pipeline(2)  # 36
- classmethod step(f: Callable[[A], B]) → Pipeline[A, B]
Decorator to turn a function into a Pipeline step.
For the base Pipeline class, this does the same thing as the constructor, but it can be overridden to provide additional functionality. For example, if you wanted to pass around a dictionary of keyword arguments to maintain state, you could do something like this:
>>> from typing import Callable, Dict, Any
>>> from functional_pypelines import Pipeline
>>>
>>> KwargType = Dict[str, Any]
>>>
>>> class KwargPipeline(Pipeline[KwargType, KwargType]):
...     @classmethod
...     def step(cls, f: Callable[..., KwargType]) -> 'KwargPipeline':
...         def wrapped(kwargs: KwargType) -> KwargType:
...             return {**kwargs, **f(**kwargs)}
...         return cls(wrapped)
>>>
>>> @KwargPipeline.step
... def add(x: int, y: int, **kwargs) -> KwargType:
...     return {'z': x + y}
...
>>> add({'x': 1, 'y': 2})  # {'x': 1, 'y': 2, 'z': 3}
- Parameters:
f (Callable[[A], B]) – The function to turn into a Pipeline step.
- Returns:
The Pipeline step.
- Return type:
Pipeline[A, B]
- classmethod default_data() → A
Returns the default data for this Pipeline.
This method is called when the Pipeline is called without any arguments. By default, it raises a warning and returns NotImplemented. This method should be overridden in subclasses if you want to be able to call the Pipeline without any arguments.
- Returns:
The default data.
- Return type:
A
- classmethod wrap(*args, **kwargs) → A
Wraps the arguments passed to the Pipeline.
This method is called when the Pipeline is called with arguments. By default, it returns the first argument passed to the Pipeline. This method should be overridden in subclasses if you want to be able to call the Pipeline with multiple arguments, or keyword arguments.
- Parameters:
*args – The arguments passed to the Pipeline.
**kwargs – The keyword arguments passed to the Pipeline.
- Returns:
The wrapped data.
- Return type:
A
- classmethod from_json(data: JSONType) → A
Converts JSON data into the Pipeline’s input type.
This method is called when the Pipeline is called with JSON data. By default, it just calls wrap() with the JSON data. This method should be overridden in subclasses if you want to be able to call the Pipeline with JSON data.
- Parameters:
data (JSONType) – The JSON data to convert.
- Returns:
The converted data.
- Return type:
A
See also
CLI
- property base_validator: Pipeline[ValidatorData, ValidatorData]
Returns the base validator for this Pipeline.
Validators are Pipelines that perform static analysis of a Pipeline and input data to ensure that the Pipeline can be run with the given data. By default, this method returns a Pipeline that just returns whatever data is passed to it. This method should be overridden in subclasses if you want to be able to validate the Pipeline.
When a Pipeline is invoked via the CLI, the CLI will automatically run the Pipeline through its base validator before running it. If the base validator fails, the CLI will print an error message and exit.
- Returns:
The base validator.
- Return type:
Pipeline[ValidatorData, ValidatorData]
See also
functional_pypelines.validator.Validator, CLI
- classmethod create(transform: Callable[[A], B], *, rest: Pipeline[B, C] | Identity = <Identity object>) → Pipeline[A, C]
Creates a new Pipeline. Alias for the constructor.
- run(data: A | None = None, *, report: bool = False) → B
Runs the Pipeline.
- Parameters:
data (Optional[A], optional) – The data to pass to the Pipeline, by default None. If None, the Pipeline will be called with the result of default_data().
report (bool, optional) – Whether or not to print the name of each step as it is entered, by default False.
- Returns:
The result of running the Pipeline.
- Return type:
B
- bind(nxt: Callable[[B], C]) → Pipeline[A, C]
Composes two Pipelines together.
The >> operator is an alias for this method.
- debug() → PipelineDebugger
Returns a PipelineDebugger for this Pipeline.
- Returns:
The PipelineDebugger.
- Return type:
PipelineDebugger
See also
PipelineDebugger
- class functional_pypelines.core.PipelineDebugger(pipeline: Pipeline)
Helper class used for debugging Pipelines.
This class is used to step through the steps of a Pipeline for debugging purposes. It is an iterator that yields the steps of the Pipeline, and can be used to step through the Pipeline one step at a time.
Example Usage
>>> from functional_pypelines import Pipeline
>>>
>>> @Pipeline.step
... def add_one(x: float) -> float:
...     return x + 1
...
>>> @Pipeline.step
... def double(x: float) -> float:
...     return x * 2
...
>>> @Pipeline.step
... def square(x: float) -> float:
...     return x ** 2
...
>>> my_pipeline = add_one >> double >> square
>>> debugger = my_pipeline.debug()
>>> debugger.step(2)   # 3
>>> debugger.step(3)   # 6
>>> debugger.step(6)   # 36
>>> debugger.step(36)  # IndexError
Validators
- class functional_pypelines.validator.ValidatorPipeline(transform: Callable[[A], Any] = <function Pipeline.<lambda>>, *, rest: Pipeline[Any, B] | Identity = <Identity object>)
A pipeline for validating another pipeline.
ValidatorPipelines are used to validate a pipeline before it is run. They are useful for validating user input before running a pipeline on it. For example, a validator pipeline could be used to validate that a user has provided a valid filepath before running a pipeline that processes the file.
Validator Pipeline steps should take two arguments: a Pipeline and its input data. The step should return a ValidationResult, using either the SUCCESS singleton or the FAILURE() function.
See also
SUCCESS, FAILURE
- classmethod step(f: Callable[[Pipeline[A, Any], A], ValidationResult]) → ValidatorPipeline
Decorator for creating a validator step.
- Parameters:
f (Callable[[Pipeline[A, Any], A], ValidationResult]) – The function to decorate. Must take a Pipeline and its input data as arguments, and return a ValidationResult.
- Returns:
The decorated validator pipeline.
- Return type:
ValidatorPipeline
Example Usage
>>> from typing import Any
>>> import os
>>> from functional_pypelines import Pipeline
>>> from functional_pypelines.validator import (
...     ValidatorPipeline, SUCCESS, FAILURE, ValidationResult
... )
>>>
>>> @Pipeline.step
... def read_file(filepath: str) -> str:
...     with open(filepath, 'r') as f:
...         return f.read()
>>>
>>> @ValidatorPipeline.step
... def validate_filepath(
...     pipeline: Pipeline,
...     data: Any
... ) -> ValidationResult:
...     if not isinstance(data, str):
...         return FAILURE('Filepath must be a string.')
...     if not os.path.exists(data):
...         return FAILURE('File does not exist.')
...     if not os.path.isfile(data):
...         return FAILURE('Filepath must point to a file.')
...     return SUCCESS
>>>
- validate(pipeline: Pipeline, *args, **kwargs) → Tuple[Pipeline, A]
Validate a pipeline and its input data.
- validate_and_run(pipeline: Pipeline[A, B], data: A, *, report: bool = False) → B
Validate a pipeline and its input data, then run the pipeline.
- Parameters:
pipeline (Pipeline) – The pipeline to validate.
data (Any) – The input data to validate.
report (bool, optional) – Whether to report the pipeline’s progress, by default False.
- Returns:
The result of running the pipeline.
- Return type:
B
- Raises:
ValidationError – If the pipeline/data is invalid.
- functional_pypelines.validator.SUCCESS
alias of ValidationResult(valid=True, reason=None)
- functional_pypelines.validator.FAILURE(reason: str)
Indicates a failed validation.
- Parameters:
reason (str) – The reason for the failure.
- Returns:
result – The failed validation result.
- Return type:
ValidationResult
JSON API
- functional_pypelines.run(*args, report: bool = False, **kwargs) → B
Run the pipeline defined in the config dictionary.
The config must be a dictionary with the following structure:
{
    "PIPELINE": [ ... ],
    "DATA": ...,
    "VALIDATORS": [ ... ]
}
The “PIPELINE” key must be a list of strings, where each string is a fully-qualified name of a pipeline class. The pipeline classes must be importable from the current working directory.
The “DATA” key (OPTIONAL) must be a JSON object. This object will be passed to the pipeline as the data argument, and will be parsed using the Pipeline class’s functional_pypelines.Pipeline.from_json() method. If the “DATA” key is not present, the pipeline will be passed the result of calling the Pipeline class’s functional_pypelines.Pipeline.default_data() method.
The “VALIDATORS” key (OPTIONAL) must be a list of strings, where each string is a fully-qualified name of a pipeline class. The pipeline classes must be importable from the current working directory. The validator pipelines should be subclasses of functional_pypelines.validator.ValidatorPipeline, and will be run before the main pipeline. If a validator pipeline fails, the main pipeline will not be run. If the main pipeline’s class defines a functional_pypelines.Pipeline.base_validator() attribute, it will be run before the validator pipelines defined here.
- Parameters:
config (dict) – Dictionary defining the pipeline to run.
- Returns:
The result of running the pipeline.
- Return type:
Any
- Raises:
ValidationError – If the pipeline fails to validate.
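A config dictionary following the structure above might look like this sketch. All module paths below are illustrative placeholders, not real modules:

```python
# Hypothetical config for functional_pypelines.run().
config = {
    "PIPELINE": [
        "mypackage.steps.add_one",
        "mypackage.steps.double",
    ],
    "DATA": 2,            # optional; omitting it falls back to default_data()
    "VALIDATORS": [       # optional; run before the main pipeline
        "mypackage.validators.check_numeric",
    ],
}

# Then, assuming the modules above actually exist:
# import functional_pypelines
# result = functional_pypelines.run(config)
```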
- functional_pypelines.api.core.dry_run(*args, report: bool = False, **kwargs) → B
Validate that the defined pipeline will run without errors.