diff --git a/graphconstructor/operators/__init__.py b/graphconstructor/operators/__init__.py index 57e15a5..36e520d 100644 --- a/graphconstructor/operators/__init__.py +++ b/graphconstructor/operators/__init__.py @@ -1,13 +1,13 @@ from .base import GraphOperator from .disparity import DisparityFilter from .doubly_stochastic import DoublyStochastic +from .enhanced_configuration_model import EnhancedConfigurationModelFilter from .knn_selector import KNNSelector from .locally_adaptive_sparsification import LocallyAdaptiveSparsification from .marginal_likelihood import MarginalLikelihoodFilter from .minimum_spanning_tree import MinimumSpanningTree from .noise_corrected import NoiseCorrected from .weight_threshold import WeightThreshold -from .enhanced_configuration_model import EnhancedConfigurationModelFilter __all__ = [ diff --git a/graphconstructor/operators/enhanced_configuration_model.py b/graphconstructor/operators/enhanced_configuration_model.py index 713ae49..835343a 100644 --- a/graphconstructor/operators/enhanced_configuration_model.py +++ b/graphconstructor/operators/enhanced_configuration_model.py @@ -1,162 +1,369 @@ from dataclasses import dataclass import numpy as np -import scipy.sparse -import scipy.optimize +import scipy.optimize as so +import scipy.sparse as sp +from numba import njit from ..graph import Graph from .base import GraphOperator -from jax import jit -import jax.numpy as jnp -import jax -# utils ----------------------------------------------------------------- EPS = np.finfo(float).eps -### R <=> (0, inf) homeomorphisms -@jit -def softplus_inv(x): - return jnp.log(jnp.exp(x) - 1) -R_to_zero_to_inf = [(jit(jnp.exp), jit(jnp.log)), (jit(jax.nn.softplus), softplus_inv)] +# --------------------------------------------------------------------------- +# R <=> (0, inf) homeomorphisms +# --------------------------------------------------------------------------- -### R <=> (0,1) homeomorphisms -@jit -def shift_scale_arctan(x): - # scaled, shifted 
arctan - return (1 / jnp.pi) * jnp.arctan(x) + 1 / 2 +@njit(cache=True) +def _exp(x): + return np.exp(x) +@njit(cache=True) +def _log(x): + return np.log(x) -@jit -def shift_scale_arctan_inv(x): - return jnp.tan(jnp.pi * (x - 1 / 2)) +@njit(cache=True) +def _softplus(x): + return np.log1p(np.exp(-np.abs(x))) + np.maximum(x, 0.0) # avoids overflows for large x +@njit(cache=True) +def _softplus_inv(x): + return np.log(np.expm1(x)) # avoids instability near 0 -@jit -def sigmoid_inv(x): + return -jnp.log(1 / x - 1) +R_to_zero_to_inf = [ + (_exp, _log), + (_softplus, _softplus_inv), +] + + +# --------------------------------------------------------------------------- +# R <=> (0, 1) homeomorphisms +# --------------------------------------------------------------------------- + +@njit(cache=True) +def _sigmoid(x): + return 1.0 / (1.0 + np.exp(-x)) + +@njit(cache=True) +def _sigmoid_inv(x): + return -np.log(1.0 / x - 1.0) + +@njit(cache=True) +def _shift_scale_arctan(x): + return (1.0 / np.pi) * np.arctan(x) + 0.5 + +@njit(cache=True) +def _shift_scale_arctan_inv(x): + return np.tan(np.pi * (x - 0.5)) R_to_zero_to_one = [ - (jit(jax.nn.sigmoid), sigmoid_inv), - (shift_scale_arctan, shift_scale_arctan_inv), + (_sigmoid, _sigmoid_inv), + (_shift_scale_arctan, _shift_scale_arctan_inv), ] -# Enhanced Configuration Model Filter ------------------------------------------------ -@dataclass(slots=True) -class EnhancedConfigurationModelFilter(GraphOperator): +# --------------------------------------------------------------------------- +# Core numba kernels +# --------------------------------------------------------------------------- + +@njit(cache=True) +def _neg_log_likelihood(x, y, k, s): """ - Enhanced Configuration Model (ECM) filter for weighted, undirected - similarity graphs. + Negative log-likelihood of the ECM model (upper-triangle sum).
- Paper: https://arxiv.org/abs/1706.00230 - Code: https://gitlab.liris.cnrs.fr/coregraphie/aliplosone/-/blob/main/Backbones/ecm.py?ref_type=heads + Parameters are already in bounded form (x > 0, 0 < y < 1). + Implements formula (13) from https://arxiv.org/abs/1706.00230. """ - supported_modes = ["similarity"] + N = len(x) - @staticmethod - def _transform_parameters(num_nodes, x_transform, y_transform, v): - x = v[:num_nodes] - y = v[num_nodes:] - return np.concatenate((x_transform(x), y_transform(y))) + llhood = 0.0 + for i in range(N): + llhood += k[i] * np.log(x[i]) + llhood += s[i] * np.log(y[i]) - @staticmethod - def _neg_log_likelihood(num_nodes, k, s, x_transform, y_transform, v): - # Formel (13) - z = EnhancedConfigurationModelFilter._transform_parameters(num_nodes, x_transform, y_transform, v) + # Sum log(t_ij) for i < j (upper triangle = unique pairs) + for i in range(N): + for j in range(i): + xx = x[i] * x[j] + yy = y[i] * y[j] + t = (1.0 - yy) / (1.0 - yy + xx * yy) + llhood += np.log(t) - x = z[:num_nodes] - y = z[num_nodes:] + return -llhood - llhood = 0.0 - llhood += np.sum(k * np.log(x)) - llhood += np.sum(s * np.log(y)) - xx = np.outer(x, x) - yy = np.outer(y, y) +@njit(cache=True) +def _neg_log_likelihood_grad(x, y, k, s): + """ + Analytic gradient of the negative log-likelihood w.r.t. the *bounded* + parameters (x, y). Derived by differentiating formula (13). + Parameters should be in their bounded form (x > 0, 0 < y < 1) for correct gradients. 
+ """ + N = len(x) + grad_x = np.empty(N, dtype=np.float64) + grad_y = np.empty(N, dtype=np.float64) + + # Derivatives of the node-wise terms: + # - sum_i k_i log(x_i) -> -k_i / x_i + # - sum_i s_i log(y_i) -> -s_i / y_i + for i in range(N): + grad_x[i] = -k[i] / x[i] + grad_y[i] = -s[i] / y[i] + + # Pair contributions for i>j + for i in range(N): + for j in range(i): + yy = y[i] * y[j] + xx = x[i] * x[j] + D = 1.0 - yy + xx * yy + + # x-grad from +log(D) + # d/dx_i log(D) = (yy * x_j)/D + grad_x[i] += (yy * x[j]) / D + grad_x[j] += (yy * x[i]) / D + + # y-grad from -log(1-yy) + log(D) + # First part: + # d/dy_i [-log(1 - yy)] = y_j / (1 - yy) + # d/dy_j [-log(1 - yy)] = y_i / (1 - yy) + d_minus_log1myy_dyi = y[j] / (1.0 - yy) + d_minus_log1myy_dyj = y[i] / (1.0 - yy) + + # Second part: + # dD/dy_i = y_j (xx - 1) + # dD/dy_j = y_i (xx - 1) + d_logD_dyi = y[j] * (xx - 1.0) / D + d_logD_dyj = y[i] * (xx - 1.0) / D + + grad_y[i] += d_minus_log1myy_dyi + d_logD_dyi + grad_y[j] += d_minus_log1myy_dyj + d_logD_dyj + + return grad_x, grad_y + + +@njit(cache=True) +def _pval_matrix_data(x, y, row, col, weights): + """ + Compute p-values for each entry in the lower triangle of the weight matrix. - t = (1 - yy) / (1 - yy + xx * yy) - log_t = np.log(t) - llhood += np.sum(log_t) - np.sum(np.tril(log_t)) + p_val = p_ij * (y_i * y_j)^(w - 1) [formula (15)] + + Returns an array of p-values aligned with (row, col, weights). + Only entries where row > col (lower triangle) are meaningful; upper-triangle + indices are skipped to avoid double work. 
+ """ + n = len(weights) + out = np.empty(n) + for k in range(n): + i = row[k] + j = col[k] + w = weights[k] + xx = x[i] * x[j] + yy = y[i] * y[j] + pij = xx * yy / (1.0 - yy + xx * yy) + out[k] = pij * (y[i] * y[j]) ** (w - 1.0) + return out + + +# --------------------------------------------------------------------------- +# Thin wrapper: reparameterised objective for scipy +# --------------------------------------------------------------------------- + +def _make_objective(num_nodes, k, s, x_transform, x_inv_transform, + y_transform, y_inv_transform): + """ + Return (fun, jac) callables in *unconstrained* space for scipy.optimize. + + The reparameterisation (transform_parameters) maps R^2N → (0,∞)^N × (0,1)^N, + so no explicit bounds are needed by the solver — domain constraints are + enforced implicitly, matching the design of the original MaxentGraph.solve(). + """ + + def _transform(v): + """Clip and transform unconstrained v to bounded (x, y) for the objective.""" + x_raw = v[:num_nodes] + y_raw = v[num_nodes:] + + x = x_transform(x_raw) + y = y_transform(y_raw) + + x = np.maximum(x, EPS) + y = np.clip(y, EPS, 1.0 - EPS) + + return x, y + + def fun(v): + x, y = _transform(v) + return float(_neg_log_likelihood(x, y, k, s)) + + def jac(v): + x, y = _transform(v) + gx_bounded, gy_bounded = _neg_log_likelihood_grad(x, y, k, s) + + # Chain rule: dL/dv = dL/dz * dz/dv + # Approximates dz/dv numerically for the transform Jacobian diagonal, + # using a small finite difference. 
+ h = np.sqrt(EPS) + v_raw_x = v[:num_nodes] + v_raw_y = v[num_nodes:] + + dx_dv = (x_transform(v_raw_x + h) - x_transform(v_raw_x - h)) / (2 * h) + dy_dv = (y_transform(v_raw_y + h) - y_transform(v_raw_y - h)) / (2 * h) + + grad_v = np.concatenate([gx_bounded * dx_dv, gy_bounded * dy_dv]) + return grad_v + + return fun, jac + + +# --------------------------------------------------------------------------- +# Graph operator +# --------------------------------------------------------------------------- + +@dataclass(slots=True) +class EnhancedConfigurationModelFilter(GraphOperator): + """ + Extract a weighted undirected backbone using the Enhanced Configuration + Model (ECM). + + The operator fits the ECM null model to the input graph, computes an ECM + p-value for each observed edge, and retains only edges whose p-value is + less than or equal to ``alpha``. + + By default, the returned graph preserves the original weights of the + retained edges. Optionally, the retained edge weights can be replaced by + their ECM p-values. + + Parameters + ---------- + alpha : float, default=0.05 + Significance threshold for retaining edges. Smaller values produce + sparser backbones. + replace_weights_by_p_values : bool, default=False + If ``False``, retain the original edge weights for all edges passing + the ECM significance threshold. + + If ``True``, replace retained edge weights by their ECM p-values. + Note that in this case smaller weights indicate higher statistical + significance, which differs from the usual interpretation of + similarity weights. + copy_meta : bool, default=True + If ``True``, copy graph metadata to the returned graph. If ``False``, + keep the original metadata reference. + x_transform_idx : int, default=0 + Index selecting the reparameterization used for the positive ECM + parameters ``x``. The index refers to :data:`R_to_zero_to_inf`, whose + entries map from the real line to the open interval ``(0, inf)``. 
+ + y_transform_idx : int, default=0 + Index selecting the reparameterization used for the bounded ECM + parameters ``y``. The index refers to :data:`R_to_zero_to_one`, whose + entries map from the real line to the open interval ``(0, 1)``. + + References + ---------- + Paper: https://arxiv.org/abs/1706.00230 + Original Source code: https://gitlab.liris.cnrs.fr/coregraphie/aliplosone/-/blob/main/Backbones/ecm.py + """ + alpha: float = 0.05 + replace_weights_by_p_values: bool = False + copy_meta: bool = True + x_transform_idx: int = 0 + y_transform_idx: int = 0 + + supported_modes = ["similarity"] - return -llhood - def _directed(self, G: Graph) -> Graph: raise NotImplementedError( "EnhancedConfigurationModelFilter is defined only for undirected graphs." ) def _undirected(self, G: Graph) -> Graph: - W = G.adj.copy().tocsr() + if not (0.0 < self.alpha <= 1.0): + raise ValueError("alpha must satisfy 0 < alpha <= 1.") - W -= scipy.sparse.diags(W.diagonal()) + W = G.adj.copy().tocsr() + W -= sp.diags(W.diagonal()) - k = (W > 0).sum(axis=1).A1.astype("float64") - s = W.sum(axis=1).A1 + k = np.asarray((W > 0).sum(axis=1)).ravel().astype(np.float64) + s = np.asarray(W.sum(axis=1)).ravel().astype(np.float64) num_nodes = G.n_nodes - x_transform, x_inv_transform = R_to_zero_to_inf[0] - y_transform, y_inv_transform = R_to_zero_to_one[0] + x_transform, x_inv_transform = R_to_zero_to_inf[self.x_transform_idx] + y_transform, y_inv_transform = R_to_zero_to_one[self.y_transform_idx] + + # ---- Initial guess (option 5 from original) ---------------------- + num_edges = k.sum() / 2.0 + x0_bounded = k / np.sqrt(max(num_edges, EPS)) + y0_bounded = np.random.random(num_nodes) + + # Clip to valid domain before applying inverse transform + lower = np.full(2 * num_nodes, EPS) + upper = np.concatenate([np.full(num_nodes, np.inf), + np.full(num_nodes, 1.0 - EPS)]) + v0_bounded = np.clip(np.concatenate([x0_bounded, y0_bounded]), + lower, upper) - # -------- Initial Guess -------- - # initial 
guess option 5 - num_edges = np.sum(k) / 2 v0 = np.concatenate([ - k / np.sqrt(num_edges + EPS), - np.random.random(num_nodes) + x_inv_transform(v0_bounded[:num_nodes]), + y_inv_transform(v0_bounded[num_nodes:]), ]) - v0 = np.concatenate(( - x_inv_transform(v0[:num_nodes]), - y_inv_transform(v0[num_nodes:]) - )) - - # -------- Bounds -------- - # bounds() - lower_bounds = np.array([EPS] * (2 * num_nodes)) - upper_bounds = np.array([np.inf] * num_nodes + [1 - EPS] * num_nodes) - bounds = scipy.optimize.Bounds(lower_bounds, upper_bounds) - - # -------- Optimierung -------- - # solve() - res = scipy.optimize.minimize( - fun=lambda v: np.array(self._neg_log_likelihood(num_nodes, k, s, x_transform, y_transform, v)), - x0=np.array(v0), - bounds=bounds, - method="L-BFGS-B" + # ---- Objective + gradient in unconstrained space --------- + fun, jac = _make_objective( + num_nodes, k, s, + x_transform, x_inv_transform, + y_transform, y_inv_transform, ) - v_opt = res.x + # ---- Optimize in reparameterized space ----------------------------- + # The optimizer works on unconstrained variables; valid ECM parameter + # domains are enforced by the x/y transforms before each evaluation. 
+ res = so.minimize( + fun=fun, + jac=jac, + x0=v0, + method="L-BFGS-B", + ) - # -------- p-value Matrix -------- - # get_pval_matrix() - # pij: Formel (9) - # p_val: Formel (15) - z = self._transform_parameters(num_nodes, x_transform, y_transform, v_opt) + if not res.success: + import warnings + warnings.warn( + f"ECM optimisation did not converge: {res.message}", + RuntimeWarning, + ) - x = z[:num_nodes] - y = z[num_nodes:] - - W_p = scipy.sparse.tril(W.copy()).tolil().astype(float) + # ---- p-value matrix ---------------------------------------------- + x_opt = x_transform(res.x[:num_nodes]) + y_opt = y_transform(res.x[num_nodes:]) - for i, j in zip(*W.nonzero()): - w = W[i, j] - xx_out = x[i] * x[j] - yy_out = y[i] * y[j] + # Work only on the lower triangle to avoid double computation + W_lower = sp.tril(W, k=-1).tocoo() + row = W_lower.row.astype(np.int64) + col = W_lower.col.astype(np.int64) + weights = W_lower.data - pij = xx_out * yy_out / (1 - yy_out + xx_out * yy_out) - p_val = pij * (y[i] * y[j]) ** (w - 1) + pvals = _pval_matrix_data(x_opt, y_opt, row, col, weights) + keep = pvals <= self.alpha - W_p[i, j] = p_val + out_data = pvals[keep] if self.replace_weights_by_p_values else weights[keep] - W_p = W_p.tocsr() + W_backbone = sp.csr_matrix( + (out_data, (row[keep], col[keep])), + shape=(num_nodes, num_nodes), + dtype=np.float64 if self.replace_weights_by_p_values else W.dtype, + ) + W_backbone = W_backbone + W_backbone.T # Convert back to Graph return Graph.from_csr( - W_p, - mode="similarity", + W_backbone, + directed=False, weighted=True, - directed=False + mode=G.mode, + meta=(G.meta.copy() if (self.copy_meta and G.meta is not None) else G.meta), + sym_op="max", ) def apply(self, G: Graph) -> Graph: @@ -165,4 +372,3 @@ def apply(self, G: Graph) -> Graph: return self._directed(G) else: return self._undirected(G) - diff --git a/pyproject.toml b/pyproject.toml index 51059b1..e3bb512 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ 
classifiers = [ [tool.poetry.dependencies] python = ">=3.10,<3.14" +numba = ">=0.61.2" numpy = ">=2.0.0" networkx = ">3.4.0" pandas = ">=2.1.1" diff --git a/tests/test_enhanced_configuration_model.py b/tests/test_enhanced_configuration_model.py new file mode 100644 index 0000000..ef87744 --- /dev/null +++ b/tests/test_enhanced_configuration_model.py @@ -0,0 +1,307 @@ +import numpy as np +import pytest +import scipy.sparse as sp +from graphconstructor import Graph +from graphconstructor.operators import EnhancedConfigurationModelFilter +from graphconstructor.operators.enhanced_configuration_model import _neg_log_likelihood, _neg_log_likelihood_grad + + +def _csr(*, data, rows, cols, n): + """Small helper to build CSR adjacency.""" + return sp.csr_matrix((data, (rows, cols)), shape=(n, n)) + + +def _finite_difference_gradient(fun, x, y, k, s, h=1e-7): + """Central-difference gradient for x and y separately.""" + grad_x = np.empty_like(x, dtype=np.float64) + grad_y = np.empty_like(y, dtype=np.float64) + + for i in range(len(x)): + x_plus = x.copy() + x_minus = x.copy() + x_plus[i] += h + x_minus[i] -= h + grad_x[i] = (fun(x_plus, y, k, s) - fun(x_minus, y, k, s)) / (2.0 * h) + + for i in range(len(y)): + y_plus = y.copy() + y_minus = y.copy() + y_plus[i] += h + y_minus[i] -= h + grad_y[i] = (fun(x, y_plus, k, s) - fun(x, y_minus, k, s)) / (2.0 * h) + + return grad_x, grad_y + + +@pytest.fixture +def small_undirected_graph(): + """A small symmetric weighted similarity graph.""" + A = _csr( + data=[4, 4, 2, 2, 1, 1], + rows=[0, 1, 0, 2, 1, 2], + cols=[1, 0, 2, 0, 2, 1], + n=3, + ) + return Graph.from_csr(A, directed=False, weighted=True, mode="similarity") + + +def test_ecm_rejects_directed_graph(): + A = _csr( + data=[4, 1, 3, 2], + rows=[0, 1, 2, 0], + cols=[1, 2, 0, 2], + n=3, + ) + G = Graph.from_csr(A, directed=True, weighted=True, mode="similarity") + + op = EnhancedConfigurationModelFilter() + + with pytest.raises(NotImplementedError): + op.apply(G) + + 
+@pytest.mark.parametrize("alpha", [0.01, 0.05, 0.5, 1.0]) +def test_ecm_output_graph_basic_invariants_undirected(small_undirected_graph, alpha): + np.random.seed(0) + op = EnhancedConfigurationModelFilter(alpha=alpha) + Gp = op.apply(small_undirected_graph) + + assert isinstance(Gp, Graph) + assert Gp.n_nodes == small_undirected_graph.n_nodes + assert Gp.directed is False + assert Gp.weighted is True + assert Gp.mode == "similarity" + + Wp = Gp.adj.tocsr() + + # No self-loops in output + assert np.allclose(Wp.diagonal(), 0.0) + + # Output must remain symmetric + assert (Wp != Wp.T).nnz == 0 + + +def test_ecm_ignores_input_self_loops(): + A = _csr( + data=[10, 4, 4], + rows=[0, 0, 1], + cols=[0, 1, 0], + n=2, + ) + A = A + A.T + + G = Graph.from_csr(A.tocsr(), directed=False, weighted=True, mode="similarity") + + np.random.seed(0) + op = EnhancedConfigurationModelFilter() + Gp = op.apply(G) + + Wp = Gp.adj.tocsr() + assert np.allclose(Wp.diagonal(), 0.0) + + +def test_ecm_output_edges_are_subset_of_input_edges(small_undirected_graph): + """ + Thresholding may remove observed edges, but must never introduce new ones. + """ + np.random.seed(0) + op = EnhancedConfigurationModelFilter(alpha=0.5) + Gp = op.apply(small_undirected_graph) + + Win = small_undirected_graph.adj.tocsr() + Wout = Gp.adj.tocsr() + + in_mask = (Win != 0).astype(np.int8) + out_mask = (Wout != 0).astype(np.int8) + + diff = out_mask - in_mask + assert diff.nnz == 0 or diff.max() <= 0 + + +def test_ecm_alpha_one_keeps_all_existing_edges(small_undirected_graph): + """ + Since ECM p-values should lie in [0, 1], alpha=1 should keep all observed + off-diagonal edges. 
+ """ + np.random.seed(0) + op = EnhancedConfigurationModelFilter(alpha=1.0) + Gp = op.apply(small_undirected_graph) + + Win = small_undirected_graph.adj.tocsr().copy() + Win = Win - sp.diags(Win.diagonal()) + Wout = Gp.adj.tocsr() + + in_mask = (Win != 0).astype(np.int8) + out_mask = (Wout != 0).astype(np.int8) + + assert (in_mask != out_mask).nnz == 0 + + +@pytest.mark.parametrize("alpha", [1e-6, 1e-3, 1e-2]) +def test_ecm_smaller_alpha_cannot_increase_number_of_edges(small_undirected_graph, alpha): + np.random.seed(0) + G_loose = EnhancedConfigurationModelFilter(alpha=1.0).apply(small_undirected_graph) + np.random.seed(0) + G_strict = EnhancedConfigurationModelFilter(alpha=alpha).apply(small_undirected_graph) + + assert G_strict.adj.nnz <= G_loose.adj.nnz + + +def test_ecm_default_keeps_original_weights_for_retained_edges(small_undirected_graph): + np.random.seed(0) + op = EnhancedConfigurationModelFilter(alpha=1.0, replace_weights_by_p_values=False) + Gp = op.apply(small_undirected_graph) + + Win = small_undirected_graph.adj.tocsr() + Wout = Gp.adj.tocsr() + + # With alpha=1, all off-diagonal edges should remain, and the weights + # should match the original weights. 
+ assert np.array_equal(Wout.indices, Win.indices) + assert np.array_equal(Wout.indptr, Win.indptr) + assert np.allclose(Wout.data, Win.data, rtol=1e-8, atol=1e-10) + + +def test_ecm_can_replace_weights_by_p_values(small_undirected_graph): + np.random.seed(0) + op = EnhancedConfigurationModelFilter(alpha=1.0, replace_weights_by_p_values=True) + Gp = op.apply(small_undirected_graph) + + Wp = Gp.adj.tocsr() + + # Retained edge weights now represent p-values + if Wp.nnz > 0: + assert np.nanmin(Wp.data) >= -1e-12 + assert np.nanmax(Wp.data) <= 1.0 + 1e-12 + + # In general these should differ from the original weights + Win = small_undirected_graph.adj.tocsr() + assert not np.allclose(Wp.data, Win.data) + + +def test_ecm_reproducible_given_fixed_seed(small_undirected_graph): + op = EnhancedConfigurationModelFilter(alpha=0.5, replace_weights_by_p_values=True) + + np.random.seed(123) + Gp1 = op.apply(small_undirected_graph) + W1 = Gp1.adj.tocsr() + + np.random.seed(123) + Gp2 = op.apply(small_undirected_graph) + W2 = Gp2.adj.tocsr() + + assert np.array_equal(W1.indices, W2.indices) + assert np.array_equal(W1.indptr, W2.indptr) + assert np.allclose(W1.data, W2.data, rtol=1e-8, atol=1e-10) + + +@pytest.mark.parametrize("alpha", [-0.1, 0.0, 1.1]) +def test_ecm_rejects_invalid_alpha(small_undirected_graph, alpha): + op = EnhancedConfigurationModelFilter(alpha=alpha) + with pytest.raises(ValueError): + op.apply(small_undirected_graph) + + +@pytest.mark.parametrize("n", [3, 5]) +def test_neg_log_likelihood_grad_matches_finite_difference(n): + """ + Check that the analytic gradient of the ECM negative log-likelihood + matches a numerical central-difference approximation. 
+ """ + rng = np.random.default_rng(12345) + + x = rng.uniform(0.3, 2.0, size=n).astype(np.float64) + y = rng.uniform(0.2, 0.8, size=n).astype(np.float64) + k = rng.uniform(0.5, 5.0, size=n).astype(np.float64) + s = rng.uniform(0.5, 5.0, size=n).astype(np.float64) + + grad_x_analytic, grad_y_analytic = _neg_log_likelihood_grad(x, y, k, s) + grad_x_fd, grad_y_fd = _finite_difference_gradient( + _neg_log_likelihood, x, y, k, s, h=1e-7 + ) + + np.testing.assert_allclose(grad_x_analytic, grad_x_fd, rtol=1e-6, atol=1e-7) + np.testing.assert_allclose(grad_y_analytic, grad_y_fd, rtol=1e-6, atol=1e-7) + + +def test_neg_log_likelihood_grad_returns_finite_arrays(): + x = np.array([0.7, 1.2, 1.8], dtype=np.float64) + y = np.array([0.2, 0.4, 0.6], dtype=np.float64) + k = np.array([1.0, 2.0, 3.0], dtype=np.float64) + s = np.array([1.5, 2.5, 3.5], dtype=np.float64) + + grad_x, grad_y = _neg_log_likelihood_grad(x, y, k, s) + + assert grad_x.shape == x.shape + assert grad_y.shape == y.shape + assert np.all(np.isfinite(grad_x)) + assert np.all(np.isfinite(grad_y)) + + +@pytest.mark.parametrize("alphas", [ + [0.001, 0.01, 0.05, 0.1, 0.5, 1.0], +]) +def test_ecm_retained_edges_monotone_in_alpha(small_undirected_graph, alphas): + """ + Verify that the number of edges retained by the ECM filter is + monotone non-decreasing in the significance threshold ``alpha``.
+ """ + counts = [] + for alpha in alphas: + np.random.seed(123) + Gp = EnhancedConfigurationModelFilter(alpha=alpha).apply(small_undirected_graph) + # undirected graph stores both directions explicitly + counts.append(Gp.adj.nnz) + + assert counts == sorted(counts) + + +def test_ecm_alpha_thresholding_exact_edge_counts(monkeypatch): + A = _csr( + data=[4, 4, 2, 2, 1, 1], + rows=[0, 1, 0, 2, 1, 2], + cols=[1, 0, 2, 0, 2, 1], + n=3, + ) + G = Graph.from_csr(A, directed=False, weighted=True, mode="similarity") + + # Lower-triangle edges of this graph are: + # (1,0), (2,0), (2,1) + fake_pvals = np.array([0.001, 0.02, 0.2], dtype=np.float64) + + def fake_pval_matrix_data(x, y, row, col, weights): + assert len(weights) == 3 + return fake_pvals.copy() + + def fake_make_objective(*args, **kwargs): + def fun(v): + return 0.0 + def jac(v): + return np.zeros_like(v) + return fun, jac + + class FakeResult: + success = True + message = "ok" + x = np.zeros(2 * G.n_nodes, dtype=np.float64) + + def fake_minimize(*args, **kwargs): + return FakeResult() + + import graphconstructor.operators.enhanced_configuration_model as ecm_mod + + monkeypatch.setattr(ecm_mod, "_pval_matrix_data", fake_pval_matrix_data) + monkeypatch.setattr(ecm_mod, "_make_objective", fake_make_objective) + monkeypatch.setattr(ecm_mod.so, "minimize", fake_minimize) + + cases = [ + (0.0005, 0), # keep none + (0.01, 2), # keep one undirected edge -> 2 stored entries + (0.05, 4), # keep two undirected edges -> 4 stored entries + (0.5, 6), # keep all three undirected edges -> 6 stored entries + ] + + for alpha, expected_nnz in cases: + op = EnhancedConfigurationModelFilter(alpha=alpha) + Gp = op.apply(G) + assert Gp.adj.nnz == expected_nnz