Scikit-Learn - Pipeline - Getting previous transformers.
One of the issues with scikit-learn's Pipeline is that a step cannot inspect any of the previous steps. This matters for transformers such as binning applied after a Weight of Evidence (WoE) encoding. When you bin the results after WoE, you get a set of bins, and for each bin you only know which encoded values went into it, not the original categories. So in order to recover the original names, you need access to the WoE encoder's mapper. However, with the current scikit-learn implementation this is not possible, since fitted transformers are not passed along through fit_params.
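To make the problem concrete, here is a toy sketch (the mapper and its values are hypothetical, not code from this post) of why binning after WoE loses the original category names:

# Toy illustration (hypothetical values): after WoE encoding, binning only
# sees numbers, so recovering the original category names requires the mapper.
woe_mapper = {"A": -0.8, "B": -0.7, "C": 1.2}  # a fitted WoE mapping
encoded = [woe_mapper[c] for c in ["A", "B", "C", "B"]]

# Binning operates on the encoded values alone...
bins = ["negative" if value < 0 else "positive" for value in encoded]
print(bins)  # ['negative', 'negative', 'positive', 'negative']

# ...so mapping each bin back to its categories needs access to woe_mapper:
bin_members = {}
for category, value in woe_mapper.items():
    bin_members.setdefault("negative" if value < 0 else "positive", []).append(category)
print(bin_members)  # {'negative': ['A', 'B'], 'positive': ['C']}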
Similar to the previous blog post, everything stays the same except for the Pipeline class.
from __future__ import annotations

import copy
from typing import Dict, List, Union

import numpy as np
import pandas as pd
from sklearn import base, linear_model, pipeline, preprocessing
from sklearn.base import clone
# Private scikit-learn helpers; these import paths match scikit-learn ~1.0.
from sklearn.pipeline import _fit_transform_one
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory


def make_pipeline(*steps, memory=None, verbose=False):
    return Pipeline(pipeline._name_estimators(steps), memory=memory, verbose=verbose)
class Scaler(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, columns: List[str]):
        self.columns: List[str] = columns
        self.scalers: Dict[str, preprocessing.MinMaxScaler] = {
            column: preprocessing.MinMaxScaler() for column in self.columns
        }
        self.original_names = None

    def fit(self, X: pd.DataFrame, y=None, **fit_params) -> Scaler:
        # If the most recently fitted transformer is a Rename, remember the
        # original column names it mapped from.
        if (
            "transformers" in fit_params
            and fit_params["transformers"]
            and isinstance(fit_params["transformers"][-1], Rename)
        ):
            self.original_names = fit_params["transformers"][-1].names
        column: str
        for column in self.columns:
            self.scalers[column].fit(X[column].to_numpy().reshape(-1, 1))
        return self

    def transform(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        X = copy.copy(X)
        if isinstance(X, dict):
            for column in self.columns:
                # Extract the scalar so dict values stay plain floats.
                X[column] = float(
                    self.scalers[column].transform(np.array(X[column]).reshape(1, -1))[0, 0]
                )
        elif isinstance(X, pd.DataFrame):
            for column in self.columns:
                X[column] = self.scalers[column].transform(X[column].to_numpy().reshape(-1, 1))
        return X
class SetOutput(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, name: str):
        self.name: str = name

    def fit(self, X: pd.DataFrame, y=None, **fit_params) -> SetOutput:
        return self

    def transform(self, X: np.ndarray) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        # A batch of rows becomes a DataFrame; a single row becomes a dict.
        if X.shape[0] > 1:
            if len(X.shape) == 1:
                return pd.DataFrame({self.name: X})
            else:
                return pd.DataFrame(X, columns=[f"{self.name}_{count}" for count, _ in enumerate(X[0])])
        elif X.shape[0] == 1:
            if len(X.shape) == 1:
                return {self.name: float(X[0])}
            else:
                return {f"{self.name}_{count}": float(val) for count, val in enumerate(X[0])}
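For reference, a small illustrative example of SetOutput's two output modes (the name "proba" is arbitrary):

# Illustrative: a batch of rows becomes a DataFrame, a single row a dict.
out = SetOutput(name="proba")
print(out.transform(np.array([[0.2, 0.8], [0.9, 0.1]])))  # DataFrame with columns proba_0, proba_1
print(out.transform(np.array([[0.2, 0.8]])))  # {'proba_0': 0.2, 'proba_1': 0.8}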
We also add the Rename class, which renames columns in our dataframe or dictionary. The list of previously fitted transformers is passed to each step through the fit_params dictionary using **{**fit_params_steps[name], **{"transformers": transformers}}. After each transformer is fitted, we append it to the transformers list with transformers.append(fitted_transformer).
class Rename(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, names: dict):
        self.names: dict = names

    def fit(self, X: pd.DataFrame, y=None, **fit_params) -> Rename:
        return self

    def transform(
        self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]
    ) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        X = copy.copy(X)
        if isinstance(X, dict):
            X = {name_after: X[name_before] for name_before, name_after in self.names.items()} | {
                key: value for key, value in X.items() if key not in self.names
            }
        elif isinstance(X, pd.DataFrame):
            X = X.rename(columns=self.names)
        return X
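For example (illustrative values), the dictionary branch renames only the mapped keys and passes the rest through unchanged (note that the dict-union operator | requires Python 3.9+):

# Illustrative: only mapped keys are renamed; the rest pass through.
rename = Rename(names={"sepal length (cm)": "sepal length"})
print(rename.transform({"sepal length (cm)": 5.1, "petal width (cm)": 0.2}))
# {'sepal length': 5.1, 'petal width (cm)': 0.2}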
We need to modify the Pipeline class to store the previously fitted transformers, specifically in a transformers = [] list:
class Pipeline(pipeline.Pipeline):
    def predict_proba(self, X, **predict_proba_params):
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict_proba(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

    def predict(self, X, **predict_params):
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

    def _fit(self, X, y=None, **fit_params_steps):
        # shallow copy of steps - this should really be steps_
        self.steps = list(self.steps)
        self._validate_steps()
        # Setup the memory
        memory = check_memory(self.memory)

        fit_transform_one_cached = memory.cache(_fit_transform_one)

        # Keep a list of the previously fitted transformers
        transformers = []
        for step_idx, name, transformer in self._iter(
            with_final=False, filter_passthrough=False
        ):
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            if hasattr(memory, "location") and memory.location is None:
                # we do not clone when caching is disabled to
                # preserve backward compatibility
                cloned_transformer = transformer
            else:
                cloned_transformer = clone(transformer)

            # Fit or load from cache the current transformer
            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **{**fit_params_steps[name], **{"transformers": transformers}},
            )
            transformers.append(fitted_transformer)

            # Replace the transformer of the step with the fitted
            # transformer. This is necessary when loading the transformer
            # from the cache.
            self.steps[step_idx] = (name, fitted_transformer)
        return X, transformers
    def fit(self, X, y=None, **fit_params):
        """Fit the model.

        Fit all the transformers one after the other and transform the
        data. Finally, fit the transformed data using the final estimator.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        self : object
            Pipeline with fitted steps.
        """
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt, transformers = self._fit(X, y, **fit_params_steps)
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(
                    Xt, y, **{**fit_params_last_step, **{"transformers": transformers}}
                )
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the model and transform with the final estimator.

        Fits all the transformers one after the other and transform the
        data. Then uses `fit_transform` on transformed data with the final
        estimator.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        Xt : ndarray of shape (n_samples, n_transformed_features)
            Transformed samples.
        """
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt, transformers = self._fit(X, y, **fit_params_steps)

        last_step = self._final_estimator
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if last_step == "passthrough":
                return Xt
            fit_params_last_step = fit_params_steps[self.steps[-1][0]]
            if hasattr(last_step, "fit_transform"):
                return last_step.fit_transform(
                    Xt, y, **{**fit_params_last_step, **{"transformers": transformers}}
                )
            else:
                return last_step.fit(
                    Xt, y, **{**fit_params_last_step, **{"transformers": transformers}}
                ).transform(Xt)
The change to _fit passes the transformers list to every step except the last stage, since the loop uses with_final=False:

for step_idx, name, transformer in self._iter(
    with_final=False, filter_passthrough=False
):

Because of this, we also need to modify the fit and fit_transform functions to cover the last stage, via:
# For fit
if self._final_estimator != "passthrough":
    fit_params_last_step = fit_params_steps[self.steps[-1][0]]
    self._final_estimator.fit(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}})

# For fit_transform
if last_step == "passthrough":
    return Xt
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
if hasattr(last_step, "fit_transform"):
    return last_step.fit_transform(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}})
else:
    return last_step.fit(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}}).transform(Xt)
Modifying these two functions allows us to pass the transformers list to every step, including the final estimator.
With the scaler, we can set self.original_names in the fit function by inspecting transformers, since the previously fitted transformers are injected into fit_params by the Pipeline class.
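To see this mechanism in isolation, a quick illustrative check (the column name and values are made up) that passes transformers into fit directly, without a pipeline:

# Illustrative: Scaler.fit picks up a Rename handed in via fit_params.
rename = Rename(names={"sepal length (cm)": "sepal length"})
df = pd.DataFrame({"sepal length": [4.9, 5.1, 6.3]})
scaler = Scaler(columns=["sepal length"]).fit(df, transformers=[rename])
print(scaler.original_names)  # {'sepal length (cm)': 'sepal length'}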
class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None, **fit_params) -> EstimatorWrapper:
        return self

    def predict(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        if isinstance(X, dict):
            return self.model.predict(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict(X)

    def predict_proba(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        if isinstance(X, dict):
            return self.model.predict_proba(np.array(list(X.values())).reshape(1, -1))
        elif isinstance(X, pd.DataFrame):
            return self.model.predict_proba(X)
As an example, we'll record the original names from the Rename class in the Scaler.
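The snippet below assumes the train/test split of the iris data from the previous post; a minimal sketch of that setup:

# Assumed setup (matching the previous post): iris as a DataFrame.
from sklearn import datasets, model_selection

X, y = datasets.load_iris(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = model_selection.train_test_split(
    X, y, random_state=42
)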
pipe: Pipeline = make_pipeline(
    Rename(
        names={'sepal length (cm)': 'sepal length'}
    ),
    Scaler(
        columns=[
            'sepal length'
        ],
    ),
)
X_train = pipe.fit_transform(X_train, y_train)
X_test_transformed = pipe.transform(X_test)

pipe.steps[-1][1].original_names
# {'sepal length (cm)': 'sepal length'}
We could also modify the Pipeline to include the names of the transformers. But most likely you'll only need the immediately preceding transformer, so a plain list is fine in most cases.
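If you do want names, a minimal sketch of that variant (hypothetical, not part of the code above): _fit would append (name, fitted_transformer) pairs instead, and a downstream step could then look up a predecessor by its step name (make_pipeline names a Rename instance "rename"):

# Hypothetical variant: in _fit, store (name, fitted_transformer) pairs
# instead of bare transformers:
#     transformers.append((name, fitted_transformer))

# A downstream step can then look up a predecessor by its step name:
class NameAwareScaler(Scaler):
    def fit(self, X, y=None, **fit_params):
        named = dict(fit_params.get("transformers", []))  # name -> transformer
        if isinstance(named.get("rename"), Rename):
            self.original_names = named["rename"].names
        return super().fit(X, y)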