Scikit-Learn - Pipeline - Workarounds using transform when the model is not the last step.
One issue with Scikit-learn's Pipeline in production is that it requires an estimator at the final stage. Often, however, you'll need extra logic after the final step, for example normalizing the output values or performing some arithmetic. Pipeline's predict method is implemented like this:
@available_if(_final_estimator_has("predict"))
def predict(self, X, **predict_params):
    Xt = X
    for _, name, transform in self._iter(with_final=False):
        Xt = transform.transform(Xt)
    return self.steps[-1][1].predict(Xt, **predict_params)
To use this function, the final estimator must be a model that exposes a predict method, and only that last step's predict is ever called, via self.steps[-1][1].predict(Xt, **predict_params); every step before it only has its transform called.
There are two workarounds: wrap the model in a transformer step, or subclass the Pipeline class. This post is about wrapping the model; a brief sketch of the subclassing approach follows below.
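As an illustrative sketch only (not part of scikit-learn's API, and not covered further here): a Pipeline subclass whose predict also runs steps placed after the model. It assumes every step is already fitted (Pipeline.fit would reject a non-transformer in the middle) and that exactly one step exposes predict.
from sklearn import pipeline

class PostProcessingPipeline(pipeline.Pipeline):
    def predict(self, X, **predict_params):
        # Walk every step: predict on the model, transform on the rest.
        out = X
        for _, step in self.steps:
            if hasattr(step, "predict"):
                out = step.predict(out, **predict_params)
            else:
                out = step.transform(out)
        return out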
An example workflow without using Pipeline
from __future__ import annotations
import pandas as pd
import numpy as np
import copy
from typing import Union, List, Dict
from sklearn import datasets, linear_model, model_selection, preprocessing, metrics, base, pipeline, utils
iris: utils.Bunch = datasets.load_iris(as_frame=True)
df: pd.DataFrame = iris["data"]
target: pd.Series = iris["target"]
X_train: pd.DataFrame
X_test: pd.DataFrame
y_train: pd.Series
y_test: pd.Series
X_train, X_test, y_train, y_test = model_selection.train_test_split(
df,
target,
test_size=0.3,
random_state=0
)
scaler: preprocessing.MinMaxScaler = preprocessing.MinMaxScaler()
X_train['sepal length (cm)'] = scaler.fit_transform(X_train['sepal length (cm)'].to_numpy().reshape(-1, 1))
X_test['sepal length (cm)'] = scaler.transform(X_test['sepal length (cm)'].to_numpy().reshape(-1, 1))
model: linear_model.LogisticRegression = linear_model.LogisticRegression().fit(X_train, y_train)
metrics.accuracy_score(y_test, model.predict(X_test))
To support dictionaries during prediction, we'll need custom transformers: one wrapping preprocessing.MinMaxScaler, one wrapping linear_model.LogisticRegression, and one converting the output to a dictionary at the end, since the model outputs numpy arrays.
class Scaler(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, columns: List[str]):
        self.columns: List[str] = columns
        self.scalers: Dict[str, preprocessing.MinMaxScaler] = {column: preprocessing.MinMaxScaler() for column in self.columns}

    def fit(self, X: pd.DataFrame, y=None) -> Scaler:
        column: str
        for column in self.columns:
            self.scalers[column].fit(X[column].to_numpy().reshape(-1, 1))
        return self

    def transform(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        X = copy.copy(X)
        if isinstance(X, dict):
            for column in self.columns:
                # Extract the scalar so the dict keeps plain floats instead of 1x1 arrays.
                X[column] = float(self.scalers[column].transform(np.array([[X[column]]]))[0, 0])
        elif isinstance(X, pd.DataFrame):
            for column in self.columns:
                X[column] = self.scalers[column].transform(X[column].to_numpy().reshape(-1, 1))
        return X
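A quick, illustrative check of both input types (variable names here are ours, not from the original workflow):
scaler_step = Scaler(columns=['sepal length (cm)'])
scaler_step.fit(X_train)
scaled_frame = scaler_step.transform(X_test)  # DataFrame in, DataFrame out
scaled_row = scaler_step.transform(dict(X_test.iloc[0]))  # dict in, dict out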
class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        # The wrapped model is assumed to be fitted already.
        return self

    def predict(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        return self.transform(X)

    def transform(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        X = copy.copy(X)
        if isinstance(X, dict):
            # Assumes the dict's insertion order matches the training column order.
            return self.model.predict(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict(X)
class SetOutput(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, name: str):
        self.name: str = name

    def fit(self, X: pd.DataFrame, y=None) -> SetOutput:
        return self

    def transform(self, X: np.ndarray) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        if X.shape[0] > 1:
            return pd.DataFrame({self.name: X})
        elif X.shape[0] == 1:
            # Cast to a plain Python float rather than returning a numpy scalar.
            return {self.name: float(X[0])}
A note on SetOutput: the single-row output is cast to a plain Python float, otherwise it stays a numpy scalar (numpy.float64), which tends to cause serialization problems downstream.
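For illustration, the two branches behave like this:
SetOutput(name="output").transform(np.array([0, 1, 2]))  # -> DataFrame with an "output" column
SetOutput(name="output").transform(np.array([1]))  # -> {"output": 1.0}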
We can then create a pipeline for the transformers. There's only one transformer here, but with several transformers we could fit them all first before moving on to the model.
pipe: pipeline.Pipeline = pipeline.make_pipeline(
    Scaler(
        columns=[
            'sepal length (cm)'
        ],
    )
)
X_train = pipe.fit_transform(X_train, y_train)
You could also fit the transformers together with the model and include everything in GridSearchCV, or not. I prefer to build a pipeline with only the transformers first and transform the whole dataset up front, since GridSearchCV would otherwise refit the transformers for every candidate, which slows down the hyperparameter tuning phase.
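If you did want to tune hyperparameters on the already-transformed data, a minimal sketch could look like this (the grid values are illustrative, not recommendations):
param_grid = {"C": [0.1, 1.0, 10.0]}  # illustrative grid
search = model_selection.GridSearchCV(
    linear_model.LogisticRegression(),
    param_grid,
    cv=5,
)
search.fit(X_train, y_train)  # X_train was already transformed by pipe above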
Then we can create a model and wrap it in our EstimatorWrapper:
model = linear_model.LogisticRegression()
model.fit(X_train, y_train)
wrapped_estimator = EstimatorWrapper(
    model=model,
)
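As a quick, illustrative sanity check, the wrapper should give the same predictions as the raw model on a DataFrame:
assert (wrapped_estimator.transform(X_test) == model.predict(X_test)).all()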
Once we're done with whatever model tuning we want, we can create another pipeline that stacks:
1. Preprocessing
2. Model
3. Post-processing
model_with_pipe = pipeline.make_pipeline(
    pipe,
    wrapped_estimator,
    SetOutput(
        name="output",
    )
)
model_with_pipe.predict = model_with_pipe.transform
Since every step, including the wrapped model, now implements transform, calling transform runs the full chain step by step, which is why overriding predict like this is safe:
model_with_pipe.predict(X_test)
or with a dictionary:
model_with_pipe.predict(dict(X_test.iloc[0]))
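Both calls run each step's transform in order: scaling, the wrapped model's prediction, and the conversion to the final output format.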
If we used the model without the wrapper, it could only work as the last stage:
model_with_pipe = pipeline.make_pipeline(
    pipe,
    model,
)
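Prediction works here, since Pipeline delegates predict to the final estimator:
model_with_pipe.predict(X_test)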
And if we added any post-processing after it, prediction would error out:
model_with_pipe = pipeline.make_pipeline(
    pipe,
    model,
    SetOutput(
        name="output",
    )
)
model_with_pipe.predict(X_test)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/tmp/ipykernel_2821917/1686141088.py in <cell line: 1>()
----> 1 model_with_pipe.predict(X_test)
~/.pyenv/versions/3.10.4/envs/3_10_4/lib/python3.10/site-packages/sklearn/utils/metaestimators.py in __get__(self, obj, owner)
125 # delegate only on instances, not the classes.
126 # this is to allow access to the docstrings.
--> 127 if not self.check(obj):
128 raise attr_err
129 out = MethodType(self.fn, obj)
~/.pyenv/versions/3.10.4/envs/3_10_4/lib/python3.10/site-packages/sklearn/pipeline.py in check(self)
44 def check(self):
45 # raise original `AttributeError` if `attr` does not exist
---> 46 getattr(self._final_estimator, attr)
47 return True
48
AttributeError: 'SetOutput' object has no attribute 'predict'
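Wrapping the model as a transformer, as shown above, sidesteps this error entirely, since the outer pipeline only ever needs each step's transform.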