Skip to content

Sklearn Pipeline Example

Integrate FeatCopilot into scikit-learn pipelines.

Basic Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

from featcopilot import AutoFeatureEngineer

# Create pipeline
pipeline = Pipeline([
    ('features', AutoFeatureEngineer(engines=['tabular'], max_features=20)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

Handling NaN Values

FeatCopilot may generate features with NaN values. Handle them in the pipeline:

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

from featcopilot import AutoFeatureEngineer

# Pipeline with imputation
pipeline = Pipeline([
    ('features', AutoFeatureEngineer(engines=['tabular'], max_features=30)),
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100))
])

Two-Stage Approach

For more control, separate feature engineering from modeling:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

from featcopilot import AutoFeatureEngineer

# Create sample data
np.random.seed(42)
X = pd.DataFrame({
    'a': np.random.randn(1000),
    'b': np.random.randn(1000),
    'c': np.random.randn(1000),
})
y = (X['a'] + X['b'] > 0).astype(int)

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Stage 1: Feature Engineering
engineer = AutoFeatureEngineer(engines=['tabular'], max_features=20)
X_train_fe = engineer.fit_transform(X_train, y_train).fillna(0)
X_test_fe = engineer.transform(X_test).fillna(0)

# Align columns
cols = [c for c in X_train_fe.columns if c in X_test_fe.columns]
X_train_fe = X_train_fe[cols]
X_test_fe = X_test_fe[cols]

# Stage 2: Modeling Pipeline
model_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# Evaluate
scores = cross_val_score(model_pipeline, X_train_fe, y_train, cv=5, scoring='roc_auc')
print(f"CV ROC-AUC: {scores.mean():.4f} (+/- {scores.std()*2:.4f})")

# Final model
model_pipeline.fit(X_train_fe, y_train)
test_score = model_pipeline.score(X_test_fe, y_test)
print(f"Test Accuracy: {test_score:.4f}")

Grid Search with Feature Engineering

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

# Feature engineering first
engineer = AutoFeatureEngineer(engines=['tabular'], max_features=30)
X_fe = engineer.fit_transform(X, y).fillna(0)

# Grid search on modeling pipeline
model_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier())
])

param_grid = {
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [5, 10, None],
}

grid_search = GridSearchCV(
    model_pipeline,
    param_grid,
    cv=5,
    scoring='roc_auc',
    n_jobs=-1
)

grid_search.fit(X_fe, y)
print(f"Best params: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.4f}")

Column Transformer Integration

Combine FeatCopilot with other transformers:

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression

from featcopilot.engines import TabularEngine

# Identify column types
numeric_features = ['age', 'income', 'tenure']
categorical_features = ['category', 'region']

# Create transformers
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('features', TabularEngine(polynomial_degree=2)),
            ('scaler', StandardScaler())
        ]), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
    ]
)

# Full pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression())
])

FeatureEngineerTransformer

Use the built-in FeatureEngineerTransformer for single-engine pipelines:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

from featcopilot import FeatureEngineerTransformer

# Single engine transformer
pipeline = Pipeline([
    ('features', FeatureEngineerTransformer(engine='tabular', max_features=30)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# With time series engine
ts_pipeline = Pipeline([
    ('features', FeatureEngineerTransformer(engine='timeseries', window_sizes=[3, 7, 14])),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# Fit and predict
pipeline.fit(X_train, y_train)
predictions = pipeline.predict(X_test)

Custom Transformer Wrapper

Create a fully sklearn-compatible transformer:

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd

from featcopilot import AutoFeatureEngineer

class FeatCopilotTransformer(BaseEstimator, TransformerMixin):
    """Sklearn-compatible FeatCopilot wrapper."""

    def __init__(self, engines=None, max_features=50, fill_value=0):
        self.engines = engines or ['tabular']
        self.max_features = max_features
        self.fill_value = fill_value
        self.engineer_ = None
        self.feature_names_ = None

    def fit(self, X, y=None):
        self.engineer_ = AutoFeatureEngineer(
            engines=self.engines,
            max_features=self.max_features
        )
        X_fe = self.engineer_.fit_transform(X, y)
        self.feature_names_ = list(X_fe.columns)
        return self

    def transform(self, X):
        X_fe = self.engineer_.transform(X)
        # Ensure consistent columns
        for col in self.feature_names_:
            if col not in X_fe.columns:
                X_fe[col] = self.fill_value
        X_fe = X_fe[self.feature_names_]
        return X_fe.fillna(self.fill_value).values

    def get_feature_names_out(self, input_features=None):
        return self.feature_names_

# Use in pipeline
pipeline = Pipeline([
    ('features', FeatCopilotTransformer(engines=['tabular'], max_features=30)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

Cross-Validation with Proper Leakage Prevention

from sklearn.model_selection import cross_val_score, KFold
import numpy as np

def cv_with_feature_engineering(X, y, n_splits=5):
    """Cross-validation with feature engineering inside each fold."""
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    scores = []

    for train_idx, val_idx in kf.split(X):
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        # Feature engineering inside fold
        engineer = AutoFeatureEngineer(engines=['tabular'], max_features=30)
        X_train_fe = engineer.fit_transform(X_train, y_train).fillna(0)
        X_val_fe = engineer.transform(X_val).fillna(0)

        # Align columns
        cols = [c for c in X_train_fe.columns if c in X_val_fe.columns]

        # Train and evaluate
        model = LogisticRegression()
        model.fit(X_train_fe[cols], y_train)
        score = model.score(X_val_fe[cols], y_val)
        scores.append(score)

    return np.mean(scores), np.std(scores)

mean_score, std_score = cv_with_feature_engineering(X, y)
print(f"CV Score: {mean_score:.4f} (+/- {std_score*2:.4f})")