Scikit-Learn - Modifying feature union to work with Pandas DataFrame

Posted on Apr 13, 2022

Here is a sample transformer that adds 1 to each of the specified columns:

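import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class PlusOne(BaseEstimator, TransformerMixin):
    """Add 1 to the selected columns and return them as a DataFrame."""

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        # Stateless transformer: there is nothing to learn.
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        # Only the selected columns are returned; their names are preserved.
        return X[self.columns] + 1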

We can easily put this into a pipeline:

from sklearn.pipeline import make_pipeline

sample = pd.DataFrame([{"a": 1}])

pipe = make_pipeline(
    PlusOne(columns=["a"]),
    PlusOne(columns=["a"]),
)
pipe.fit_transform(sample)
a
0 3

However, if we transform two columns separately and combine the results with make_union, the pipeline still runs, but we lose our column names:

from sklearn.pipeline import make_pipeline, make_union

sample = pd.DataFrame([{"a": 1, "b": 2}])

pipe = make_pipeline(
    make_union(
        PlusOne(columns=["a"]),
        PlusOne(columns=["b"]),
    ),
)
pipe.fit_transform(sample)
0 1
0 2 3

Notice that the names a and b are gone: the union returns a plain NumPy array, so its columns can only be referred to by position (0 and 1).
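To make the loss of column names concrete, the following minimal sketch inspects what the union returns. It assumes scikit-learn's default output configuration (no global `set_config(transform_output=...)`), and it redefines `PlusOne` only so that the snippet runs on its own.

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_union


# Same behaviour as the PlusOne transformer above, repeated so the snippet is
# self-contained. The mixin is listed before BaseEstimator, as the
# scikit-learn docs suggest.
class PlusOne(TransformerMixin, BaseEstimator):
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[self.columns] + 1


sample = pd.DataFrame([{"a": 1, "b": 2}])
union = make_union(PlusOne(columns=["a"]), PlusOne(columns=["b"]))
result = union.fit_transform(sample)

# Each transformer returned a DataFrame, but the combined result is an array.
print(type(result).__name__)
print(result.shape)
print(hasattr(result, "columns"))
```

The values survive the union, but nothing in the result records which column came from which transformer.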

This is because of how the FeatureUnion class combines the outputs of its transformers:

class FeatureUnion(TransformerMixin, _BaseComposition):
    def fit_transform(self, X, y=None, **fit_params):
        """Fit all transformers, transform the data and concatenate results.
        Parameters
        ----------
        X : iterable or array-like, depending on transformers
            Input data to be transformed.
        y : array-like of shape (n_samples, n_outputs), default=None
            Targets for supervised learning.
        **fit_params : dict, default=None
            Parameters to pass to the fit method of the estimator.
        Returns
        -------
        X_t : array-like or sparse matrix of \
                shape (n_samples, sum_n_components)
            The `hstack` of results of transformers. `sum_n_components` is the
            sum of `n_components` (output dimension) over transformers.
        """
        results = self._parallel_func(X, y, fit_params, _fit_transform_one)
        if not results:
            # All transformers are None
            return np.zeros((X.shape[0], 0))

        Xs, transformers = zip(*results)
        self._update_transformer_list(transformers)

        return self._hstack(Xs)

    def transform(self, X):
        """Transform X separately by each transformer, concatenate results.
        Parameters
        ----------
        X : iterable or array-like, depending on transformers
            Input data to be transformed.
        Returns
        -------
        X_t : array-like or sparse matrix of \
                shape (n_samples, sum_n_components)
            The `hstack` of results of transformers. `sum_n_components` is the
            sum of `n_components` (output dimension) over transformers.
        """
        Xs = Parallel(n_jobs=self.n_jobs)(
            delayed(_transform_one)(trans, X, None, weight)
            for name, trans, weight in self._iter()
        )
        if not Xs:
            # All transformers are None
            return np.zeros((X.shape[0], 0))

        return self._hstack(Xs)

    def _hstack(self, Xs):
        if any(sparse.issparse(f) for f in Xs):
            Xs = sparse.hstack(Xs).tocsr()
        else:
            Xs = np.hstack(Xs)
        return Xs

Notice that the original implementation uses _hstack to stack the output features horizontally. This in turn calls NumPy's hstack function (or SciPy's sparse hstack when any of the outputs is sparse).

For example, if the transformers returned DataFrames as output:

import numpy as np

np.hstack([pd.DataFrame([{"a": 3}]), pd.DataFrame([{"b": 4}])])
0 1
0 3 4

Stacking pandas DataFrames with np.hstack converts them to a NumPy array, so the column names are lost. Hence we need to modify the _hstack function so that it recognizes whether it is dealing with DataFrames or NumPy arrays.
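As a quick side-by-side, this small sketch stacks the same two single-column DataFrames with `np.hstack` and with `pd.concat(axis=1)`. The variable names are illustrative only.

```python
import numpy as np
import pandas as pd

left = pd.DataFrame([{"a": 3}])
right = pd.DataFrame([{"b": 4}])

# np.hstack converts each DataFrame to an array first, so labels are dropped.
stacked = np.hstack([left, right])

# pd.concat keeps the DataFrames intact and joins them column-wise.
concatenated = pd.concat([left, right], axis=1)

print(type(stacked).__name__)
print(list(concatenated.columns))
```

Both results hold the same values; only the `pd.concat` version still knows the columns are called a and b.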

An alternative implementation is as follows:

from sklearn.pipeline import FeatureUnion, _name_estimators
from scipy import sparse


class ModifiedFeatureUnion(FeatureUnion):
    def _hstack(self, Xs):
        # Xs is a list of transformer outputs: pandas DataFrames, NumPy arrays,
        # or SciPy sparse matrices.
        if isinstance(Xs[0], pd.DataFrame):
            # Join column-wise so that the column names are kept.
            return pd.concat(Xs, axis=1)
        # Otherwise keep the original behaviour. Sparse matrices are not
        # np.ndarray instances, so they must be checked for separately.
        if any(sparse.issparse(f) for f in Xs):
            return sparse.hstack(Xs).tocsr()
        return np.hstack(Xs)


def modified_make_union(*transformers, n_jobs=None, verbose=False):
    # _name_estimators is a private scikit-learn helper that generates
    # step names such as "plusone-1" and "plusone-2".
    return ModifiedFeatureUnion(_name_estimators(transformers), n_jobs=n_jobs, verbose=verbose)

sample = pd.DataFrame([{"a": 1, "b": 2}])

pipe = make_pipeline(
    modified_make_union(
        PlusOne(columns=["a"]),
        PlusOne(columns=["b"]),
    ),
)

pipe.fit_transform(sample)
a b
0 2 3

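Since Xs is a list of pd.DataFrame objects, we can simply use pd.concat with axis=1. The caveat is that all the DataFrames must share the same index: concat aligns rows on their index labels, so mismatched indices produce extra rows filled with NaN.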
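The index caveat can be illustrated with a short sketch. The two frames below are made-up data whose indices only partly overlap. Resetting the index is just one possible workaround, and it is only appropriate when the rows are known to be in the same order.

```python
import pandas as pd

left = pd.DataFrame({"a": [1, 2]}, index=[0, 1])
right = pd.DataFrame({"b": [3, 4]}, index=[1, 2])

# Rows are aligned on index labels: the union of labels (0, 1, 2) is used,
# and missing cells are filled with NaN.
misaligned = pd.concat([left, right], axis=1)

# Dropping the index on both sides pairs the rows by position instead.
aligned = pd.concat(
    [left.reset_index(drop=True), right.reset_index(drop=True)], axis=1
)

print(misaligned.shape, int(misaligned.isna().sum().sum()))
print(aligned.shape, int(aligned.isna().sum().sum()))
```

A transformer inside the union that filters, sorts, or re-indexes rows can therefore silently change the shape of the combined output.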

We also keep the original behaviour for NumPy arrays by checking the type of the first item in the list.
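To check that array-based transformers still work, here is a minimal sketch that feeds two standard scikit-learn scalers through a condensed version of the modified union. The class body is a restatement for self-containment, not a second implementation, and the input values are arbitrary.

```python
import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import MinMaxScaler, StandardScaler


class ModifiedFeatureUnion(FeatureUnion):
    def _hstack(self, Xs):
        if isinstance(Xs[0], pd.DataFrame):
            return pd.concat(Xs, axis=1)
        if any(sparse.issparse(f) for f in Xs):
            return sparse.hstack(Xs).tocsr()
        return np.hstack(Xs)


X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
union = ModifiedFeatureUnion(
    [("standard", StandardScaler()), ("minmax", MinMaxScaler())]
)
stacked = union.fit_transform(X)

# Both scalers return arrays, so the result is the usual horizontal stack:
# two columns from each scaler.
print(type(stacked).__name__, stacked.shape)
```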

With this simple implementation, the pipeline can now use any transformer that works on pandas DataFrames. FeatureUnion can also run its transformers in parallel through joblib when n_jobs is set, which can speed up the overall pipeline.
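The following sketch shows one way to request parallel execution. It makes several assumptions: the DataFrame-only `_hstack` is a simplification of the class above, `build_union` is a hypothetical helper, and joblib's threading backend is selected explicitly so that the transformers really do run on threads. Whether this is faster depends on the transformers, and for tiny inputs like this one the overhead will likely dominate.

```python
import pandas as pd
from joblib import parallel_backend
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import FeatureUnion


class PlusOne(TransformerMixin, BaseEstimator):
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[self.columns] + 1


class ModifiedFeatureUnion(FeatureUnion):
    def _hstack(self, Xs):
        # Simplified for this sketch: assumes every output is a DataFrame.
        return pd.concat(Xs, axis=1)


def build_union(n_jobs=None):
    # Hypothetical helper: builds a fresh union for each run.
    return ModifiedFeatureUnion(
        [("a_plus", PlusOne(columns=["a"])), ("b_plus", PlusOne(columns=["b"]))],
        n_jobs=n_jobs,
    )


sample = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

sequential = build_union().fit_transform(sample)

# n_jobs=2 asks joblib for two workers; the context manager picks threads.
with parallel_backend("threading"):
    parallel = build_union(n_jobs=2).fit_transform(sample)

print(parallel.equals(sequential))
```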