Skip to content

unified

Unified feature selector combining multiple methods.

FeatureSelector

Bases: BaseSelector

Unified feature selector combining multiple selection methods.

Combines statistical tests, model importance, and redundancy elimination for comprehensive feature selection.

Parameters:

Name Type Description Default
methods list

Selection methods to use

['mutual_info', 'importance']
max_features int

Maximum features to select

None
correlation_threshold float

Threshold for redundancy elimination

0.95

Examples:

>>> selector = FeatureSelector(
...     methods=['mutual_info', 'importance', 'correlation'],
...     max_features=50,
...     correlation_threshold=0.95
... )
>>> X_selected = selector.fit_transform(X, y)
Source code in featcopilot/selection/unified.py
class FeatureSelector(BaseSelector):
    """
    Unified feature selector combining multiple selection methods.

    Combines statistical tests, model importance, and redundancy
    elimination for comprehensive feature selection.

    Parameters
    ----------
    methods : list, default=['mutual_info', 'importance']
        Selection methods to use
    max_features : int, optional
        Maximum features to select
    correlation_threshold : float, default=0.95
        Threshold for redundancy elimination

    Examples
    --------
    >>> selector = FeatureSelector(
    ...     methods=['mutual_info', 'importance', 'correlation'],
    ...     max_features=50,
    ...     correlation_threshold=0.95
    ... )
    >>> X_selected = selector.fit_transform(X, y)
    """

    def __init__(
        self,
        methods: Optional[list[str]] = None,
        max_features: Optional[int] = None,
        correlation_threshold: float = 0.95,
        combination: str = "union",
        original_features: Optional[set[str]] = None,
        verbose: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.methods = methods or ["mutual_info", "importance"]
        self.max_features = max_features
        self.correlation_threshold = correlation_threshold
        # NOTE(review): 'combination' ('union' or 'intersection') is stored but
        # never consulted anywhere in this class; _combine_scores always
        # averages normalized scores. Confirm whether intersection mode was
        # intended before removing or implementing it.
        self.combination = combination
        self.original_features = original_features or set()
        self.verbose = verbose
        self._selectors: dict[str, BaseSelector] = {}
        self._method_scores: dict[str, dict[str, float]] = {}
        # Initialized here so accessors such as get_ranking() return an empty
        # ranking instead of raising AttributeError before fit() is called.
        self._feature_scores: dict[str, float] = {}

    def fit(self, X: Union[pd.DataFrame, np.ndarray], y: Union[pd.Series, np.ndarray], **kwargs) -> "FeatureSelector":
        """
        Fit all selection methods.

        Parameters
        ----------
        X : DataFrame or ndarray
            Input features
        y : Series or ndarray
            Target variable

        Returns
        -------
        self : FeatureSelector
        """
        X = self._validate_input(X)
        y = np.array(y)

        # Identify categorical/text columns (can't be scored by numeric methods)
        categorical_cols = X.select_dtypes(include=["object", "category", "string"]).columns.tolist()
        numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
        # Set for O(1) membership tests; also reused later instead of calling
        # select_dtypes() a second time on the same frame.
        numeric_set = set(numeric_cols)

        # Initialize and fit each selector
        for method in self.methods:
            selector = self._create_selector(method)
            selector.fit(X, y)
            self._selectors[method] = selector
            self._method_scores[method] = selector.get_feature_scores()

        # Combine scores from all methods
        self._combine_scores(X.columns.tolist())

        # Give categorical columns a minimum score so they're not filtered out.
        # Original categorical columns are important for models that can handle them.
        if categorical_cols:
            # Median score of positively-scored numeric features serves as the
            # baseline for categorical columns.
            numeric_scores = [v for k, v in self._feature_scores.items() if k in numeric_set and v > 0]
            baseline_score = np.median(numeric_scores) if numeric_scores else 0.5

            for col in categorical_cols:
                if col in self.original_features:
                    # Original categorical columns get at least the baseline score
                    self._feature_scores[col] = max(self._feature_scores.get(col, 0), baseline_score)

        # Apply redundancy elimination (skipped entirely when threshold >= 1.0)
        if self.correlation_threshold < 1.0:
            eliminator = RedundancyEliminator(
                correlation_threshold=self.correlation_threshold,
                importance_scores=self._feature_scores,
                original_features=self.original_features,
                verbose=self.verbose,
            )
            eliminator.fit(X)
            non_redundant = set(eliminator.get_selected_features())
            self._feature_scores = {k: v for k, v in self._feature_scores.items() if k in non_redundant}

        # Final selection
        self._final_selection()

        # Refine derived (non-original) features with tree-based importance
        derived = [f for f in self._selected_features if f not in self.original_features]
        if derived:
            # Reuse numeric_set computed above rather than recomputing
            # select_dtypes() on the same frame.
            numeric_derived = [f for f in derived if f in numeric_set]
            if numeric_derived:
                refined = self._l1_refine(X, y, numeric_derived)
                non_numeric_derived = [f for f in derived if f not in numeric_derived]
                original_selected = [f for f in self._selected_features if f in self.original_features]
                self._selected_features = original_selected + refined + non_numeric_derived

        self._is_fitted = True
        return self

    def _create_selector(self, method: str) -> BaseSelector:
        """Create the selector instance backing a given method name.

        Raises
        ------
        ValueError
            If `method` is not a recognized selection method.
        """
        # All statistical methods map 1:1 onto StatisticalSelector's `method` arg.
        if method in ("mutual_info", "f_test", "chi2", "correlation"):
            return StatisticalSelector(method=method, verbose=self.verbose)
        if method == "importance":
            return ImportanceSelector(model="random_forest", verbose=self.verbose)
        if method == "xgboost":
            return ImportanceSelector(model="xgboost", verbose=self.verbose)
        raise ValueError(f"Unknown selection method: {method}")

    def _combine_scores(self, columns: list[str]) -> None:
        """Combine scores from multiple methods into one score per column.

        Each method's scores are normalized by that method's maximum score,
        then averaged across the methods that scored the column.
        """
        # Hoist the per-method normalization factor out of the per-column loop;
        # previously max() was recomputed for every (column, method) pair.
        max_per_method: dict[str, float] = {}
        for method, method_scores in self._method_scores.items():
            all_scores = list(method_scores.values())
            max_per_method[method] = max(all_scores) if all_scores else 1

        combined = {}
        for col in columns:
            scores = []
            for method, method_scores in self._method_scores.items():
                if col in method_scores:
                    max_score = max_per_method[method]
                    # Normalize score to 0-1 range; guard against all-zero methods
                    normalized = method_scores[col] / max_score if max_score > 0 else 0
                    scores.append(normalized)

            # Average normalized scores; columns scored by no method get 0
            combined[col] = np.mean(scores) if scores else 0

        self._feature_scores = combined

    def _l1_refine(self, X: pd.DataFrame, y: np.ndarray, candidates: list[str]) -> list[str]:
        """Use tree-based importance to select the most predictive derived features.

        Despite the name, this fits a gradient-boosting model rather than an
        L1-penalized one. On any modeling failure, `candidates` is returned
        unchanged (best-effort refinement).
        """
        from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor

        if not candidates:
            return candidates

        X_cand = X[candidates].copy()
        # Trees can't handle inf/NaN; collapse both to 0
        X_cand = X_cand.replace([np.inf, -np.inf], np.nan).fillna(0)

        try:
            # Heuristic: integer target with few unique values -> classification
            is_classification = len(np.unique(y)) <= 20 and pd.api.types.is_integer_dtype(y)
            model_cls = GradientBoostingClassifier if is_classification else GradientBoostingRegressor
            model = model_cls(n_estimators=50, max_depth=3, subsample=0.8, random_state=42)
            model.fit(X_cand, y)
            importances = model.feature_importances_

            # Keep features scoring at least half the mean importance
            mean_imp = np.mean(importances)
            selected = [c for c, imp in zip(candidates, importances) if imp >= mean_imp * 0.5]

            if not selected:
                # Fallback: keep top half by importance (at least 3)
                top_k = max(3, len(candidates) // 2)
                idx = np.argsort(importances)[::-1][:top_k]
                selected = [candidates[i] for i in idx]

            return selected
        except Exception:
            # Best-effort: refinement must never fail the overall fit
            return candidates

    def _final_selection(self) -> None:
        """Make the final feature selection from the combined scores.

        Original features are always kept; derived features must clear an
        importance threshold and are capped relative to the original count.
        """
        sorted_features = sorted(self._feature_scores.items(), key=lambda x: x[1], reverse=True)

        original_selected = []
        derived_selected = []

        # Threshold derived features against the original-feature score distribution
        original_scores = [self._feature_scores.get(f, 0) for f in self.original_features if f in self._feature_scores]
        if original_scores:
            median_original = float(np.median(original_scores))
            # Derived features must score well relative to original feature distribution
            importance_threshold = median_original * 1.5
        else:
            # No original scores available: keep anything above 10% of the max
            max_score = max(self._feature_scores.values()) if self._feature_scores else 1.0
            importance_threshold = max_score * 0.10

        for name, score in sorted_features:
            if name in self.original_features:
                original_selected.append(name)
            elif score >= importance_threshold:
                derived_selected.append(name)
            elif self.verbose:
                logger.debug(f"Excluding low-importance feature {name} (score={score:.4f})")

        # Cap derived features at 1.5x original feature count (minimum 10)
        max_derived_ratio = max(int(len(self.original_features) * 1.5), 10)
        derived_selected = derived_selected[:max_derived_ratio]

        # max_features limits only derived features; originals keep their slots
        if self.max_features is not None:
            n_derived = max(0, self.max_features - len(original_selected))
            derived_selected = derived_selected[:n_derived]

        self._selected_features = original_selected + derived_selected

        # Safety net: never end up with fewer features than original
        if len(self._selected_features) < len(self.original_features):
            # This should not happen, but add all original features as safety
            for f in self.original_features:
                if f not in self._selected_features:
                    self._selected_features.append(f)

        if self.verbose:
            logger.info(
                f"FeatureSelector: Selected {len(self._selected_features)} features "
                f"({len(original_selected)} original + {len(derived_selected)} derived)"
            )

    def transform(self, X: Union[pd.DataFrame, np.ndarray], **kwargs) -> pd.DataFrame:
        """Select features from data.

        Fitted features absent from `X` are silently skipped.
        """
        if not self._is_fitted:
            raise RuntimeError("Selector must be fitted before transform")

        X = self._validate_input(X)
        available = [f for f in self._selected_features if f in X.columns]
        return X[available]

    def get_method_scores(self) -> dict[str, dict[str, float]]:
        """Get scores from each individual method (live internal mapping, not a copy)."""
        return self._method_scores

    def get_ranking(self) -> list[tuple]:
        """Get feature ranking as list of (name, score) tuples, best first."""
        return sorted(self._feature_scores.items(), key=lambda x: x[1], reverse=True)

fit(X, y, **kwargs)

Fit all selection methods.

Parameters:

Name Type Description Default
X DataFrame or ndarray

Input features

required
y Series or ndarray

Target variable

required

Returns:

Name Type Description
self FeatureSelector
Source code in featcopilot/selection/unified.py
def fit(self, X: Union[pd.DataFrame, np.ndarray], y: Union[pd.Series, np.ndarray], **kwargs) -> "FeatureSelector":
    """
    Fit all selection methods.

    Runs each configured selector, combines their normalized scores,
    boosts original categorical columns, drops redundant features, and
    finally refines the surviving derived features.

    Parameters
    ----------
    X : DataFrame or ndarray
        Input features
    y : Series or ndarray
        Target variable

    Returns
    -------
    self : FeatureSelector
    """
    X = self._validate_input(X)
    # Coerce to ndarray so downstream selectors see a uniform target type
    y = np.array(y)

    # Identify categorical/text columns (can't be scored by numeric methods)
    categorical_cols = X.select_dtypes(include=["object", "category", "string"]).columns.tolist()
    numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()

    # Initialize and fit each selector; keep both the selector and its
    # raw per-feature scores for later combination
    for method in self.methods:
        selector = self._create_selector(method)
        selector.fit(X, y)
        self._selectors[method] = selector
        self._method_scores[method] = selector.get_feature_scores()

    # Combine scores from all methods into self._feature_scores
    self._combine_scores(X.columns.tolist())

    # Give categorical columns a minimum score so they're not filtered out.
    # Original categorical columns are important for models that can handle them.
    if categorical_cols:
        # Get the median score of positively-scored numeric features to use
        # as the baseline for categorical columns
        numeric_scores = [v for k, v in self._feature_scores.items() if k in numeric_cols and v > 0]
        if numeric_scores:
            baseline_score = np.median(numeric_scores)
        else:
            baseline_score = 0.5  # Default if no numeric scores

        for col in categorical_cols:
            if col in self.original_features:
                # Original categorical columns get at least the baseline score
                self._feature_scores[col] = max(self._feature_scores.get(col, 0), baseline_score)

    # Apply redundancy elimination (skipped entirely when threshold >= 1.0)
    if self.correlation_threshold < 1.0:
        eliminator = RedundancyEliminator(
            correlation_threshold=self.correlation_threshold,
            importance_scores=self._feature_scores,
            original_features=self.original_features,
            verbose=self.verbose,
        )
        eliminator.fit(X)
        non_redundant = set(eliminator.get_selected_features())
        self._feature_scores = {k: v for k, v in self._feature_scores.items() if k in non_redundant}

    # Final selection (originals always kept; derived must clear a threshold)
    self._final_selection()

    # Refine derived (non-original) numeric features via _l1_refine, which
    # uses tree-based importance despite its name
    derived = [f for f in self._selected_features if f not in self.original_features]
    if len(derived) > 0:
        numeric_derived = [f for f in derived if f in X.select_dtypes(include=[np.number]).columns.tolist()]
        if len(numeric_derived) > 0:
            refined = self._l1_refine(X, y, numeric_derived)
            non_numeric_derived = [f for f in derived if f not in numeric_derived]
            original_selected = [f for f in self._selected_features if f in self.original_features]
            self._selected_features = original_selected + refined + non_numeric_derived

    self._is_fitted = True
    return self

get_method_scores()

Get scores from each individual method.

Source code in featcopilot/selection/unified.py
def get_method_scores(self) -> dict[str, dict[str, float]]:
    """Get scores from each individual method.

    Returns the live internal mapping (method name -> feature scores),
    not a copy; callers should not mutate it.
    """
    return self._method_scores

get_ranking()

Get feature ranking as list of (name, score) tuples.

Source code in featcopilot/selection/unified.py
def get_ranking(self) -> list[tuple]:
    """Return the features ranked from highest to lowest combined score.

    Each entry is a ``(feature_name, score)`` pair.
    """
    ranking = list(self._feature_scores.items())
    ranking.sort(key=lambda pair: pair[1], reverse=True)
    return ranking

transform(X, **kwargs)

Select features from data.

Source code in featcopilot/selection/unified.py
def transform(self, X: Union[pd.DataFrame, np.ndarray], **kwargs) -> pd.DataFrame:
    """Project the input onto the feature subset chosen during fit.

    Features selected at fit time but absent from the input are skipped
    rather than raising.
    """
    if not self._is_fitted:
        raise RuntimeError("Selector must be fitted before transform")

    frame = self._validate_input(X)
    keep = [name for name in self._selected_features if name in frame.columns]
    return frame[keep]