Scikit-Learn - Pipeline - Workarounds using transform when the model is not the last step, method 2.

Posted on Jun 12, 2022

In the previous post, we used an EstimatorWrapper:

class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        return self

    def predict(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        return self.transform(X)

    # transform delegates to the model's predict, so the wrapper can sit in the
    # middle of a pipeline where only transform is called.
    def transform(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        X = copy.copy(X)

        if isinstance(X, dict):
            return self.model.predict(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict(X)

This works around the pipeline's limitation that predictions can only come from the last step. However, we also had to alias the pipeline's predict to its transform:

model_with_pipe = pipeline.make_pipeline(
    pipe,
    wrapped_estimator,
    SetOutput(
        name="output",
    )
)
model_with_pipe.predict = model_with_pipe.transform

If one were to require both predict and predict_proba, this method would fail. Another approach is to override the Pipeline class itself to detect the EstimatorWrapper and call the corresponding method:

class Pipeline(pipeline.Pipeline):
    def predict_proba(self, X, **predict_proba_params):
        Xt = X
        for _, name, transform in self._iter():
            # The wrapped estimator predicts; every other step just transforms.
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict_proba(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

    def predict(self, X, **predict_params):
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

# Builds the custom Pipeline subclass instead of sklearn's default one.
def make_pipeline(*steps, memory=None, verbose=False):
    return Pipeline(pipeline._name_estimators(steps), memory=memory, verbose=verbose)

Each step is tested: if it is an EstimatorWrapper, its predict or predict_proba is called; otherwise its transform is used.

The EstimatorWrapper can then be rewritten to expose both predict and predict_proba:

class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        return self

    def predict(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        if isinstance(X, dict):
            return self.model.predict(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict(X)

    def predict_proba(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        if isinstance(X, dict):
            return self.model.predict_proba(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict_proba(X)

The pipeline then dispatches to the corresponding predict or predict_proba of the wrapped model.
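
For instance, the same composite pipeline can now answer both calls, which the predict = transform alias from method 1 could not. A minimal sketch, reusing model_with_pipe and X_test as they are built in the full example below:

# Sketch only: model_with_pipe and X_test are the objects built in the full example below.
labels = model_with_pipe.predict(X_test)         # dispatches to EstimatorWrapper.predict
probas = model_with_pipe.predict_proba(X_test)   # dispatches to EstimatorWrapper.predict_proba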

A full example:

from __future__ import annotations

import pandas as pd
import numpy as np
import copy
from typing import Union, List, Dict
from sklearn import datasets, linear_model, model_selection, preprocessing, base, pipeline, utils

iris: utils.Bunch = datasets.load_iris(as_frame=True)

df: pd.DataFrame = iris["data"]
target: pd.Series = iris["target"]

X_train: pd.DataFrame
X_test: pd.DataFrame
y_train: pd.Series
y_test: pd.Series
X_train, X_test, y_train, y_test = model_selection.train_test_split(
    df,
    target,
    test_size=0.3,
    random_state=0
)

class Pipeline(pipeline.Pipeline):
    def predict_proba(self, X, **predict_proba_params):
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict_proba(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

    def predict(self, X, **predict_params):
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

def make_pipeline(*steps, memory=None, verbose=False):
    return Pipeline(pipeline._name_estimators(steps), memory=memory, verbose=verbose)

class Scaler(base.BaseEstimator, base.TransformerMixin):
    # Min-max scales only the listed columns, one scaler per column; accepts
    # either a DataFrame (batch) or a dict (single record) at transform time.
    def __init__(self, columns: List[str]):
        self.columns: List[str] = columns
        self.scalers: Dict[str, preprocessing.MinMaxScaler] = {column: preprocessing.MinMaxScaler() for column in self.columns}

    def fit(self, X: pd.DataFrame, y=None) -> Scaler:
        column: str
        for column in self.columns:
            self.scalers[column].fit(X[column].to_numpy().reshape(-1, 1))

        return self

    def transform(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        X = copy.copy(X)

        if isinstance(X, dict):
            for column in self.columns:
                X[column] = self.scalers[column].transform(np.array(X[column]).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            for column in self.columns:
                X[column] = self.scalers[column].transform(X[column].to_numpy().reshape(-1, 1))

        return X

class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        return self

    def predict(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        if isinstance(X, dict):
            return self.model.predict(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict(X)

    def predict_proba(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        if isinstance(X, dict):
            return self.model.predict_proba(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict_proba(X)

class SetOutput(base.BaseEstimator, base.TransformerMixin):
    # Labels the raw prediction array: multiple rows become a DataFrame,
    # a single row becomes a dict.
    def __init__(self, name: str):
        self.name: str = name

    def fit(self, X: pd.DataFrame, y=None) -> SetOutput:
        return self

    def transform(self, X: np.ndarray) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        if X.shape[0] > 1:
            if len(X.shape) == 1:
                return pd.DataFrame({self.name: X})
            else:
                return pd.DataFrame(X, columns=[f"{self.name}_{count}" for count, _ in enumerate(X[0])])

        elif X.shape[0] == 1:
            if len(X.shape) == 1:
                return {self.name: float(X[0])}
            else:
                return {f"{self.name}_{count}": float(val) for count, val in enumerate(X[0])}
      
pipe: Pipeline = make_pipeline(
    Scaler(
        columns=[
            'sepal length (cm)'
        ],
    )
)
X_train = pipe.fit_transform(X_train, y_train)
X_test_transformed = pipe.transform(X_test)

model = linear_model.LogisticRegression()
model.fit(X_train, y_train)
wrapped_estimator = EstimatorWrapper(
    model=model,
)

model_with_pipe = make_pipeline(
    pipe,
    wrapped_estimator,
    SetOutput(
        name="output",
    )
)
model_with_pipe.predict(X_test)

|   | output |
|--:|:-------|
| 0 | 2      |

…(more rows)

model_with_pipe.predict_proba(X_test)
|   | output_0 | output_1 | output_2     |
|--:|:---------|:---------|:-------------|
| 0 | 0.000146 | 0.073410 | 9.264445e-01 |

…(more rows)

model_with_pipe.predict(dict(X_test.iloc[0]))

{'output': 2.0}

model_with_pipe.predict_proba(dict(X_test.iloc[0]))

{'output_0': 0.0001456604268545252,
 'output_1': 0.07340981585792805,
 'output_2': 0.9264445237152173}
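
As a quick sanity check, a minimal sketch: the single-record dict path should agree with the first row of the batch DataFrame path.

# Sketch: SetOutput gives the batch result a fresh RangeIndex, so .iloc[0]
# lines up with dict(X_test.iloc[0]).
batch_probas = model_with_pipe.predict_proba(X_test)
single_probas = model_with_pipe.predict_proba(dict(X_test.iloc[0]))
assert abs(batch_probas.iloc[0]["output_0"] - single_probas["output_0"]) < 1e-9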