Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
[![Tests](https://github.com/gcskoenig/fippy/actions/workflows/python-package.yml/badge.svg)](https://github.com/gcskoenig/fippy/actions/workflows/python-package.yml)
[![PyPI](https://img.shields.io/pypi/v/fippy)](https://pypi.org/project/fippy/)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python](https://img.shields.io/pypi/pyversions/fippy)](https://pypi.org/project/fippy/)
[![Python](https://img.shields.io/badge/python-3.11%20--%203.13-blue)](https://pypi.org/project/fippy/)

A Python package for model-agnostic feature importance with statistical inference. Fippy implements a unified framework where feature importance methods are composed from three orthogonal axes:
A Python package for model-agnostic feature importance with statistical inference. Fippy implements a unified framework where feature importance methods are composed from three axes:

| Axis | Options | Description |
|---|---|---|
Expand Down
11 changes: 11 additions & 0 deletions src/fippy/explainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def loo(
y = np.asarray(y)
groups = self._resolve_features(features)

if distribution is not None:
sampler = self._get_sampler(distribution)
if hasattr(sampler, 'check_compatibility'):
requires_multi = any(len(g.columns) > 1 for g in groups)
sampler.check_compatibility(requires_multivariate=requires_multi)

n_obs = len(X)
scores = np.empty((1, n_repeats, n_obs, len(groups)))
baseline_loss = self.loss(y, self.predict(X))
Expand Down Expand Up @@ -121,6 +127,11 @@ def shapley(
y = np.asarray(y)
groups = self._resolve_features(features)

if distribution is not None:
sampler = self._get_sampler(distribution)
if hasattr(sampler, 'check_compatibility'):
sampler.check_compatibility(requires_multivariate=True)

n_obs = len(X)
scores = np.empty((1, n_repeats, n_obs, len(groups)))
baseline_loss = self.loss(y, self.predict(X))
Expand Down
3 changes: 3 additions & 0 deletions src/fippy/samplers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
from fippy.samplers.base import Sampler
from fippy.samplers.dispatch import TypeDispatchSampler
from fippy.samplers.forest import RFClassificationSampler, RFResidualSampler
from fippy.samplers.gaussian import GaussianSampler
from fippy.samplers.permutation import PermutationSampler
97 changes: 97 additions & 0 deletions src/fippy/samplers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Sampler ABC: base class for all conditional distribution samplers."""
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd


class Sampler(ABC):
    """Base class for all conditional distribution samplers.

    A sampler models P(X_J | X_S) for target features ``J`` and context
    features ``S``. The public entry points ``fit`` and ``sample`` delegate to
    the abstract hooks ``_fit`` and ``_sample`` that subclasses implement.

    Subclasses declare three capability flags as class attributes:
        multivariate: Can sample len(J) > 1 natively.
        supports_categorical_target: Can produce samples for categorical features in J.
        supports_categorical_context: Can condition on categorical features in S.

    Categorical columns are detected from DataFrame dtypes (CategoricalDtype,
    object, and pandas string dtypes).
    """

    multivariate: bool = False
    supports_categorical_target: bool = False
    supports_categorical_context: bool = False

    def __init__(self, X_train: pd.DataFrame):
        """Store the training frame and record which columns are categorical.

        Args:
            X_train: Training data; its column names become ``feature_names``.
        """
        self.X_train = X_train
        self.feature_names = list(X_train.columns)
        self._categorical_cols = self._detect_categorical(X_train)

    @staticmethod
    def _detect_categorical(X: pd.DataFrame) -> set[str]:
        """Detect categorical columns from dtype.

        A column counts as categorical when its dtype is a CategoricalDtype,
        a pandas string dtype, or object.

        Args:
            X: Frame whose column dtypes are inspected.

        Returns:
            Set of column labels with a categorical-like dtype.
        """
        def _is_categorical(dtype) -> bool:
            # String-dtype columns cannot be cast to float, so they must be
            # routed through the categorical code paths as well.
            return (
                isinstance(dtype, (pd.CategoricalDtype, pd.StringDtype))
                or pd.api.types.is_object_dtype(dtype)
            )

        return {col for col, dtype in X.dtypes.items() if _is_categorical(dtype)}

    def check_compatibility(self, requires_multivariate: bool = False) -> None:
        """Pre-flight validation: check sampler compatibility with the dataset.

        Called by the Explainer at the start of loo()/shapley(), before the
        feature loop. The Explainer determines what properties are required
        and passes them in.

        Args:
            requires_multivariate: Whether the computation requires sampling
                len(J) > 1 (e.g., Shapley always, LOO with multi-column groups).

        Raises:
            ValueError: Comprehensive error listing all incompatibilities.
        """
        sampler_name = type(self).__name__
        errors = []

        if self._categorical_cols:
            # Sort (by string form, so mixed label types cannot fail) to keep
            # the message deterministic; raw set order varies between runs.
            cat_cols = sorted(self._categorical_cols, key=str)
            if not self.supports_categorical_target:
                errors.append(
                    f"Categorical columns {cat_cols} will appear as targets, "
                    f"but {sampler_name} does not support categorical "
                    f"targets. Use a sampler that supports categorical targets "
                    f"(e.g., PermutationSampler, ARFSampler) or wrap with "
                    f"TypeDispatchSampler."
                )
            if not self.supports_categorical_context:
                errors.append(
                    f"Categorical columns {cat_cols} will appear in conditioning "
                    f"sets, but {sampler_name} does not support categorical "
                    f"context features."
                )

        if requires_multivariate and not self.multivariate:
            errors.append(
                f"{sampler_name} is univariate (multivariate=False) "
                f"and cannot sample multiple features jointly. "
                f"Wrap it in SequentialSampler."
            )

        if errors:
            raise ValueError(
                f"Sampler {sampler_name} is incompatible with the "
                f"requested computation:\n"
                + "\n".join(f" - {e}" for e in errors)
            )

    def fit(self, J, S):
        """Fit P(X_J | X_S)."""
        self._fit(J, S)

    def sample(self, X, J, S, n_samples=1):
        """Sample from P(X_J | X_S). Shape: (n_obs, n_samples, len(J))."""
        return self._sample(X, J, S, n_samples)

    @abstractmethod
    def _fit(self, J, S): ...

    @abstractmethod
    def _sample(self, X, J, S, n_samples) -> np.ndarray: ...
48 changes: 48 additions & 0 deletions src/fippy/samplers/dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""TypeDispatchSampler: routes to continuous or categorical sub-sampler."""
import pandas as pd

from fippy.samplers.base import Sampler


class TypeDispatchSampler(Sampler):
    """Routes to continuous or categorical sub-sampler based on target dtype.

    Univariate only (len(J) = 1). For multivariate use, wrap in SequentialSampler.

    Example:
        sampler = TypeDispatchSampler(
            X_train,
            continuous_sampler=RFResidualSampler(X_train),
            categorical_sampler=RFClassificationSampler(X_train),
        )
    """

    multivariate = False
    supports_categorical_target = True

    def __init__(
        self,
        X_train: pd.DataFrame,
        continuous_sampler: Sampler,
        categorical_sampler: Sampler,
    ):
        """Store the two sub-samplers.

        Args:
            X_train: Training data; used to detect which columns are categorical.
            continuous_sampler: Sampler used when the target is not categorical.
            categorical_sampler: Sampler used when the target is categorical.
        """
        super().__init__(X_train)
        self._continuous = continuous_sampler
        self._categorical = categorical_sampler

    @property
    def supports_categorical_context(self):
        """True only if both sub-samplers can condition on categorical features."""
        return (self._continuous.supports_categorical_context
                and self._categorical.supports_categorical_context)

    def _select_sampler(self, J):
        """Return the sub-sampler responsible for the single target in ``J``.

        Raises:
            ValueError: If ``J`` does not contain exactly one feature.
        """
        # Explicit check instead of `assert`: asserts are stripped under -O,
        # after which a multi-feature J would silently dispatch on J[0] only.
        if len(J) != 1:
            raise ValueError(
                f"{type(self).__name__} is univariate and expects exactly one "
                f"target feature, got {len(J)}: {list(J)}. "
                f"Wrap it in SequentialSampler for multivariate targets."
            )
        if J[0] in self._categorical_cols:
            return self._categorical
        return self._continuous

    def _fit(self, J, S):
        """Fit the sub-sampler that matches the target's type."""
        # Go through the sub-sampler's public entry point rather than reaching
        # into its private hook.
        self._select_sampler(J).fit(J, S)

    def _sample(self, X, J, S, n_samples):
        """Sample from the sub-sampler that matches the target's type."""
        return self._select_sampler(J).sample(X, J, S, n_samples)
187 changes: 187 additions & 0 deletions src/fippy/samplers/forest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""Forest-based univariate conditional samplers."""
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV

from fippy.samplers.base import Sampler

# Random-forest hyperparameter candidates handed to RandomizedSearchCV as its
# parameter distributions when tuning is enabled.
_RF_PARAM_GRID = dict(
    n_estimators=[100, 200, 500],
    max_depth=[5, 10, 20, None],
    min_samples_leaf=[1, 2, 5, 10],
    max_features=["sqrt", "log2", 0.5, 1.0],
)


class _ForestSamplerBase(Sampler):
    """Shared logic for RF-based univariate samplers.

    Provides numeric encoding of context columns, optional randomized
    hyperparameter search, and a cache dict keyed by ``_key(J, S)``.

    Args:
        X_train: Training data.
        tune: If True, fit via RandomizedSearchCV over ``_RF_PARAM_GRID``
            (only when there are at least ``cv`` training targets).
        n_iter: Number of parameter settings sampled by the search.
        cv: Number of cross-validation folds used by the search.
        random_state: Seed passed to the search (and stored for subclasses).
        n_jobs: Parallelism of the search; -1 (the default) uses all cores.
    """

    def __init__(self, X_train: pd.DataFrame, *, tune: bool = True,
                 n_iter: int = 20, cv: int = 5, random_state=None,
                 n_jobs=-1):
        super().__init__(X_train)
        self.tune = tune
        self.n_iter = n_iter
        self.cv = cv
        self.random_state = random_state
        # Previously hard-coded to -1 in the search; exposed so callers can
        # limit parallelism (e.g. when already running inside a worker pool).
        self.n_jobs = n_jobs
        self._cache: dict[tuple, tuple] = {}
        # Build encoding maps for categorical context features: each distinct
        # training value gets an integer code in order of first appearance.
        self._cat_maps: dict[str, dict] = {
            col: {v: i for i, v in enumerate(X_train[col].unique())}
            for col in self._categorical_cols
        }

    def _context_array(self, X, S):
        """Convert context columns to numeric numpy array for sklearn.

        Columns are emitted in sorted order of ``S`` so that fit-time and
        sample-time arrays line up regardless of how ``S`` was ordered.
        Categorical values not seen in training are encoded as -1.

        Returns:
            Float array of shape (len(X), len(S)); zero columns when ``S`` is
            empty.
        """
        if not S:
            return np.zeros((len(X), 0))
        arrays = []
        for col in sorted(S):
            if col in self._categorical_cols:
                mapping = self._cat_maps[col]
                codes = np.array([mapping.get(v, -1) for v in X[col]], dtype=float)
                arrays.append(codes)
            else:
                arrays.append(X[col].values.astype(float))
        return np.column_stack(arrays)

    def _tune_and_fit(self, estimator, X_context, y, scoring=None):
        """Fit estimator, optionally with hyperparameter tuning.

        Tuning is skipped when disabled or when there are fewer targets than
        CV folds; in that case the estimator is fitted as given.

        Returns:
            The fitted estimator (the search's best estimator when tuned).
        """
        if self.tune and len(y) >= self.cv:
            search = RandomizedSearchCV(
                estimator,
                _RF_PARAM_GRID,
                n_iter=self.n_iter,
                cv=self.cv,
                scoring=scoring,
                random_state=self.random_state,
                n_jobs=self.n_jobs,
            )
            search.fit(X_context, y)
            return search.best_estimator_
        estimator.fit(X_context, y)
        return estimator

    @staticmethod
    def _key(J, S):
        """Return an order-insensitive, hashable cache key for (J, S)."""
        return (tuple(sorted(J)), tuple(sorted(S)))


class RFResidualSampler(_ForestSamplerBase):
    """Univariate conditional sampler for continuous targets.

    Fits a Random Forest regressor to estimate E[X_j | X_S] and stores training
    residuals. At sample time, predicts the conditional expectation and adds a
    randomly resampled training residual.

    Assumes homoscedastic residuals: the residual distribution does not depend
    on X_S.

    Residual resampling draws from a generator derived from ``random_state``,
    so a seeded sampler is reproducible; with ``random_state=None`` the global
    NumPy random state is used.
    """

    multivariate = False
    supports_categorical_target = False
    supports_categorical_context = True

    def _sampling_rng(self):
        """Return the RandomState used for resampling, creating it on first use."""
        rng = getattr(self, "_rng", None)
        if rng is None:
            from sklearn.utils import check_random_state

            # None -> NumPy's global RandomState; int -> a fresh seeded one.
            # Cached so successive calls continue a single stream instead of
            # replaying the same draws.
            rng = self._rng = check_random_state(self.random_state)
        return rng

    def _fit(self, J, S):
        """Fit and cache the model for target ``J`` given context ``S``.

        With an empty context the training mean and centred residuals are
        stored; otherwise a (optionally tuned) forest regressor and its
        training residuals are stored. Already-cached keys are skipped.

        Raises:
            ValueError: If ``J`` does not contain exactly one feature.
        """
        # Explicit check instead of `assert`, which is stripped under -O.
        if len(J) != 1:
            raise ValueError(
                f"{type(self).__name__} is univariate and expects exactly one "
                f"target feature, got {len(J)}: {list(J)}."
            )

        key = self._key(J, S)
        if key in self._cache:
            return

        y_train = self.X_train[J[0]].values.astype(float)

        if not S:
            mean = y_train.mean()
            self._cache[key] = ("marginal", mean, y_train - mean)
            return

        X_ctx = self._context_array(self.X_train, S)
        model = self._tune_and_fit(
            RandomForestRegressor(random_state=self.random_state),
            X_ctx, y_train,
        )
        residuals = y_train - model.predict(X_ctx)
        self._cache[key] = ("fitted", model, residuals)

    def _sample(self, X, J, S, n_samples):
        """Draw samples as conditional mean plus a resampled training residual.

        Fits lazily if (J, S) has not been fitted yet.

        Returns:
            Float array of shape (len(X), n_samples, 1).
        """
        key = self._key(J, S)
        if key not in self._cache:
            self._fit(J, S)

        kind, model_or_mean, residuals = self._cache[key]
        if kind == "marginal":
            center = model_or_mean  # scalar training mean
        else:
            center = model_or_mean.predict(self._context_array(X, S))

        rng = self._sampling_rng()
        n_obs = len(X)
        result = np.empty((n_obs, n_samples, 1))
        # One draw per sample index, in the same order as before, so results
        # under a seeded global NumPy state are unchanged.
        for k in range(n_samples):
            idx = rng.randint(0, len(residuals), size=n_obs)
            result[:, k, 0] = center + residuals[idx]

        return result


class RFClassificationSampler(_ForestSamplerBase):
    """Univariate conditional sampler for categorical targets.

    Fits a Random Forest classifier to estimate P(X_j | X_S). At sample time,
    predicts class probabilities and samples from them.

    Sampling draws from a generator derived from ``random_state``, so a seeded
    sampler is reproducible; with ``random_state=None`` the global NumPy
    random state is used.
    """

    multivariate = False
    supports_categorical_target = True
    supports_categorical_context = True

    def _sampling_rng(self):
        """Return the RandomState used for sampling, creating it on first use."""
        rng = getattr(self, "_rng", None)
        if rng is None:
            from sklearn.utils import check_random_state

            # None -> NumPy's global RandomState; int -> a fresh seeded one.
            # Cached so successive calls continue a single stream instead of
            # replaying the same draws.
            rng = self._rng = check_random_state(self.random_state)
        return rng

    def _fit(self, J, S):
        """Fit and cache the model for target ``J`` given context ``S``.

        With an empty context the empirical class frequencies are stored;
        otherwise a (optionally tuned) forest classifier is stored.
        Already-cached keys are skipped.

        Raises:
            ValueError: If ``J`` does not contain exactly one feature.
        """
        # Explicit check instead of `assert`, which is stripped under -O.
        if len(J) != 1:
            raise ValueError(
                f"{type(self).__name__} is univariate and expects exactly one "
                f"target feature, got {len(J)}: {list(J)}."
            )

        key = self._key(J, S)
        if key in self._cache:
            return

        y_train = self.X_train[J[0]].values

        if not S:
            classes, counts = np.unique(y_train, return_counts=True)
            self._cache[key] = ("marginal", classes, counts / counts.sum())
            return

        X_ctx = self._context_array(self.X_train, S)
        model = self._tune_and_fit(
            RandomForestClassifier(random_state=self.random_state),
            X_ctx, y_train,
            scoring="neg_log_loss",
        )
        self._cache[key] = ("fitted", model)

    def _sample(self, X, J, S, n_samples):
        """Draw class labels from the marginal or predicted class probabilities.

        Fits lazily if (J, S) has not been fitted yet.

        Returns:
            Object array of shape (len(X), n_samples, 1) holding class labels.
        """
        key = self._key(J, S)
        if key not in self._cache:
            self._fit(J, S)

        entry = self._cache[key]
        rng = self._sampling_rng()
        n_obs = len(X)
        result = np.empty((n_obs, n_samples, 1), dtype=object)

        if entry[0] == "marginal":
            classes, probs = entry[1], entry[2]
            for k in range(n_samples):
                idx = rng.choice(len(classes), size=n_obs, p=probs)
                result[:, k, 0] = classes[idx]
        else:
            model = entry[1]
            probs = model.predict_proba(self._context_array(X, S))
            classes = model.classes_
            cumprobs = np.cumsum(probs, axis=1)
            # Float round-off can leave the final cumulative sum just below 1.
            # A draw above it would match no column, and argmax over an
            # all-False row returns 0, i.e. silently picks the first class.
            cumprobs[:, -1] = 1.0
            for k in range(n_samples):
                # Inverse-CDF sampling: first column whose cumulative
                # probability exceeds the uniform draw.
                u = rng.rand(n_obs, 1)
                idx = (u < cumprobs).argmax(axis=1)
                result[:, k, 0] = classes[idx]

        return result
Loading