Usage

Labrea exposes a declarative way to define the data that your application uses, the relationships between different datasets, and the user inputs necessary for those datasets.

Motivation

Imagine that you have a csv file that contains a list of stores, their regions, and their total sales across multiple days, where each day is a different row. You read in the csv as a Pandas dataframe, and then in different parts of your program you need the set of distinct store codes, the distinct regions, and the date range captured by the file. You write functions like so to read in the data and derive these values.

from typing import Set, Tuple
import datetime
import pandas as pd

def read_input(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def get_distinct_stores(data: pd.DataFrame) -> Set[str]:
    return set(data['store_id'])

def get_distinct_regions(data: pd.DataFrame) -> Set[str]:
    return set(data['region_id'])

def get_min_date(data: pd.DataFrame) -> datetime.date:
    return min(data['date'])

def get_max_date(data: pd.DataFrame) -> datetime.date:
    return max(data['date'])

def get_date_range(data: pd.DataFrame) -> Tuple[datetime.date, datetime.date]:
    return get_min_date(data), get_max_date(data)

This works, but there is a subtle problem with the way it is written. Each of our get_* functions is designed to use the output of read_input, but that dependency is never declared explicitly. The only way to know that get_distinct_stores should take the output of read_input is through comments. As an application grows, these implicit dependencies hurt maintainability and lead to bugs whose cause is hard to identify.

One option would be to move the read_input call inside each of our get_* functions, like so:

from typing import Set
import pandas as pd

def read_input(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def get_distinct_stores(path: str) -> Set[str]:
    return set(read_input(path)['store_id'])

def get_distinct_regions(path: str) -> Set[str]:
    return set(read_input(path)['region_id'])

...

This makes our dependencies clearer, but at the cost of greatly increased coupling. Imagine we change read_input to take a fmt parameter, which specifies whether the file is a CSV or an Excel file. We would have to add that fmt parameter to the signature of every one of our get_* functions, which greatly hurts maintainability. Additionally, performance takes a hit because the file is read again every time read_input is called, even though each get_* call uses the same data.

Datasets

The way Labrea handles this is with Datasets. A Dataset is defined as a function that takes some input parameters (called Options) and has explicit dependencies on other datasets. When the dataset is evaluated (by passing a config dictionary of options), all the dependencies are resolved automatically for you.

This is how we would handle our problem from before using Labrea.

from typing import Set, Tuple
import datetime

import pandas as pd

from labrea import dataset, Option


@dataset
def input_data(path: str = Option('INPUT.PATH')) -> pd.DataFrame:
    return pd.read_csv(path)

@dataset
def distinct_stores(data: pd.DataFrame = input_data) -> Set[str]:
    return set(data['store_id'])

@dataset
def distinct_regions(data: pd.DataFrame = input_data) -> Set[str]:
    return set(data['region_id'])

@dataset
def min_date(data: pd.DataFrame = input_data) -> datetime.date:
    return min(data['date'])

@dataset
def max_date(data: pd.DataFrame = input_data) -> datetime.date:
    return max(data['date'])

@dataset
def date_range(
        min_date: datetime.date = min_date, 
        max_date: datetime.date = max_date
) -> Tuple[datetime.date, datetime.date]:
    return min_date, max_date


options = {
    'INPUT': {
        'PATH': '/path/to/input.csv'
    }
}

distinct_regions(options) == {'014', '620', '706'} 

All of our functions have been converted into Datasets using the @dataset decorator. The inputs to our functions all have defaults which are either Options, or other Datasets. These Datasets take a dictionary of options as their input, extract the necessary options using the Options, and recursively calculate any dependent datasets before calling the body of the function.
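To make that resolution order concrete, here is a toy sketch of the idea using only the standard library. It is not Labrea's implementation: ToyOption, toy_dataset, and lookup are hypothetical names invented for this illustration, and the rows are made-up data.

```python
import inspect
from typing import Any, Callable, Dict


def lookup(options: Dict[str, Any], dotted_key: str) -> Any:
    """Walk a nested dict using a dotted key such as 'INPUT.PATH'."""
    value: Any = options
    for part in dotted_key.split('.'):
        value = value[part]  # raises KeyError if the option is missing
    return value


class ToyOption:
    """Hypothetical stand-in for an option: pulls one value out of the options."""

    def __init__(self, key: str) -> None:
        self.key = key

    def __call__(self, options: Dict[str, Any]) -> Any:
        return lookup(options, self.key)


def toy_dataset(func: Callable[..., Any]) -> Callable[[Dict[str, Any]], Any]:
    """Turn a function whose defaults are evaluatable into a function of options."""
    params = inspect.signature(func).parameters

    def evaluate(options: Dict[str, Any]) -> Any:
        # Every default is callable on the options dict, so dependencies
        # (options or other toy datasets) are resolved recursively here.
        kwargs = {name: param.default(options) for name, param in params.items()}
        return func(**kwargs)

    return evaluate


@toy_dataset
def raw_rows(path: str = ToyOption('INPUT.PATH')) -> list:
    # Pretend to read a file; these rows are invented for the example.
    return [(path, 'store_1'), (path, 'store_2'), (path, 'store_1')]


@toy_dataset
def distinct_store_ids(rows: list = raw_rows) -> set:
    return {store for _, store in rows}


result = distinct_store_ids({'INPUT': {'PATH': '/tmp/in.csv'}})
print(result == {'store_1', 'store_2'})
```

The point of the sketch is only that the body of distinct_store_ids never mentions the path: it is looked up when raw_rows is resolved.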

This allows us to decouple the implementations of each of our functions, while also explicitly declaring any dependencies from one piece of data to the next. It also helps with our issue of upstream dependencies taking new arguments. If we wanted to take that fmt argument in input_data, we can add it like so, with no impact to the rest of our datasets.

from typing import Set

import pandas as pd

from labrea import dataset, Option


@dataset
def input_data(
        path: str = Option('INPUT.PATH'),
        fmt: str = Option('INPUT.FMT', 'csv')
) -> pd.DataFrame:
    if fmt == 'csv':
        return pd.read_csv(path)
    elif fmt == 'excel':
        return pd.read_excel(path)
    else:
        raise ValueError('Only csv and excel files are accepted.')

@dataset
def distinct_stores(data: pd.DataFrame = input_data) -> Set[str]:
    return set(data['store_id'])

@dataset
def distinct_regions(data: pd.DataFrame = input_data) -> Set[str]:
    return set(data['region_id'])


options = {
    'INPUT': {
        'PATH': '/path/to/input.xlsx',
        'FMT': 'excel'
    }
}

distinct_regions(options) == {'014', '620', '706'} 

Additional Features

Caching Results

By default, when a dataset is evaluated with some inputs the result is cached in memory, so if the same inputs are provided it does not need to be recalculated. This might be undesirable (for example if your dataset should return random data on each call); you can disable this using the @dataset.nocache decorator.

import random

from labrea import dataset, Option


@dataset
def same_random_number_every_time(
        minimum: float = Option('MIN'),
        maximum: float = Option('MAX')
):
    return random.random() * (maximum - minimum) + minimum


@dataset.nocache
def new_random_number_every_time(
        minimum: float = Option('MIN'),
        maximum: float = Option('MAX')
):
    return random.random() * (maximum - minimum) + minimum


config = {
    'MIN': 1,
    'MAX': 2
}

same_random_number_every_time(config)  ## 1.8286543357828648
same_random_number_every_time(config)  ## 1.8286543357828648

new_random_number_every_time(config)  ## 1.226523659380299
new_random_number_every_time(config)  ## 1.907915105351007
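As a rough mental model of the caching behavior, the sketch below memoizes a function on the options it receives. This is an assumption-laden illustration, not how Labrea builds its cache keys: toy_cached is a hypothetical helper, and keying on the whole JSON-serialized options dictionary is a simplification chosen for the example.

```python
import json
import random
from typing import Any, Callable, Dict


def toy_cached(func: Callable[[Dict[str, Any]], Any]) -> Callable[[Dict[str, Any]], Any]:
    """Memoize a function of an options dict (hypothetical helper)."""
    cache: Dict[str, Any] = {}

    def evaluate(options: Dict[str, Any]) -> Any:
        # Serialize with sorted keys so that key order does not matter.
        key = json.dumps(options, sort_keys=True)
        if key not in cache:
            cache[key] = func(options)
        return cache[key]

    return evaluate


@toy_cached
def cached_random(options: Dict[str, Any]) -> float:
    return random.uniform(options['MIN'], options['MAX'])


first = cached_random({'MIN': 1, 'MAX': 2})
second = cached_random({'MAX': 2, 'MIN': 1})  # same options, different order
print(first == second)
```

Because the second call hits the cache, the random number is not drawn again, which mirrors why a dataset that should return fresh random data needs caching turned off.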

Default Values to Options

Options can take a default value. If the value is a string, you can use confectioner-style templating syntax to impute a default based on other config entries.

from labrea import Option

config = {
    'A': 'a',
    'V': 'b'
}

Option('X', 1)(config)          ## 1
Option('Y', '{A}/{V}')(config)  ## 'a/b'
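The lookup-then-default behavior can be pictured with a small standalone function. This is only a sketch under stated assumptions: toy_option is a hypothetical name, it handles flat keys only, and it uses Python's str.format_map in place of the real templating engine.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel meaning "no default was given"


def toy_option(options: Dict[str, Any], key: str, default: Any = _MISSING) -> Any:
    """Return options[key], falling back to a (possibly templated) default."""
    if key in options:
        return options[key]
    if default is _MISSING:
        raise KeyError(key)
    if isinstance(default, str):
        # String defaults are filled in from the other config entries.
        return default.format_map(options)
    return default


config = {'A': 'a', 'V': 'b'}

print(toy_option(config, 'X', 1))          # key absent, plain default
print(toy_option(config, 'Y', '{A}/{V}'))  # key absent, templated default
print(toy_option(config, 'A', 'unused'))   # key present, default ignored
```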

Switches

Sometimes your dataset might have different dependencies depending on some input parameter or other condition. We can express this simply using switches.

In this example, we have different logic for cloud vs on-prem, and can express that this way. switch takes a string representing the option we want to switch over, a dictionary mapping config values to corresponding datasets, and (optionally) a default value if the config value is missing or does not appear in our mapping.

from labrea import dataset, switch


@dataset
def cloud_inputs():
    ...


@dataset
def onprem_inputs():
    ...


@dataset
def final_data(
        inputs = switch(
            'ENVIRONMENT',
            {
                'CLOUD': cloud_inputs,
                'ONPREM': onprem_inputs
            }
        )
):
    ...


final_data({'ENVIRONMENT': 'CLOUD'})  ## uses cloud_inputs as inputs arg
final_data({'ENVIRONMENT': 'ONPREM'})  ## uses onprem_inputs as inputs arg

The first argument to switch can also be another dataset. In this example, we could automatically determine the environment in another dataset rather than pass it explicitly in the config.

from labrea import dataset, switch


@dataset
def inferred_environment():
    ...


@dataset
def final_data(
        inputs = switch(
            inferred_environment,
            {
                'CLOUD': cloud_inputs,
                'ONPREM': onprem_inputs
            }
        )
):
    ...
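Conceptually, a switch is a lookup followed by a delegated evaluation. The sketch below shows that idea with plain functions; toy_switch and the HOSTNAME-based inference are hypothetical and exist only for this illustration, not as a description of Labrea's internals.

```python
from typing import Any, Callable, Dict, Optional, Union

Options = Dict[str, Any]
Evaluatable = Callable[[Options], Any]


def toy_switch(
        selector: Union[str, Evaluatable],
        cases: Dict[Any, Evaluatable],
        default: Optional[Evaluatable] = None
) -> Evaluatable:
    def evaluate(options: Options) -> Any:
        # The selector is either an option key or something callable on options.
        choice = selector(options) if callable(selector) else options.get(selector)
        target = cases.get(choice, default)
        if target is None:
            raise KeyError(f'no case for {choice!r} and no default given')
        return target(options)

    return evaluate


def cloud_inputs(options: Options) -> str:
    return 'cloud rows'


def onprem_inputs(options: Options) -> str:
    return 'on-prem rows'


def inferred_environment(options: Options) -> str:
    # Made-up rule for the example: internal hostnames mean on-prem.
    return 'ONPREM' if options.get('HOSTNAME', '').endswith('.internal') else 'CLOUD'


cases = {'CLOUD': cloud_inputs, 'ONPREM': onprem_inputs}
by_key = toy_switch('ENVIRONMENT', cases)
by_dataset = toy_switch(inferred_environment, cases)

print(by_key({'ENVIRONMENT': 'CLOUD'}))
print(by_dataset({'HOSTNAME': 'db1.internal'}))
```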

Coalesce

Coalesce allows you to provide a sequence of Datasets (or Options, Switches, etc.), and use the first one that can evaluate.

from labrea import Coalesce, Option

x = Coalesce(Option('A'), Option('V'), Option('C'))

x({'A': 1}) == 1
x({'V': 2}) == 2
x({'C': 3}) == 3
x({'A': 1, 'V': 2}) == 1
x({'V': 2, 'C': 3}) == 2
x()  ## EvaluationError


y = Coalesce(Option('A'), Option('V'), None)
y({'A': 1}) == 1
y({'V': 2}) == 2
y({'A': 1, 'V': 2}) == 1
y() is None
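The "first one that can evaluate" rule amounts to trying each candidate in order and moving on when one fails. Here is a minimal standalone sketch of that rule; toy_coalesce and opt are hypothetical helpers, and a missing key is modelled as a KeyError rather than Labrea's own error type.

```python
from typing import Any, Callable, Dict, Optional

Options = Dict[str, Any]


def opt(key: str) -> Callable[[Options], Any]:
    """Hypothetical option: fails with KeyError when the key is absent."""
    return lambda options: options[key]


def toy_coalesce(*candidates: Any) -> Callable[..., Any]:
    def evaluate(options: Optional[Options] = None) -> Any:
        options = options or {}
        for candidate in candidates:
            if not callable(candidate):
                return candidate  # a plain fallback value such as None
            try:
                return candidate(options)
            except KeyError:
                continue  # this candidate cannot evaluate, try the next one
        raise LookupError('no candidate could be evaluated')

    return evaluate


x = toy_coalesce(opt('A'), opt('V'), opt('C'))
y = toy_coalesce(opt('A'), opt('V'), None)

print(x({'A': 1, 'V': 2}))  # the earliest candidate that evaluates is used
print(y() is None)          # the trailing plain value acts as a fallback
```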

Overloads

You can write multiple implementations of the same dataset using the .overload method of the dataset. For example, if you want to write a unit test where you mock up reading in some external data, you could write an overload that provides mock data.

from typing import List

from labrea import dataset, Option


@dataset(dispatch='INPUT.SOURCE')
def input_data(
        path: str = Option('INPUT.PATH')
) -> List[str]:
    with open(path) as file:
        return file.readlines()


@input_data.overload(alias='MOCK')
def mock_input_data() -> List[str]:
    return ['a', 'b', 'c']

Now, we can control which implementation is used by setting the INPUT.SOURCE option in our config. By default, if nothing is provided, we use the default implementation in the body of input_data.

input_data({'INPUT': {'PATH': '/input/data/path'}})  ## Use default implementation
input_data({'INPUT': {'SOURCE': 'MOCK'}}) == ['a', 'b', 'c'] 
input_data({'INPUT': {'SOURCE': 'UNKNOWN_SOURCE'}})  ## Error
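The three outcomes above (default, registered alias, unknown alias) follow from a simple registry lookup. The sketch below imitates that behavior with a hypothetical ToyDispatch class and flat option keys; it is an illustration of the dispatch idea, not Labrea's code.

```python
from typing import Any, Callable, Dict, List

Options = Dict[str, Any]
Impl = Callable[[Options], Any]


class ToyDispatch:
    """Hypothetical dispatcher: picks an implementation by an option value."""

    def __init__(self, key: str, default_impl: Impl) -> None:
        self.key = key
        self.default_impl = default_impl
        self.overloads: Dict[str, Impl] = {}

    def overload(self, alias: str) -> Callable[[Impl], Impl]:
        def register(func: Impl) -> Impl:
            self.overloads[alias] = func
            return func
        return register

    def __call__(self, options: Options) -> Any:
        if self.key not in options:
            return self.default_impl(options)  # nothing requested: use default
        # An alias that was never registered raises KeyError here.
        return self.overloads[options[self.key]](options)


def read_lines(options: Options) -> List[str]:
    # Stand-in for reading a file; the content is invented for the example.
    return [f"line from {options['PATH']}"]


input_data = ToyDispatch('SOURCE', read_lines)


@input_data.overload('MOCK')
def mock_input_data(options: Options) -> List[str]:
    return ['a', 'b', 'c']


print(input_data({'PATH': '/input/data/path'}))
print(input_data({'SOURCE': 'MOCK'}))
```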

Abstract Datasets

We can also have datasets that have no default implementation, called abstract datasets.

from typing import List

from labrea import abstractdataset, Option

@abstractdataset(dispatch='INPUT.SOURCE')
def input_data() -> List[str]:
    ...


@input_data.overload(alias='FLAT_FILE')
def flat_file_input_data(
        path: str = Option('INPUT.PATH')
) -> List[str]:
    with open(path) as file:
        return file.readlines()


@input_data.overload(alias='MOCK')
def mock_input_data() -> List[str]:
    return ['a', 'b', 'c']

Interfaces

We can write collections of multiple datasets whose implementations are connected using interfaces. For example, you may want your application to pull from a SQL database in production, but from a CSV file in development. You can define an interface that specifies the datasets that need to be implemented, and then provide different implementations for each environment.

Interface Definition

import pandas as pd
from labrea import abstractdataset, dataset, interface, Option

@interface(dispatch='ENVIRONMENT')
class DataSource:
    @staticmethod  # Adding staticmethod appeases linters/IDEs that don't understand interfaces
    @abstractdataset
    def store() -> pd.DataFrame:
        """Returns a dataframe of store data."""

    @staticmethod
    @abstractdataset
    def region() -> pd.DataFrame:
        """Returns a dataframe of region data."""

    @staticmethod
    @dataset
    def store_ids(
            store_: pd.DataFrame = store.__func__  # Use .__func__ to refer to the abstract dataset itself
    ) -> set[str]:
        """Derives the set of store ids from the store dataframe. This implementation is shared across all environments
        by default, but can be overridden if necessary."""
        return set(store_['store_id'])

    @staticmethod
    @dataset
    def region_ids(
            region_: pd.DataFrame = region.__func__
    ) -> set[str]:
        return set(region_['region_id'])

Development Implementation

@DataSource.implementation(alias='DEVELOPMENT')
class DevDataSource:
    @staticmethod
    @dataset
    def store(
            path: str = Option('DEV.STORE.PATH')
    ) -> pd.DataFrame:
        return pd.read_csv(path)

    @staticmethod
    @dataset
    def region(
            path: str = Option('DEV.REGION.PATH')
    ) -> pd.DataFrame:
        return pd.read_csv(path)

Production Implementation

def open_connection(connection_string: str):
    ...


@DataSource.implementation(alias='PRODUCTION')
class ProdDataSource:
    @staticmethod
    @dataset
    def store(
            connection_string: str = Option('PROD.CONNECTION_STRING')
    ) -> pd.DataFrame:
        with open_connection(connection_string) as conn:
            return pd.read_sql('SELECT * FROM stores', conn)

    @staticmethod
    @dataset
    def region(
            connection_string: str = Option('PROD.CONNECTION_STRING')
    ) -> pd.DataFrame:
        with open_connection(connection_string) as conn:
            return pd.read_sql('SELECT * FROM regions', conn)

Now, in your code, you can use DataSource.store and DataSource.region like normal datasets, and the implementation will be chosen based on the ENVIRONMENT option in your config.

@dataset
def num_stores(
        store_ids: set[str] = DataSource.store_ids
) -> int:
    return len(store_ids)

Collections

Labrea provides a few helper functions for creating collections of datasets. For example, you might have a list of datasets that you want to provide as a single input to another dataset. You can use the evaluatable_list function to accomplish this.

from labrea import evaluatable_list, dataset

@dataset
def x() -> int:
    return 1

@dataset
def y() -> int:
    return 2

@dataset
def z(
        x_and_y: list[int] = evaluatable_list(x, y)
) -> list[int]:
    return x_and_y


z() == [1, 2]

The evaluatable_tuple and evaluatable_set functions work similarly. There is also an evaluatable_dict function that takes a dictionary mapping (static) keys to Labrea objects.

from labrea import evaluatable_dict

@dataset
def z(
        xy_dict: dict[str, int] = evaluatable_dict({'x': x, 'y': y})
) -> dict[str, int]:
    return xy_dict

z() == {'x': 1, 'y': 2}
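One way to picture these helpers is as functions that evaluate every member against the same options and collect the results. The sketch below uses hypothetical toy_list and toy_dict helpers and plain functions of an options dict; it is an analogy, not Labrea's implementation.

```python
from typing import Any, Callable, Dict, List

Options = Dict[str, Any]
Evaluatable = Callable[[Options], Any]


def toy_list(*items: Evaluatable) -> Callable[[Options], List[Any]]:
    # Evaluate each member with the same options, preserving order.
    return lambda options: [item(options) for item in items]


def toy_dict(mapping: Dict[str, Evaluatable]) -> Callable[[Options], Dict[str, Any]]:
    # Keys are static; only the values are evaluated.
    return lambda options: {key: item(options) for key, item in mapping.items()}


def x(options: Options) -> int:
    return 1


def y(options: Options) -> int:
    return 2


x_and_y = toy_list(x, y)
xy_dict = toy_dict({'x': x, 'y': y})

print(x_and_y({}))
print(xy_dict({}))
```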

Map

Sometimes you want to use a dataset multiple times with different options. This can be accomplished using the Map type. Map takes a dataset and a dictionary mapping option keys to Labrea objects that return lists (or other iterables) of values. When the Map object is evaluated, it calls the dataset once for each of those values and returns an iterable of tuples, where the first element is the options set on that iteration and the second element is the resulting value. Like the built-in map, this iterable is lazy and is not a list.

from labrea import dataset, Option, Map


@dataset
def x_plus_y(
        x: int = Option('X'),
        y: int = Option('Y')
) -> int:
    return x + y


mapped = Map(x_plus_y, {'X': Option('X_LIST')})

for keys, value in mapped({'X_LIST': [1, 2, 3], 'Y': 10}):
    print(keys, value)

## {'X': 1} 11
## {'X': 2} 12
## {'X': 3} 13

Map objects have a .values property that can be used to only get the values.

for value in mapped.values({'X_LIST': [1, 2, 3], 'Y': 10}):
    print(value)
    
## 11
## 12
## 13
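The lazy, pair-yielding behavior described above can be imitated with a generator. The sketch below is a simplified illustration for a single mapped key: toy_map is a hypothetical helper, and the dataset is modelled as a plain function of an options dict.

```python
from typing import Any, Callable, Dict, Iterator, Tuple

Options = Dict[str, Any]


def toy_map(
        func: Callable[[Options], Any],
        key: str,
        values_key: str
) -> Callable[[Options], Iterator[Tuple[Options, Any]]]:
    def evaluate(options: Options) -> Iterator[Tuple[Options, Any]]:
        for value in options[values_key]:
            overrides = {key: value}
            # Nothing is computed until the caller iterates.
            yield overrides, func({**options, **overrides})

    return evaluate


def x_plus_y(options: Options) -> int:
    return options['X'] + options['Y']


mapped = toy_map(x_plus_y, 'X', 'X_LIST')
lazy = mapped({'X_LIST': [1, 2, 3], 'Y': 10})

results = list(lazy)
for keys, value in results:
    print(keys, value)
```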

In-Line Transformations

Sometimes you want to perform a transformation on a dataset (or other object) that is not worth creating a new dataset for. A common example is an Option that you want to parse as a date. You can use the >> operator (or the equivalent .apply() method) to perform this transformation in-line. The >> operator is shared by all Labrea objects.

from labrea import Option
import datetime as dt

start_date = Option('START_DATE') >> dt.datetime.fromisoformat

start_date({'START_DATE': '2022-01-01'}) == dt.datetime(2022, 1, 1)
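In plain Python, an operator like this can be provided by defining __rshift__ so that it returns a new object wrapping the composed computation. The sketch below shows the mechanism with a hypothetical ToyEvaluatable class; it is an assumption about the general technique, not a copy of Labrea's classes.

```python
import datetime as dt
from typing import Any, Callable, Dict

Options = Dict[str, Any]


class ToyEvaluatable:
    """Hypothetical wrapper around a function of an options dict."""

    def __init__(self, evaluate: Callable[[Options], Any]) -> None:
        self._evaluate = evaluate

    def __call__(self, options: Options) -> Any:
        return self._evaluate(options)

    def __rshift__(self, func: Callable[[Any], Any]) -> 'ToyEvaluatable':
        # Build a new object that evaluates self first, then applies func.
        return ToyEvaluatable(lambda options: func(self(options)))

    # Method spelling of the same operation.
    apply = __rshift__


raw_start = ToyEvaluatable(lambda options: options['START_DATE'])
start_date = raw_start >> dt.date.fromisoformat
start_year = raw_start.apply(dt.date.fromisoformat) >> (lambda d: d.year)

print(start_date({'START_DATE': '2022-01-01'}))
print(start_year({'START_DATE': '2022-01-01'}))
```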

Pipelines

Labrea datasets are really useful when you know the dependency tree in advance. However, sometimes you want to write code that performs some transformation on an arbitrary input, and perhaps you want to perform a series of these transformations in an arbitrary order. To accomplish this, Labrea exposes a Pipeline class, where each step is created using the @pipeline_step decorator.

Pipeline steps look similar to datasets, but their first argument is always the input to the pipeline and should not have a default value. To combine pipeline steps into a pipeline, use the + operator. A pipeline can be evaluated like a dataset, in which case it returns a function of one variable. If you want to run the pipeline on a value, use the .transform(input, options) method.

For example, if you were building a feature engineering pipeline, you could write a series of functions that take a dataframe and add new columns, and then chain different subsets of these functions together to create different feature sets.

import pandas as pd
from labrea import pipeline_step, Option, dataset

@dataset
def store_sales(path: str = Option('PATH.STORE_SALES')) -> pd.DataFrame:
    return pd.read_csv(path)

@dataset
def store_square_footage(path: str = Option('PATH.STORE_SQFT')) -> pd.DataFrame:
    return pd.read_csv(path)


@pipeline_step
def add_sales(
        df: pd.DataFrame,
        sales: pd.DataFrame = store_sales
) -> pd.DataFrame:
    return pd.merge(df, sales, on='store_id', how='left')


@pipeline_step
def add_square_footage(
        df: pd.DataFrame,
        sqft: pd.DataFrame = store_square_footage
) -> pd.DataFrame:
    return pd.merge(df, sqft, on='store_id', how='left')


@pipeline_step
def add_sales_per_sqft(
        df: pd.DataFrame,
        sales: pd.DataFrame = store_sales,
        sqft: pd.DataFrame = store_square_footage
) -> pd.DataFrame:
    df = pd.merge(df, sales, on='store_id', how='left')
    df = pd.merge(df, sqft, on='store_id', how='left')
    df['sales_per_sqft'] = df['sales'] / df['sqft']
    return df.drop(columns=['sales', 'sqft'])


basic_features = add_sales + add_square_footage
derived_features = add_sales_per_sqft
all_features = basic_features + derived_features


stores = pd.read_csv('/path/to/stores.csv')
options = {
    'PATH.STORE_SALES': '/path/to/store_sales.csv',
    'PATH.STORE_SQFT': '/path/to/store_sqft.csv'
}

basic_features.transform(stores, options)
## Returns a dataframe with columns from stores and new columns sales and sqft

derived_features.transform(stores, options)
## Returns a dataframe with columns from stores and a new column sales_per_sqft

all_features.transform(stores, options)
## Returns a dataframe with columns from stores and new columns sales, sqft, and sales_per_sqft
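To see the mechanics of combining steps with + without any dataframes, here is a small numeric sketch. ToyPipeline is a hypothetical class, and each step is modelled as a function of (value, options); this illustrates the composition idea only and does not reproduce Labrea's Pipeline class.

```python
from typing import Any, Callable, Dict

Options = Dict[str, Any]
Step = Callable[[Any, Options], Any]


class ToyPipeline:
    """Hypothetical pipeline: an ordered tuple of steps."""

    def __init__(self, *steps: Step) -> None:
        self.steps = steps

    def __add__(self, other: 'ToyPipeline') -> 'ToyPipeline':
        # Concatenate the steps; neither operand is modified.
        return ToyPipeline(*self.steps, *other.steps)

    def transform(self, value: Any, options: Options) -> Any:
        for step in self.steps:
            value = step(value, options)
        return value

    def __call__(self, options: Options) -> Callable[[Any], Any]:
        # Evaluating with options yields a function of one variable.
        return lambda value: self.transform(value, options)


add_offset = ToyPipeline(lambda value, options: value + options['OFFSET'])
scale = ToyPipeline(lambda value, options: value * options['FACTOR'])

pipeline = add_offset + scale
options = {'OFFSET': 2, 'FACTOR': 10}

print(pipeline.transform(1, options))  # (1 + 2) * 10
print(pipeline(options)(1))            # same computation via the returned function
```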

Pipelines can also be used as inline transformations on other Labrea objects.

from labrea import dataset, Option, pipeline_step

@dataset
def letters() -> list[str]:
    return ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']


@pipeline_step
def take_first_n(
        lst: list[str],
        n: int = Option('N')
) -> list[str]:
    return lst[:n]


first_n_letters = letters >> take_first_n

first_n_letters({'N': 3}) == ['a', 'b', 'c']

Helper Pipelines

Labrea provides a few helper functions for creating common pipeline steps. These are map, filter, and reduce, all under the labrea.functions module.

from labrea import dataset
import labrea.functions as lf

@dataset
def numbers() -> list[int]:
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

sum_squared_evens = (
        numbers >> 
        lf.filter(lambda x: x % 2 == 0) >> 
        lf.map(lambda x: x**2) >> 
        lf.reduce(lambda x, y: x + y)
)

sum_squared_evens() == 220
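For comparison, the same computation written with Python's built-in filter and map and functools.reduce looks like this. It does not use Labrea at all; it only shows which standard functions the helper steps are named after, assuming they behave analogously.

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

evens = filter(lambda x: x % 2 == 0, numbers)   # 2, 4, 6, 8, 10
squares = map(lambda x: x ** 2, evens)          # 4, 16, 36, 64, 100
total = reduce(lambda x, y: x + y, squares)     # 4 + 16 + 36 + 64 + 100

print(total)
```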

Templates

Similar to the built-in f-strings, Labrea provides a Template type for string interpolation. This can be useful for creating strings that depend on config values, or for creating strings that depend on the results of other datasets.

from labrea import dataset, Option, Template

@dataset
def b_dataset(
        b: str = Option('B')
) -> str:
    return b

template = Template(
    '{A} {:b:}',
    b=b_dataset
)

template({'A': 'Hello', 'B': 'World!'})  ## 'Hello World!'

Dataset Classes

You may want to write classes with more complex behavior that use datasets and options as their inputs. Similar to the built-in dataclasses, we can use the @datasetclass decorator to create a class whose __init__ method takes an options dictionary and automatically evaluates dependencies like a dataset.

import pandas as pd
from labrea import dataset, datasetclass, Option

@dataset
def input_data(
        path: str = Option('INPUT.PATH'),
        fmt: str = Option('INPUT.FMT', 'csv')
) -> pd.DataFrame:
    if fmt == 'csv':
        return pd.read_csv(path)
    elif fmt == 'excel':
        return pd.read_excel(path)
    else:
        raise ValueError('Only csv and excel files are accepted.')

@dataset
def distinct_stores(data: pd.DataFrame = input_data) -> set[str]:
    return set(data['store_id'])

@dataset
def distinct_regions(data: pd.DataFrame = input_data) -> set[str]:
    return set(data['region_id'])


@datasetclass
class MyClass:
    data: pd.DataFrame = input_data
    stores: set[str] = distinct_stores
    regions: set[str] = distinct_regions
    
    def lookup_store(self, store_id: str):
        if store_id not in self.stores:
            return None
        
        return self.data[self.data['store_id'] == store_id]

options = {
    'INPUT.PATH': '/path/to/input.xlsx',
    'INPUT.FMT': 'excel'
}

my_data = MyClass(options)

my_data.regions == {'region_1', 'region_2', 'region_3'} 
my_data.lookup_store('<my_store>') == pd.DataFrame(...)

Typing

By default, labrea code is not going to pass a type checker (like MyPy) since the default arguments to datasets do not match the type annotations. For example:

from labrea import dataset, Option


@dataset
def add(
        x: int = Option('X'),
        y: int = Option('Y')
) -> int:
    return x + y

This will fail a type check because Option('X') is not an int, it’s an Option object. However, when defining datasets, we can appease the type checker by adding .result to the end of all of our Options like so:

from labrea import dataset, Option


@dataset
def add(
        x: int = Option('X').result,
        y: int = Option('Y').result
) -> int:
    return x + y

This signals to the type checker that Option('X').result should be treated as the resulting value of the Option, rather than as the Option object itself.

The .result property is shared among all Labrea types (Option, Dataset, etc.). Whenever you are defining a dataset or DatasetClass, use the .result suffix on all your dependencies to pass type checking.
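One way a property like this could be typed is with a generic class whose result property is annotated as returning the value type while handing back the object itself at runtime. The sketch below is an assumption about the general technique, using a hypothetical ToyOption class; it is not taken from Labrea's source.

```python
from typing import Any, Dict, Generic, TypeVar, cast

T = TypeVar('T')


class ToyOption(Generic[T]):
    """Hypothetical option that knows the type of the value it produces."""

    def __init__(self, key: str) -> None:
        self.key = key

    def __call__(self, options: Dict[str, Any]) -> T:
        return cast(T, options[self.key])

    @property
    def result(self) -> T:
        # Nothing is evaluated here. The same object is returned, but the
        # annotation tells a type checker to treat it as a value of type T.
        return cast(T, self)


X = ToyOption[int]('X')


def double(x: int = X.result) -> int:
    # A type checker sees an int default; at runtime the default is X itself.
    return 2 * x


print(X.result is X)
print(X({'X': 21}))
```

In this sketch double cannot be called without an argument; resolving the default against a config is the decorator's job, which is outside what the example shows.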

Subscripting

For Options, you can explicitly tell the type checker what the resulting type should be using Option[<type>](...).result syntax. This can be useful if you want to share options across datasets and ensure that the types all match.

from labrea import dataset, Option

X = Option[int]('X')


## PASSES
@dataset
def double(
        x: int = X.result
) -> int:
    return 2*x

## PASSES
@dataset
def halve(
        x: int = X.result
) -> float:
    return x/2.0

## FAILS
@dataset
def first_char(
        x: str = X.result
) -> str:
    return x[0]

MyPy Plugin

MyPy has trouble understanding the way the decorators for interfaces and dataset classes work. A MyPy plugin is provided to help with this. To use it, add the following to your mypy.ini file:

[mypy]
plugins = labrea.mypy.plugin