Scikit-Learn - Pipeline - Getting previous transformers.

Posted on Jul 20, 2022

One of the issues with scikit-learn’s Pipeline is that a step is unable to inspect any of the steps that ran before it. This matters for transformers such as binning applied after a Weight of Evidence (WoE) encoding.

When you bin the results after WoE, you get a set of bins, and for each bin you only know which encoded values went into it. So in order to get the original names from before the encoding, you’ll need to access the WoE’s mapper.

However, with the current scikit-learn implementation this is not possible, since the previous transformers are not passed to a step through fit_params.

Similar to the previous blog post, everything stays the same except for the Pipeline class.

def make_pipeline(*steps, memory=None, verbose=False):
    """Build a ``Pipeline`` from a sequence of unnamed steps.

    Step names are generated by ``pipeline._name_estimators``; ``memory`` and
    ``verbose`` are forwarded to the ``Pipeline`` constructor unchanged.
    """
    named_steps = pipeline._name_estimators(steps)
    return Pipeline(named_steps, memory=memory, verbose=verbose)

class Scaler(base.BaseEstimator, base.TransformerMixin):
    """Min-max scale selected columns of a DataFrame or of a single-sample dict.

    One ``MinMaxScaler`` is kept per configured column. If the pipeline passes
    the previously fitted transformers as ``fit_params["transformers"]`` and
    the immediately preceding one is a ``Rename``, its name mapping is stored
    in ``original_names``.
    """

    def __init__(self, columns: List[str]):
        self.columns: List[str] = columns
        self.scalers: Dict[str, preprocessing.MinMaxScaler] = {column: preprocessing.MinMaxScaler() for column in self.columns}
        self.original_names = None

    def fit(self, X: pd.DataFrame, y=None, **fit_params) -> Scaler:
        """Fit one scaler per configured column.

        Parameters
        ----------
        X : pd.DataFrame
            Training data; must contain every column listed in ``columns``.
        y : ignored
        **fit_params
            May contain ``"transformers"``: the list of transformers fitted
            before this step.

        Returns
        -------
        self
        """
        # Reset first so a refit that is not preceded by a Rename does not
        # keep the mapping recorded by an earlier fit.
        self.original_names = None
        previous = fit_params.get("transformers")
        if previous and isinstance(previous[-1], Rename):
            self.original_names = previous[-1].names

        column: str
        for column in self.columns:
            # MinMaxScaler is fed a 2-D (n_samples, 1) array.
            self.scalers[column].fit(X[column].to_numpy().reshape(-1, 1))

        return self

    def transform(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        """Return a scaled copy of ``X``.

        A dict is treated as a single sample and each scaled value is stored
        back as a plain ``float``. A DataFrame has each configured column
        replaced by its scaled values. Any other input type is returned as a
        shallow copy without scaling.
        """
        # Shallow copy so the caller's dict / DataFrame is not modified.
        X = copy.copy(X)

        if isinstance(X, dict):
            for column in self.columns:
                scaled = self.scalers[column].transform(np.array(X[column]).reshape(1, -1))
                # Unwrap the (1, 1) result: storing the array itself would mix
                # arrays with the untouched scalar values of other keys.
                X[column] = float(scaled[0, 0])
        elif isinstance(X, pd.DataFrame):
            for column in self.columns:
                scaled = self.scalers[column].transform(X[column].to_numpy().reshape(-1, 1))
                # Flatten back to 1-D so the column assignment gets one value per row.
                X[column] = scaled.ravel()

        return X


class SetOutput(base.BaseEstimator, base.TransformerMixin):
    """Give a NumPy array named outputs.

    An array whose first dimension has length 1 is treated as a single sample
    and converted to a dict; anything else (including zero rows) becomes a
    DataFrame. Multi-column outputs are named ``<name>_0``, ``<name>_1``, ...
    """

    def __init__(self, name: str):
        self.name: str = name

    def fit(self, X: pd.DataFrame, y=None, **fit_params) -> SetOutput:
        """Do nothing and return ``self``; this transformer has no fitted state."""
        return self

    def transform(self, X: np.ndarray) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        """Convert ``X`` to a dict (one sample) or a DataFrame (otherwise).

        Parameters
        ----------
        X : np.ndarray
            1-D or 2-D array of values.

        Returns
        -------
        dict or pd.DataFrame
            ``{name: value}`` / ``{name_i: value}`` when ``X.shape[0] == 1``,
            otherwise a DataFrame with column ``name`` (1-D input) or columns
            ``name_i`` (2-D input).
        """
        is_vector = len(X.shape) == 1

        if X.shape[0] == 1:
            if is_vector:
                return {self.name: float(X[0])}
            return {f"{self.name}_{count}": float(val) for count, val in enumerate(X[0])}

        # Zero rows or several rows: always a DataFrame. Previously an empty
        # input matched neither branch and the method returned None.
        if is_vector:
            return pd.DataFrame({self.name: X})
        # Derive the column count from the shape rather than from X[0], which
        # does not exist when there are no rows.
        return pd.DataFrame(X, columns=[f"{self.name}_{count}" for count in range(X.shape[1])])

On top of that, we add a Rename class, which can change the column names in our dataframe or dictionary. The list of previously fitted transformers is then passed to each step via the fit_params dictionary using **{**fit_params_steps[name], **{"transformers": transformers}}.

After a transformer is fitted, we’ll need to add the fitted_transformer to the transformers list by using transformers.append(fitted_transformer).

class Rename(base.BaseEstimator, base.TransformerMixin):
    """Rename DataFrame columns or dict keys according to a fixed mapping.

    ``names`` maps each old name to its new name. Names that do not occur in
    the input are ignored, and the order of columns / keys is preserved.
    """

    def __init__(self, names: dict):
        self.names: dict = names

    def fit(self, X: pd.DataFrame, y=None, **fit_params) -> Rename:
        """Do nothing and return ``self``; the mapping is given at construction."""
        return self

    def transform(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> Union[pd.DataFrame, Dict[str, Union[float, int]]]:
        """Return a renamed copy of ``X``; the input itself is not modified.

        Any input that is neither a dict nor a DataFrame is returned as a
        shallow copy without renaming.
        """
        if isinstance(X, dict):
            # Rename in place within the iteration order so the key order
            # matches the column order the DataFrame branch produces;
            # downstream steps that read ``X.values()`` positionally rely on it.
            # Keys absent from the mapping are kept, and mapping entries absent
            # from X are ignored, as DataFrame.rename does by default.
            return {self.names.get(key, key): value for key, value in X.items()}

        if isinstance(X, pd.DataFrame):
            return X.rename(columns=self.names)

        return copy.copy(X)

We need to modify the pipeline class to store previous transformers, specifically transformers = []

class Pipeline(pipeline.Pipeline):
    """Pipeline that tells each step which transformers were fitted before it.

    While fitting, every step receives an extra fit parameter named
    ``"transformers"`` holding the list of already fitted transformers.
    ``predict`` and ``predict_proba`` are overridden so that
    ``EstimatorWrapper`` steps are asked for predictions while every other
    step is applied with ``transform``.
    """

    def predict_proba(self, X, **predict_proba_params):
        """Run ``X`` through the steps yielded by ``self._iter()``.

        ``EstimatorWrapper`` steps are applied with ``predict_proba``; all
        other steps are applied with ``transform``. ``predict_proba_params``
        is accepted but not used.
        """
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict_proba(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

    def predict(self, X, **predict_params):
        """Run ``X`` through the steps yielded by ``self._iter()``.

        ``EstimatorWrapper`` steps are applied with ``predict``; all other
        steps are applied with ``transform``. ``predict_params`` is accepted
        but not used.
        """
        Xt = X
        for _, name, transform in self._iter():
            if isinstance(transform, EstimatorWrapper):
                Xt = transform.predict(Xt)
            else:
                Xt = transform.transform(Xt)
        return Xt

    def _fit(self, X, y=None, **fit_params_steps):
        """Fit and apply every step except the final one.

        Each step is fitted with its own fit parameters plus
        ``"transformers"``: the list of transformers fitted so far. Steps that
        are ``None`` or ``"passthrough"`` are skipped and never added to that
        list.

        Returns
        -------
        (X, transformers) : tuple
            The data after all non-final steps, and the list of fitted
            transformers in step order.
        """
        # NOTE(review): this returns a tuple; the parent class's _fit
        # presumably returns only the transformed data, so any inherited
        # method that calls _fit and is not overridden here would receive the
        # tuple instead -- verify against the installed scikit-learn version.
        # shallow copy of steps - this should really be steps_
        self.steps = list(self.steps)
        self._validate_steps()
        # Setup the memory
        memory = check_memory(self.memory)

        fit_transform_one_cached = memory.cache(_fit_transform_one)

        # Keep a list of the previous transformer
        # The same list object is handed to every step and appended to
        # afterwards, so a step that stores the list itself (rather than
        # reading from it during fit) will later see subsequent transformers.
        transformers = []

        for step_idx, name, transformer in self._iter(
            with_final=False, filter_passthrough=False
        ):
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            if hasattr(memory, "location") and memory.location is None:
                # we do not clone when caching is disabled to
                # preserve backward compatibility
                cloned_transformer = transformer
            else:
                cloned_transformer = clone(transformer)
            # Fit or load from cache the current transformer
            # The step's own fit params are merged with the "transformers"
            # entry; "transformers" wins if a step param has the same key.
            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **{**fit_params_steps[name], **{"transformers": transformers}},
            )
            # Make this step visible to the steps that follow it.
            transformers.append(fitted_transformer)
            # Replace the transformer of the step with the fitted
            # transformer. This is necessary when loading the transformer
            # from the cache.
            self.steps[step_idx] = (name, fitted_transformer)
        return X, transformers

    def fit(self, X, y=None, **fit_params):
        """Fit the model.

        Fit all the transformers one after the other and transform the
        data. Finally, fit the transformed data using the final estimator,
        which also receives the fitted transformers as the ``"transformers"``
        fit parameter.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.
        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.
        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        self : object
            Pipeline with fitted steps.
        """
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt, transformers = self._fit(X, y, **fit_params_steps)
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                # _fit skips the final step, so it is fitted here with the
                # full list of previously fitted transformers.
                self._final_estimator.fit(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}})

        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the model and transform with the final estimator.

        Fits all the transformers one after the other and transform the
        data. Then uses `fit_transform` on transformed data with the final
        estimator (or `fit` followed by `transform` when it has no
        `fit_transform`). The final estimator also receives the fitted
        transformers as the ``"transformers"`` fit parameter.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.
        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.
        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        Xt : ndarray of shape (n_samples, n_transformed_features)
            Transformed samples.
        """
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt, transformers = self._fit(X, y, **fit_params_steps)

        last_step = self._final_estimator
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if last_step == "passthrough":
                return Xt
            fit_params_last_step = fit_params_steps[self.steps[-1][0]]
            # The final step gets the same "transformers" entry as the
            # earlier steps received in _fit.
            if hasattr(last_step, "fit_transform"):
                return last_step.fit_transform(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}})
            else:
                return last_step.fit(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}}).transform(Xt)

Changing only the _fit function would pass the previous transformers to every step except the last stage, because its loop skips the final step:

for step_idx, name, transformer in self._iter(
    with_final=False, filter_passthrough=False
):

Since this loop has with_final=False, we’ll also need to modify the fit and fit_transform functions for the last stage, via:

# Excerpt from Pipeline.fit: fit the final estimator on the data returned by
# _fit, adding the list of fitted transformers to its fit parameters.
if self._final_estimator != "passthrough":
    fit_params_last_step = fit_params_steps[self.steps[-1][0]]
    self._final_estimator.fit(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}})
# Excerpt from Pipeline.fit_transform: same idea, but the final step also
# transforms the data (fit_transform if available, otherwise fit + transform).
if last_step == "passthrough":
    return Xt
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
if hasattr(last_step, "fit_transform"):
    return last_step.fit_transform(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}})
else:
    return last_step.fit(Xt, y, **{**fit_params_last_step, **{"transformers": transformers}}).transform(Xt)

Modifying these two functions as well allows us to pass the previous transformers to every step, including the last one.

With the scaler, we can set self.original_names in the fit function by inspecting transformers, since the previous transformers are added to fit_params by the Pipeline class.

class EstimatorWrapper(base.BaseEstimator, base.TransformerMixin):
    """Expose a model's ``predict`` / ``predict_proba`` for dict or tabular input.

    A dict is treated as a single sample whose values, in key order, form the
    feature row. Any other input is handed to the model unchanged.
    """

    def __init__(self, model: linear_model.LogisticRegression):
        self.model: linear_model.LogisticRegression = model

    def fit(self, X, y=None, **fit_params) -> EstimatorWrapper:
        """Do nothing and return ``self``; the wrapped model is not fitted here.

        NOTE(review): presumably the model is already fitted before it is
        wrapped -- confirm against the caller.
        """
        return self

    @staticmethod
    def _as_model_input(X):
        """Turn a single-sample dict into a ``(1, n_features)`` array; pass others through."""
        if isinstance(X, dict):
            return np.array(list(X.values())).reshape(1, -1)
        # DataFrames and other array-likes go to the model as they are, so an
        # unsupported input fails inside the model instead of silently
        # producing None.
        return X

    def predict(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        """Return ``self.model.predict`` for ``X``."""
        return self.model.predict(self._as_model_input(X))

    def predict_proba(self, X: Union[pd.DataFrame, Dict[str, Union[float, int]]]) -> np.ndarray:
        """Return ``self.model.predict_proba`` for ``X``."""
        return self.model.predict_proba(self._as_model_input(X))

As an example, we’ll record the original names from the Rename class in the Scaler.

# Two steps: rename the raw column first, then min-max scale it under its new name.
pipe: Pipeline = make_pipeline(
    Rename(names={'sepal length (cm)': 'sepal length'}),
    Scaler(columns=['sepal length']),
)
# Fit both steps on the training data and keep the transformed result.
X_train = pipe.fit_transform(X_train, y_train)
# Apply the already fitted steps to the held-out data.
X_test_transformed = pipe.transform(X_test)
# The last step (the Scaler) recorded the Rename mapping while it was fitted.
pipe.steps[-1][1].original_names

{'sepal length (cm)': 'sepal length'}

We could also modify the Pipeline to pass along the names of the transformers. But most likely you’ll only be using the previous transformer, so a list is fine in most cases.