Scikit-Learn - Pipeline - Workarounds for using transform when the model is not the last step (method 2).
In the previous post, we used an EstimatorWrapper:
class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    """Present a model as a transformer so it can sit in the middle of a pipeline.

    ``transform`` (and its alias ``predict``) forwards the input to
    ``self.model.predict``. Input may be a ``pandas.DataFrame`` (many rows)
    or a ``dict`` mapping feature name to value (one row).
    """

    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        """Do nothing and return ``self``; the wrapped model is not fitted here."""
        return self

    def predict(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> np.ndarray:
        """Alias of :meth:`transform`."""
        return self.transform(X)

    def transform(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> np.ndarray:
        """Return ``self.model.predict`` for *X*.

        Raises:
            TypeError: if *X* is neither a ``dict`` nor a ``pandas.DataFrame``.
        """
        if isinstance(X, dict):
            # Flatten every value before joining them: an upstream step may
            # have left a size-1 array instead of a scalar, and building an
            # array from such a mixed list fails on recent NumPy releases.
            row = np.concatenate([np.ravel(value) for value in X.values()])
            return self.model.predict(row.reshape(1, -1))
        if isinstance(X, pd.DataFrame):
            return self.model.predict(X)
        # Previously this fell through and returned None, which only failed
        # later (and less clearly) in the next pipeline step.
        raise TypeError(
            "EstimatorWrapper expects a pandas DataFrame or a dict, "
            f"got {type(X).__name__}"
        )
This works around the pipeline’s limitation that the predicting model must be the last step. However, we were also required to override the pipeline’s predict function with its transform function:
# Chain the pre-processing pipeline, the wrapped model and the output
# formatter into a single pipeline.
model_with_pipe = pipeline.make_pipeline(
    pipe, wrapped_estimator, SetOutput(name="output")
)
# The model is not the last step, so route predict() through transform(),
# which runs every step in order.
model_with_pipe.predict = model_with_pipe.transform
If one were to require both predict and predict_proba, this method would fail, because only one function can be routed through transform. The other approach is to override the pipeline’s functions so that they detect the EstimatorWrapper and call its corresponding function, such as:
class Pipeline(pipeline.Pipeline):
    """Pipeline whose predicting step does not have to be the last step.

    ``predict`` and ``predict_proba`` walk every step in order. A step that
    is an ``EstimatorWrapper`` is asked for its prediction; every other step
    is asked to ``transform`` the running value.
    """

    def _run_steps(self, X, wrapper_method: str):
        """Feed *X* through all steps, calling *wrapper_method* on wrappers."""
        data = X
        for _, _, step in self._iter():
            if isinstance(step, EstimatorWrapper):
                data = getattr(step, wrapper_method)(data)
            else:
                data = step.transform(data)
        return data

    def predict_proba(self, X, **predict_proba_params):
        """Run the steps using ``predict_proba`` on the wrapped estimator.

        ``predict_proba_params`` is accepted but not used.
        """
        return self._run_steps(X, "predict_proba")

    def predict(self, X, **predict_params):
        """Run the steps using ``predict`` on the wrapped estimator.

        ``predict_params`` is accepted but not used.
        """
        return self._run_steps(X, "predict")
def make_pipeline(*steps, memory=None, verbose=False):
    """Build the custom ``Pipeline`` from unnamed steps.

    The ``(name, step)`` pairs come from scikit-learn's private
    ``pipeline._name_estimators`` helper; ``memory`` and ``verbose`` are
    passed straight through to the ``Pipeline`` constructor.
    """
    named_steps = pipeline._name_estimators(steps)
    return Pipeline(named_steps, memory=memory, verbose=verbose)
Here we test whether each step is an EstimatorWrapper and, if it is, call either its predict or its predict_proba instead of transform.
The EstimatorWrapper can be rewritten to have both predict and predict_proba, such as:
class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    """Wrap a model so it can be placed in the middle of a pipeline.

    ``predict`` and ``predict_proba`` forward to the same-named methods of
    ``self.model``. Input may be a ``pandas.DataFrame`` (many rows) or a
    ``dict`` mapping feature name to value (one row).
    """

    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        """Do nothing and return ``self``; the wrapped model is not fitted here."""
        return self

    @staticmethod
    def _to_model_input(X):
        """Return *X* in the form handed to the wrapped model.

        A DataFrame is passed through unchanged; a dict becomes a single
        ``(1, n_values)`` row, in the dict's iteration order.

        Raises:
            TypeError: if *X* is neither a ``dict`` nor a ``pandas.DataFrame``.
        """
        if isinstance(X, pd.DataFrame):
            return X
        if isinstance(X, dict):
            # Flatten every value before joining them: an upstream step may
            # have left a size-1 array instead of a scalar, and building an
            # array from such a mixed list fails on recent NumPy releases.
            row = np.concatenate([np.ravel(value) for value in X.values()])
            return row.reshape(1, -1)
        # Previously unsupported input silently produced None.
        raise TypeError(
            "EstimatorWrapper expects a pandas DataFrame or a dict, "
            f"got {type(X).__name__}"
        )

    def predict(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> np.ndarray:
        """Return ``self.model.predict`` for *X*."""
        return self.model.predict(self._to_model_input(X))

    def predict_proba(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> np.ndarray:
        """Return ``self.model.predict_proba`` for *X*."""
        return self.model.predict_proba(self._to_model_input(X))
The pipeline then calls the corresponding predict or predict_proba function of the wrapper.
A full example:
from __future__ import annotations
import pandas as pd
import numpy as np
import copy
from typing import Union, List, Dict
from sklearn import datasets, linear_model, model_selection, preprocessing, base, pipeline, utils
# Load the iris data set as pandas objects: features and target.
iris: utils.bunch.Bunch = datasets.load_iris(as_frame=True)
df: pd.DataFrame = iris["data"]
target: pd.Series = iris["target"]

# Declare the types of the split results before the tuple unpacking below.
X_train: pd.DataFrame
X_test: pd.DataFrame
y_train: pd.Series
y_test: pd.Series

# Hold out 30% of the rows for testing; the fixed seed makes the split repeatable.
X_train, X_test, y_train, y_test = model_selection.train_test_split(
    df, target, test_size=0.3, random_state=0
)
class Pipeline(pipeline.Pipeline):
    """Pipeline whose predicting step does not have to be the last step.

    ``predict`` and ``predict_proba`` walk every step in order. A step that
    is an ``EstimatorWrapper`` is asked for its prediction; every other step
    is asked to ``transform`` the running value.
    """

    def _run_steps(self, X, wrapper_method: str):
        """Feed *X* through all steps, calling *wrapper_method* on wrappers."""
        data = X
        for _, _, step in self._iter():
            if isinstance(step, EstimatorWrapper):
                data = getattr(step, wrapper_method)(data)
            else:
                data = step.transform(data)
        return data

    def predict_proba(self, X, **predict_proba_params):
        """Run the steps using ``predict_proba`` on the wrapped estimator.

        ``predict_proba_params`` is accepted but not used.
        """
        return self._run_steps(X, "predict_proba")

    def predict(self, X, **predict_params):
        """Run the steps using ``predict`` on the wrapped estimator.

        ``predict_params`` is accepted but not used.
        """
        return self._run_steps(X, "predict")
def make_pipeline(*steps, memory=None, verbose=False):
    """Build the custom ``Pipeline`` from unnamed steps.

    The ``(name, step)`` pairs come from scikit-learn's private
    ``pipeline._name_estimators`` helper; ``memory`` and ``verbose`` are
    passed straight through to the ``Pipeline`` constructor.
    """
    named_steps = pipeline._name_estimators(steps)
    return Pipeline(named_steps, memory=memory, verbose=verbose)
class Scaler(base.BaseEstimator, base.TransformerMixin):
    """Min-max scale selected columns, keeping the input's container type.

    One ``MinMaxScaler`` is kept per configured column. ``transform`` accepts
    a ``pandas.DataFrame`` (many rows) or a ``dict`` mapping column name to
    value (one row) and returns a copy of the same kind.
    """

    def __init__(self, columns: List[str]):
        self.columns: List[str] = columns
        # One independent scaler per column, keyed by column name.
        self.scalers: Dict[str, preprocessing.MinMaxScaler] = {
            column: preprocessing.MinMaxScaler() for column in self.columns
        }

    def fit(self, X: pd.DataFrame, y=None) -> Scaler:
        """Fit each column's scaler on that column of *X* and return ``self``."""
        column: str
        for column in self.columns:
            # The scaler is given a 2-D (n_rows, 1) array.
            self.scalers[column].fit(X[column].to_numpy().reshape(-1, 1))
        return self

    def transform(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        """Return a copy of *X* with the configured columns scaled.

        Input that is neither a ``dict`` nor a ``DataFrame`` is returned as
        an unmodified copy.
        """
        # Copy first so the assignments below target the copy.
        X = copy.copy(X)
        if isinstance(X, dict):
            for column in self.columns:
                scaled = self.scalers[column].transform(
                    np.array(X[column]).reshape(1, -1)
                )
                # Store a plain float, as the return annotation promises.
                # Keeping the (1, 1) array would mix arrays and scalars in
                # the dict, which later steps cannot stack into one row on
                # recent NumPy releases.
                X[column] = float(scaled[0, 0])
        elif isinstance(X, pd.DataFrame):
            for column in self.columns:
                X[column] = self.scalers[column].transform(
                    X[column].to_numpy().reshape(-1, 1)
                )
        return X
class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    """Wrap a model so it can be placed in the middle of a pipeline.

    ``predict`` and ``predict_proba`` forward to the same-named methods of
    ``self.model``. Input may be a ``pandas.DataFrame`` (many rows) or a
    ``dict`` mapping feature name to value (one row).
    """

    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None) -> EstimatorWrapper:
        """Do nothing and return ``self``; the wrapped model is not fitted here."""
        return self

    @staticmethod
    def _to_model_input(X):
        """Return *X* in the form handed to the wrapped model.

        A DataFrame is passed through unchanged; a dict becomes a single
        ``(1, n_values)`` row, in the dict's iteration order.

        Raises:
            TypeError: if *X* is neither a ``dict`` nor a ``pandas.DataFrame``.
        """
        if isinstance(X, pd.DataFrame):
            return X
        if isinstance(X, dict):
            # Flatten every value before joining them: an upstream step may
            # have left a size-1 array instead of a scalar, and building an
            # array from such a mixed list fails on recent NumPy releases.
            row = np.concatenate([np.ravel(value) for value in X.values()])
            return row.reshape(1, -1)
        # Previously unsupported input silently produced None.
        raise TypeError(
            "EstimatorWrapper expects a pandas DataFrame or a dict, "
            f"got {type(X).__name__}"
        )

    def predict(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> np.ndarray:
        """Return ``self.model.predict`` for *X*."""
        return self.model.predict(self._to_model_input(X))

    def predict_proba(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> np.ndarray:
        """Return ``self.model.predict_proba`` for *X*."""
        return self.model.predict_proba(self._to_model_input(X))
class SetOutput(base.BaseEstimator, base.TransformerMixin):
    """Label a raw prediction array with output names.

    An array with exactly one row becomes a ``dict``; any other number of
    rows becomes a ``pandas.DataFrame``. A 1-D array uses ``name`` as its
    single label; otherwise the labels are ``f"{name}_{i}"`` per column.
    """

    def __init__(self, name: str):
        self.name: str = name

    def fit(self, X: pd.DataFrame, y=None) -> SetOutput:
        """Do nothing and return ``self``."""
        return self

    def transform(
        self, X: np.ndarray
    ) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        """Return *X* as a labelled dict (one row) or DataFrame (otherwise)."""
        single_row = X.shape[0] == 1
        if X.ndim == 1:
            if single_row:
                return {self.name: float(X[0])}
            return pd.DataFrame({self.name: X})
        # Derive labels from the column count instead of indexing X[0], so a
        # zero-row input still gets its columns and no longer returns None.
        labels = [f"{self.name}_{count}" for count in range(X.shape[1])]
        if single_row:
            return {label: float(val) for label, val in zip(labels, X[0])}
        return pd.DataFrame(X, columns=labels)
# Pre-processing pipeline: min-max scale a single column.
pipe: Pipeline = make_pipeline(Scaler(columns=['sepal length (cm)']))

# Fit the scaler on the training rows; X_train now holds the scaled frame.
X_train = pipe.fit_transform(X_train, y_train)
X_test_transformed = pipe.transform(X_test)

# Train the classifier on the scaled training data.
model = linear_model.LogisticRegression()
model.fit(X_train, y_train)

# Chain pre-processing -> wrapped model -> output labelling. This outer
# pipeline is never fitted itself; it reuses the already fitted `pipe`
# and `model`.
wrapped_estimator = EstimatorWrapper(model=model)
model_with_pipe = make_pipeline(pipe, wrapped_estimator, SetOutput(name="output"))

# Predict from the raw (unscaled) test frame; scaling happens inside the pipeline.
model_with_pipe.predict(X_test)
|    | output |
|---:|:-------|
|  0 | 2      |

…(more rows)
# Same pipeline, asking the wrapped model for predict_proba on the test frame.
model_with_pipe.predict_proba(X_test)
|    | output_0 | output_1 | output_2     |
|---:|---------:|---------:|-------------:|
|  0 | 0.000146 | 0.073410 | 9.264445e-01 |

…(more rows)
# Single-row input: pass the first test row as a plain dict of column -> value.
model_with_pipe.predict(dict(X_test.iloc[0]))
{'output': 2.0}
# Single-row input again, this time asking for predict_proba.
model_with_pipe.predict_proba(dict(X_test.iloc[0]))
{'output_0': 0.0001456604268545252,
'output_1': 0.07340981585792805,
'output_2': 0.9264445237152173}