From 82e8a9788abd76624dcc6d69234400409cac1b27 Mon Sep 17 00:00:00 2001
From: Javier Tausia Hoyal
Date: Mon, 12 Jan 2026 13:10:52 +0100
Subject: [PATCH 1/4] [JTH] start docu cleaning for structured package

---
 .github/workflows/python-tests.yml | 25 +++++++++++---
 pyproject.toml                     | 55 ++++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml
index 769edf9..dff4642 100644
--- a/.github/workflows/python-tests.yml
+++ b/.github/workflows/python-tests.yml
@@ -9,6 +9,27 @@ on:
       - synchronize

 jobs:
+  # Separate linting job for faster feedback
+  lint:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.11"
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install .[tests]
+
+      - name: Lint with ruff
+        run: |
+          ruff check bluemath_tk/ || true  # TODO: Remove || true once docstrings are fixed
+
   python-tests:
     runs-on: ${{ matrix.os }}

@@ -32,10 +53,6 @@ jobs:
         python -m pip install --upgrade pip
         pip install .[all,tests]

-    - name: Lint
-      run: |
-        ruff check bluemath_tk/datamining/ || true  # optional: don't fail lint for now
-
     - name: Run tests
       run: |
         pytest -s -v tests

diff --git a/pyproject.toml b/pyproject.toml
index 0378c94..713024a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -79,3 +79,58 @@ include = ["bluemath_tk*"]
 [tool.setuptools_scm]
 version_file = "bluemath_tk/_version.py"
 local_scheme = "no-local-version"
+
+[tool.ruff]
+line-length = 88
+target-version = "py311"
+
+[tool.ruff.lint]
+# Select rules to check
+select = [
+    "E",   # pycodestyle errors
+    "F",   # pyflakes
+    "W",   # pycodestyle warnings
+    "D",   # pydocstyle (docstring checking)
+    "I",   # isort (import sorting)
+    "N",   # pep8-naming
+    "UP",  # pyupgrade
+]
+
+# Ignore some docstring rules (start lenient, tighten later)
+# Focus on missing docstrings first, style issues can be auto-fixed later
+ignore = [
+    # "D100",  # Missing docstring in public module
+    # "D104",  # Missing docstring in public package
+    # "D107",  # Missing docstring in __init__
+    # "D203",  # 1 blank line required before class docstring
+    # "D213",  # Multi-line docstring summary should start at the second line
+    # "D401",  # First line should be in imperative mood (can be strict)
+    "D202",  # No blank lines allowed after docstring (auto-fixable)
+    # "D300",  # Use """triple double quotes""" (auto-fixable)
+    "D400",  # First line should end with a period (can fix gradually)
+    # "D200",  # One-line docstring should fit on one line (auto-fixable)
+    # "D403",  # First word of the first line should be capitalized (auto-fixable)
+    # "D404",  # First word of the docstring should not be This (auto-fixable)
+    # "D409",  # Section underline length mismatch (auto-fixable)
+    # "D411",  # No blank line before section (auto-fixable)
+    # "D410",  # No blank line after section (auto-fixable)
+    # "D407",  # Missing dashed underline after section (auto-fixable)
+    # "D405",  # Section name should be properly capitalized (auto-fixable)
+    # "D210",  # No whitespace allowed surrounding docstring summary (auto-fixable)
+    # "D406",  # Missing newline after section name (auto-fixable)
+    # "D201",  # No blank lines allowed before function docstring (auto-fixable)
+    # "D204",  # 1 blank line required after class docstring (auto-fixable)
+    # "D206",  # Docstring should be indented with spaces, not tabs (auto-fixable)
+    "D205",  # 1 blank line required between summary line and description (auto-fixable)
+    # "D419",  # Empty docstring (rare, can fix manually)
+    "N802",  # Function name should be lowercase (scientific names kept)
+    "N803",  # Argument name should be lowercase (scientific names kept)
+    "N806",  # Variable in function should be lowercase (scientific names kept)
+]
+
+[tool.ruff.lint.pydocstyle]
+convention = "numpy"  # Use NumPy convention
+
+[tool.ruff.lint.per-file-ignores]
+# Ignore docstring requirements in tests
+"tests/*" = ["D103"]  # Missing docstring in public function

From 34a74e7645b66ee1609ea3097f6b6da77f69e52d Mon Sep 17 00:00:00 2001
From: Javier Tausia Hoyal
Date: Wed, 14 Jan 2026 13:20:50 +0100
Subject: [PATCH 2/4] [JTH] add kma in pyclustering and rbf with shap

---
 bluemath_tk/datamining/kma.py    | 526 ++++++++++++++++++++++++-------
 bluemath_tk/interpolation/rbf.py | 405 +++++++++++++++++++++---
 pyproject.toml                   |   3 +-
 3 files changed, 772 insertions(+), 162 deletions(-)

diff --git a/bluemath_tk/datamining/kma.py b/bluemath_tk/datamining/kma.py
index a1ce203..165ce8b 100644
--- a/bluemath_tk/datamining/kma.py
+++ b/bluemath_tk/datamining/kma.py
@@ -1,8 +1,14 @@
-from typing import Dict, List, Tuple
+"""
+Package: BlueMath_tk
+Module: datamining
+File: kma.py
+Author: GeoOcean Research Group, Universidad de Cantabria
+Repository: https://github.com/GeoOcean/BlueMath_tk.git
+Status: Under development (Working)
+"""

 import numpy as np
 import pandas as pd
-from sklearn.cluster import KMeans
 from sklearn.linear_model import LinearRegression

 from ..core.decorators import validate_data_kma
@@ -11,7 +17,7 @@

 class KMAError(Exception):
     """
-    Custom exception for KMA class.
+    Custom exception for the KMA class.
     """

     def __init__(self, message: str = "KMA error occurred."):
@@ -21,34 +27,11 @@ def __init__(self, message: str = "KMA error occurred."):

 class KMA(BaseClustering):
     """
-    K-Means Algorithm (KMA) class.
+    K-Means Algorithm (KMA) class - generalized to support multiple clustering
+    algorithms.

-    This class performs the K-Means algorithm on a given dataframe.
-
-    Attributes
-    ----------
-    num_clusters : int
-        The number of clusters to use in the K-Means algorithm.
-    seed : int
-        The random seed to use as initial datapoint.
-    data_variables : List[str]
-        A list with all data variables.
-    directional_variables : List[str]
-        A list with directional variables.
-    fitting_variables : List[str]
-        A list with fitting variables.
-    custom_scale_factor : dict
-        A dictionary of custom scale factors.
-    scale_factor : dict
-        A dictionary of scale factors (after normalizing the data).
-    centroids : pd.DataFrame
-        The selected centroids.
-    normalized_centroids : pd.DataFrame
-        The selected normalized centroids.
-    centroid_real_indices : np.array
-        The real indices of the selected centroids.
-    is_fitted : bool
-        A flag indicating whether the model is fitted or not.
+    This class performs centroid-based clustering on a given dataframe.
+    Supports k-means, k-medians, and k-medoids (all via pyclustering).
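+
+    The three algorithms differ only in how the cluster centers are computed
+    (component-wise means, component-wise medians, or actual data points as
+    medoids); fitting, prediction, and centroid denormalization follow the
+    same workflow for all of them.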

     Examples
     --------
     import numpy as np
     import pandas as pd
     from bluemath_tk.datamining.kma import KMA
     data = pd.DataFrame(
         {
             "Hs": np.random.rand(1000) * 7,
             "Tp": np.random.rand(1000) * 20,
             "Dir": np.random.rand(1000) * 360
         }
     )
+    # K-Means (default)
     kma = KMA(num_clusters=5)
-    nearest_centroids_idxs, nearest_centroids_df = kma.fit_predict(
+    kma.fit(
         data=data,
         directional_variables=["Dir"],
     )
-    kma.plot_selected_centroids(plot_text=True)
+
+    References
+    ----------
+    [1] https://github.com/annoviko/pyclustering
     """

     def __init__(
         self,
         num_clusters: int,
-        seed: int = None,
+        seed: int | None = None,
+        algorithm: str = "kmeans",
+        distance_metric: str | None = None,
+        distance_matrix: np.ndarray | None = None,
     ) -> None:
         """
-        Initializes the KMA class.
+        Initialize the KMA class.

         Parameters
         ----------
         num_clusters : int
-            The number of clusters to use in the K-Means algorithm.
+            The number of clusters to use in the clustering algorithm.
             Must be greater than 0.
         seed : int, optional
             The random seed to use as initial datapoint.
             Must be greater or equal to 0 and less than number of datapoints.
-            Default is 0.
-
-        Raises
-        ------
-        ValueError
-            If num_clusters is not greater than 0.
-            Or if seed is not greater or equal to 0.
+            Default is None.
+        algorithm : str, optional
+            The clustering algorithm to use. Options:
+            - 'kmeans': K-Means (default)
+            - 'kmedians': K-Medians
+            - 'kmedoids': K-Medoids
+
+        distance_metric : str, optional
+            Distance metric to use. Options depend on algorithm:
+            - For kmeans/kmedians: 'euclidean' (default), 'manhattan',
+              'chebyshev'
+            - For kmedoids: 'euclidean' (default), 'manhattan', 'chebyshev',
+              'minkowski'
+            Default is None (uses algorithm-specific default).
+
+        distance_matrix : np.ndarray, optional
+            Precomputed distance matrix. Shape (n_samples, n_samples).
+            Element (i, j) represents the distance between data points i and j.
+            Currently only supported for k-medoids algorithm.
+            If provided, the algorithm will use this matrix instead of computing
+            distances from raw data. Default is None.
         """

         super().__init__()
@@ -112,21 +115,51 @@ def __init__(
             self.seed = int(seed)
         else:
             raise ValueError("Variable seed must be >= 0")
-        self._kma = KMeans(
-            n_clusters=self.num_clusters,
-            random_state=self.seed,
-        )
+
+        # Validate algorithm
+        algorithm = algorithm.lower()
+        if algorithm not in ["kmeans", "kmedians", "kmedoids"]:
+            raise ValueError(
+                f"Unknown algorithm '{algorithm}'. "
+                "Supported: 'kmeans', 'kmedians', 'kmedoids'"
+            )
+
+        # Store algorithm info
+        self.algorithm_name = algorithm
+        self.distance_metric = distance_metric
+        self.distance_matrix = distance_matrix
+
+        # Validate distance matrix if provided
+        if distance_matrix is not None:
+            if algorithm != "kmedoids":
+                raise ValueError(
+                    f"distance_matrix is only supported for 'kmedoids' algorithm, "
+                    f"not '{algorithm}'"
+                )
+            distance_matrix = np.array(distance_matrix)
+            if distance_matrix.ndim != 2:
+                raise ValueError("distance_matrix must be 2-dimensional")
+            if distance_matrix.shape[0] != distance_matrix.shape[1]:
+                raise ValueError("distance_matrix must be square")
+            self.distance_matrix = distance_matrix
+
+        # Internal state
+        self._model = None
+        self._labels = None
+        self._cluster_centers = None
+        self._init_centers = None
+
         self.logger.info(
-            f"KMA object created with {self.num_clusters} clusters and seed {self.seed}."
-            "To customize kma, do self.kma = dict(n_clusters=..., random_state=..., etc)"
+            f"KMA object created with {self.num_clusters} clusters, "
+            f"algorithm '{self.algorithm_name}', and seed {self.seed}."
         )

         self._data: pd.DataFrame = pd.DataFrame()
         self._normalized_data: pd.DataFrame = pd.DataFrame()
         self._data_to_fit: pd.DataFrame = pd.DataFrame()
-        self.data_variables: List[str] = []
-        self.directional_variables: List[str] = []
-        self.fitting_variables: List[str] = []
+        self.data_variables: list[str] = []
+        self.directional_variables: list[str] = []
+        self.fitting_variables: list[str] = []
         self.custom_scale_factor: dict = {}
         self.scale_factor: dict = {}
         self.centroids: pd.DataFrame = pd.DataFrame()
@@ -135,31 +168,238 @@ def __init__(
         self.is_fitted: bool = False
         self.regression_guided: dict = {}

-    @property
-    def kma(self) -> KMeans:
-        return self._kma
+    def _get_initial_centers(self, data: list) -> list:
+        """
+        Get initial centers/medians/medoids for the algorithm.
+
+        Parameters
+        ----------
+        data : list
+            Data in pyclustering format (list of lists) or distance matrix.
+
+        Returns
+        -------
+        list
+            Initial centers/medians/medoids (indices if using distance matrix).
+        """
+        if self._init_centers is not None:
+            # Use provided initial centers
+            if isinstance(self._init_centers, pd.DataFrame):
+                if self.distance_matrix is not None:
+                    # For distance matrix, we need indices, not actual centers
+                    # Find nearest data points to the provided centers
+                    raise NotImplementedError(
+                        "MDA initialization with distance matrix not yet supported"
+                    )
+                initial_centers = self._init_centers.values.tolist()
+            else:
+                if self.distance_matrix is not None:
+                    # For distance matrix, initial_centers should be indices
+                    initial_centers = list(np.array(self._init_centers).flatten())
+                else:
+                    initial_centers = np.array(self._init_centers).tolist()
+        else:
+            # Initialize randomly
+            if self.seed is not None:
+                np.random.seed(self.seed)
+            n_samples = len(data) if isinstance(data, list) else data.shape[0]
+            initial_centers_idx = np.random.choice(
+                n_samples, size=self.num_clusters, replace=False
+            )
+            if self.distance_matrix is not None:
+                # For distance matrix, return indices
+                initial_centers = list(initial_centers_idx)
+            else:
+                # For raw data, return actual data points
+                initial_centers = [data[i] for i in initial_centers_idx]
+        return initial_centers
+
+    def _create_pyclustering_model(
+        self, data: list | np.ndarray, initial_centers: list
+    ):
+        """
+        Create and return the pyclustering model.
+
+        Parameters
+        ----------
+        data : list or np.ndarray
+            Data in pyclustering format (list of lists) or distance matrix.
+        initial_centers : list
+            Initial centers/medians/medoids (or indices if using distance matrix).
+
+        Returns
+        -------
+        pyclustering model
+            The initialized pyclustering clustering model.
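+
+        Notes
+        -----
+        The model is only constructed here; ``process()`` must be called on
+        the returned model (as done in ``_fit_clustering``) to actually run
+        the clustering.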
+        """
+        # Build kwargs for pyclustering
+        kwargs = {}
+        if self.distance_metric is not None:
+            # Map common metric names to pyclustering format if needed
+            kwargs["ccore"] = False  # Use Python implementation
+            # Note: pyclustering's distance metric handling varies by algorithm
+            # For simplicity, we'll let pyclustering use defaults
+            # Advanced users can modify the model directly if needed
+
+        # Import and create the appropriate algorithm
+        if self.algorithm_name == "kmeans":
+            from pyclustering.cluster.kmeans import kmeans
+
+            self._model = kmeans(data, initial_centers, **kwargs)
+        elif self.algorithm_name == "kmedians":
+            from pyclustering.cluster.kmedians import kmedians
+
+            self._model = kmedians(data, initial_centers, **kwargs)
+        elif self.algorithm_name == "kmedoids":
+            from pyclustering.cluster.kmedoids import kmedoids
+
+            # For k-medoids, support distance matrix
+            if self.distance_matrix is not None:
+                # Convert distance matrix to list of lists
+                if isinstance(self.distance_matrix, np.ndarray):
+                    distance_data = self.distance_matrix.tolist()
+                else:
+                    distance_data = self.distance_matrix
+                self._model = kmedoids(
+                    distance_data,
+                    initial_centers,
+                    data_type="distance_matrix",
+                    **kwargs,
+                )
+            else:
+                self._model = kmedoids(data, initial_centers, **kwargs)
+
+        return self._model
+
+    def _fit_clustering(self, normalized_data: pd.DataFrame):
+        """
+        Fit the clustering algorithm to normalized data.
+
+        Parameters
+        ----------
+        normalized_data : pd.DataFrame
+            Normalized data to fit (ignored if distance_matrix is provided).
+        """
+        try:
+            # Use distance matrix if provided, otherwise use normalized data
+            if self.distance_matrix is not None:
+                data = self.distance_matrix
+            else:
+                # Convert to list of lists (pyclustering format)
+                data = normalized_data.values.tolist()
+
+            # Get initial centers
+            initial_centers = self._get_initial_centers(data)
+
+            # Create and process the model
+            self._create_pyclustering_model(data, initial_centers)
+            self._model.process()
+
+            # Extract labels
+            clusters = self._model.get_clusters()
+            n_samples = (
+                self.distance_matrix.shape[0]
+                if self.distance_matrix is not None
+                else len(data)
+            )
+            self._labels = np.zeros(n_samples, dtype=int)
+            for cluster_idx, cluster in enumerate(clusters):
+                for point_idx in cluster:
+                    self._labels[point_idx] = cluster_idx

+            # Extract centers (method name varies by algorithm)
+            if self.algorithm_name == "kmeans":
+                centers = self._model.get_centers()
+            elif self.algorithm_name == "kmedians":
+                centers = self._model.get_medians()
+            elif self.algorithm_name == "kmedoids":
+                if self.distance_matrix is not None:
+                    # For distance matrix, medoids are indices, not actual points
+                    # We need to get the actual data points corresponding to medoid
+                    # indices
+                    medoid_indices = self._model.get_medoids()
+                    # If we have normalized_data, use it to get actual centers
+                    if not normalized_data.empty:
+                        centers = normalized_data.iloc[medoid_indices].values
+                    else:
+                        # Can't get actual centers without data, use indices as
+                        # placeholder. This shouldn't happen in normal usage
+                        raise KMAError(
+                            "Cannot extract centers from distance matrix without "
+                            "original data. Provide normalized_data when using "
+                            "distance_matrix."
+                        )
+                else:
+                    centers = self._model.get_medoids()
+
+            self._cluster_centers = np.array(centers)
+
+        except ImportError:
+            raise ImportError(
+                "Clustering algorithms require pyclustering. "
" + "Install it with: pip install pyclustering" + ) - @kma.setter - def kma(self, kma_params_dict) -> None: + def _predict_clustering(self, normalized_data: pd.DataFrame) -> np.ndarray: """ - Setter for the KMeans object. + Predict cluster labels for normalized data. Parameters ---------- - kma_params_dict : dict - A dictionary with KMeans parameters. - The keys should be the same as the KMeans parameters. - Example: {"n_clusters": 5, "random_state": 42} + normalized_data : pd.DataFrame + Normalized data to predict. + + Returns + ------- + np.ndarray + Cluster labels. + """ + if self._model is None: + raise KMAError("Model must be fitted before prediction.") + + # Convert to numpy array if needed + if isinstance(normalized_data, pd.DataFrame): + X = normalized_data.values + else: + X = np.array(normalized_data) + + # Calculate distances to each center + centers = self._cluster_centers + + # Use appropriate distance metric based on algorithm + if self.algorithm_name == "kmedians": + # L1 distance for medians + distances = np.array( + [np.sum(np.abs(X - center), axis=1) for center in centers] + ) + else: + # L2 distance (Euclidean) for means and medoids + distances = np.array( + [np.sum((X - center) ** 2, axis=1) for center in centers] + ) + + # Assign to closest center + labels = np.argmin(distances, axis=0) + return labels + + @property + def kma(self) -> object: """ + Returns the clustering algorithm object (for advanced usage). - self._kma = KMeans(**kma_params_dict) + Returns + ------- + object + The pyclustering model object. Can be modified directly for advanced + usage. + """ + return self._model @property def data(self) -> pd.DataFrame: """ Returns the original data used for clustering. """ - return self._data @property @@ -167,20 +407,18 @@ def normalized_data(self) -> pd.DataFrame: """ Returns the normalized data used for clustering. """ - return self._normalized_data @property def data_to_fit(self) -> pd.DataFrame: """ - Returns the data used for fitting the K-Means algorithm. + Returns the data used for fitting the clustering algorithm. """ - return self._data_to_fit @staticmethod def add_regression_guided( - data: pd.DataFrame, vars: List[str], alpha: List[float] + data: pd.DataFrame, vars: list[str], alpha: list[float] ) -> pd.DataFrame: """ Calculate regression-guided variables. @@ -188,10 +426,10 @@ def add_regression_guided( Parameters ---------- data : pd.DataFrame - The data to fit the K-Means algorithm. - vars : List[str] + The data to fit the clustering algorithm. + vars : list[str] The variables to use for regression-guided clustering. - alpha : List[float] + alpha : list[float] The alpha values to use for regression-guided clustering. Returns @@ -233,26 +471,28 @@ def add_regression_guided( def fit( self, data: pd.DataFrame, - directional_variables: List[str] = [], + directional_variables: list[str] = [], custom_scale_factor: dict = {}, min_number_of_points: int = None, max_number_of_iterations: int = 10, normalize_data: bool = False, - regression_guided: Dict[str, List] = {}, + regression_guided: dict[str, list] = {}, + init_mda_centroids: pd.DataFrame | None = None, ) -> None: """ - Fit the K-Means algorithm to the provided data. - - TODO: Add option to force KMA initialization with MDA centroids. + Fit the clustering algorithm to the provided data. Parameters ---------- data : pd.DataFrame - The input data to be used for the KMA algorithm. - directional_variables : List[str], optional - A list of directional variables that will be transformed to u and v components. 
-            Then, to use custom_scale_factor, you must specify the variables names with the u and v suffixes.
-            Example: directional_variables=["Dir"], custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
+            The input data to be used for the clustering algorithm.
+        directional_variables : list[str], optional
+            A list of directional variables that will be transformed to u and v
+            components.
+            Then, to use custom_scale_factor, you must specify the variable names
+            with the u and v suffixes.
+            Example: directional_variables=["Dir"],
+            custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
             Default is [].
         custom_scale_factor : dict, optional
             A dictionary specifying custom scale factors for normalization.
             If provided, these factors will be used instead of calculating them.
             Default is {}.
         min_number_of_points : int, optional
             The minimum number of points to consider a cluster.
             Default is None.
         max_number_of_iterations : int, optional
-            The maximum number of iterations for the K-Means algorithm.
+            The maximum number of iterations for the clustering algorithm.
             This is used when min_number_of_points is not None.
             Default is 10.
         normalize_data : bool, optional
             A flag to normalize the data.
             If True, the data will be normalized using the custom_scale_factor.
             Default is False.
         regression_guided: dict, optional
-            A dictionary specifying regression-guided clustering variables and relative weights.
+            A dictionary specifying regression-guided clustering variables and
+            relative weights.
             Example: {"vars": ["Fe"], "alpha": [0.6]}.
             Default is {}.
+        init_mda_centroids : pd.DataFrame, optional
+            Normalized centroids from MDA algorithm to use as initialization.
+            Should have shape (num_clusters, n_features) and be in normalized space.
+            If provided, the clustering algorithm will be initialized with these
+            centroids. Default is None.
         """

         if regression_guided:
@@ -290,33 +536,75 @@
             normalize_data=normalize_data,
         )

-        # Fit K-Means algorithm
+        # Handle MDA initialization if provided
+        if init_mda_centroids is not None:
+            if not isinstance(init_mda_centroids, pd.DataFrame):
+                raise ValueError("init_mda_centroids must be a pd.DataFrame")
+            if init_mda_centroids.shape[0] != self.num_clusters:
+                raise ValueError(
+                    f"init_mda_centroids must have {self.num_clusters} rows, "
+                    f"but got {init_mda_centroids.shape[0]}"
+                )
+            if list(init_mda_centroids.columns) != self.fitting_variables:
+                raise ValueError(
+                    f"init_mda_centroids columns must match fitting_variables: "
+                    f"{self.fitting_variables}"
+                )
+
+            # Store MDA centroids for initialization
+            self._init_centers = init_mda_centroids
+            self.logger.info(f"Initializing {self.algorithm_name} with MDA centroids.")
+
+        # Fit clustering algorithm
         if min_number_of_points is not None:
-            stable_kma_child = False
+            stable_cluster = False
             number_of_tries = 0
-            while not stable_kma_child:
-                kma_child = KMeans(n_clusters=self.num_clusters)
-                predicted_labels = kma_child.fit_predict(self.normalized_data)
+            while not stable_cluster:
+                # Create a new instance
+                # If MDA init was provided, use it for first try, then random
+                if init_mda_centroids is not None and number_of_tries == 0:
+                    self._init_centers = init_mda_centroids
+                else:
+                    self._init_centers = None
+                    self.seed = None  # Use different random state each try
+
+                self._fit_clustering(self.normalized_data)
+                predicted_labels = self._labels.copy()
+
                 _unique_labels, counts = np.unique(predicted_labels, return_counts=True)
                 if np.all(counts >= min_number_of_points):
-                    stable_kma_child = True
+                    stable_cluster = True
                 number_of_tries += 1
                 if number_of_tries > max_number_of_iterations:
                     raise ValueError(
-                        f"Failed to find a stable K-Means configuration after {max_number_of_iterations} attempts."
+                        f"Failed to find a stable {self.algorithm_name} configuration "
+                        f"after {max_number_of_iterations} attempts. "
                         "Change max_number_of_iterations or min_number_of_points."
                     )
             self.logger.info(
-                f"Found a stable K-Means configuration after {number_of_tries} attempts."
+                f"Found a stable {self.algorithm_name} configuration after "
+                f"{number_of_tries} attempts."
             )
-            self._kma = kma_child
         else:
-            self._kma = self.kma.fit(self.normalized_data)
+            self._fit_clustering(self.normalized_data)

         # Calculate the centroids
-        self.centroid_real_indices = self.kma.labels_.copy()
+        self.centroid_real_indices = self._labels.copy()
+
+        # Use the cluster centers from the model
+        if self._cluster_centers is not None:
+            centers = self._cluster_centers
+        else:
+            # Fallback: compute centroids from labels
+            centers = np.array(
+                [
+                    self.normalized_data[self._labels == i].mean(axis=0)
+                    for i in range(self.num_clusters)
+                ]
+            )
+
         self.normalized_centroids = pd.DataFrame(
-            self.kma.cluster_centers_, columns=self.fitting_variables
+            centers, columns=self.fitting_variables
         )
         self.centroids = self.denormalize(
             normalized_data=self.normalized_centroids, scale_factor=self.scale_factor
@@ -331,7 +619,7 @@
         # Set the fitted flag to True
         self.is_fitted = True

-    def predict(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    def predict(self, data: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
         """
         Predict the nearest centroid for the provided data.

         Parameters
         ----------
         data : pd.DataFrame
             The data to predict the nearest centroid for.

         Returns
         -------
-        Tuple[pd.DataFrame, pd.DataFrame]
+        tuple[pd.DataFrame, pd.DataFrame]
             A tuple containing the nearest centroid index for each data point,
             and the nearest centroids.
         """

@@ -352,34 +640,39 @@
         normalized_data = super().predict(data=data)

-        y = self.kma.predict(X=normalized_data)
+        y = self._predict_clustering(normalized_data)

-        return pd.DataFrame(
-            y, columns=["kma_bmus"], index=data.index
-        ), self.centroids.iloc[y]
+        return (
+            pd.DataFrame(y, columns=["kma_bmus"], index=data.index),
+            self.centroids.iloc[y],
+        )

     def fit_predict(
         self,
         data: pd.DataFrame,
-        directional_variables: List[str] = [],
+        directional_variables: list[str] = [],
         custom_scale_factor: dict = {},
         min_number_of_points: int = None,
         max_number_of_iterations: int = 10,
         normalize_data: bool = False,
-        regression_guided: Dict[str, List] = {},
-    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
+        regression_guided: dict[str, list] = {},
+        init_mda_centroids: pd.DataFrame | None = None,
+    ) -> tuple[pd.DataFrame, pd.DataFrame]:
         """
-        Fit the K-Means algorithm to the provided data and predict the nearest centroid
-        for each data point.
+        Fit the clustering algorithm to the provided data and predict the nearest
+        centroid for each data point.

         Parameters
         ----------
         data : pd.DataFrame
-            The input data to be used for the KMA algorithm.
-        directional_variables : List[str], optional
-            A list of directional variables that will be transformed to u and v components.
-            Then, to use custom_scale_factor, you must specify the variables names with the u and v suffixes.
-            Example: directional_variables=["Dir"], custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
+            The input data to be used for the clustering algorithm.
+        directional_variables : list[str], optional
+            A list of directional variables that will be transformed to u and v
+            components.
+            Then, to use custom_scale_factor, you must specify the variable names
+            with the u and v suffixes.
+            Example: directional_variables=["Dir"],
+            custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
             Default is [].
         custom_scale_factor : dict, optional
             A dictionary specifying custom scale factors for normalization.
             If provided, these factors will be used instead of calculating them.
             Default is {}.
         min_number_of_points : int, optional
             The minimum number of points to consider a cluster.
             Default is None.
         max_number_of_iterations : int, optional
-            The maximum number of iterations for the K-Means algorithm.
+            The maximum number of iterations for the clustering algorithm.
             This is used when min_number_of_points is not None.
             Default is 10.
         normalize_data : bool, optional
             A flag to normalize the data.
             If True, the data will be normalized using the custom_scale_factor.
             Default is False.
         regression_guided: dict, optional
-            A dictionary specifying regression-guided clustering variables and relative weights.
+            A dictionary specifying regression-guided clustering variables and
+            relative weights.
             Example: {"vars": ["Fe"], "alpha": [0.6]}.
             Default is {}.
+        init_mda_centroids : pd.DataFrame, optional
+            Normalized centroids from MDA algorithm to use as initialization.
+            Should have shape (num_clusters, n_features) and be in normalized space.
+            If provided, the clustering algorithm will be initialized with these
+            centroids. Default is None.

         Returns
         -------
-        Tuple[pd.DataFrame, pd.DataFrame]
+        tuple[pd.DataFrame, pd.DataFrame]
            A tuple containing the nearest centroid index for each data point,
            and the nearest centroids.
         """

@@ -417,6 +716,7 @@
             max_number_of_iterations=max_number_of_iterations,
             normalize_data=normalize_data,
             regression_guided=regression_guided,
+            init_mda_centroids=init_mda_centroids,
         )

         return self.predict(data=data)

diff --git a/bluemath_tk/interpolation/rbf.py b/bluemath_tk/interpolation/rbf.py
index f8a3342..5e6c90b 100644
--- a/bluemath_tk/interpolation/rbf.py
+++ b/bluemath_tk/interpolation/rbf.py
@@ -1,5 +1,14 @@
+"""
+Package: BlueMath_tk
+Module: interpolation
+File: rbf.py
+Author: GeoOcean Research Group, Universidad de Cantabria
+Repository: https://github.com/GeoOcean/BlueMath_tk.git
+Status: Under development (Working)
+"""
+
 import time
-from typing import Callable, List, Tuple
+from collections.abc import Callable

 import dask.array as da
 import numpy as np
@@ -13,24 +22,24 @@

 def gaussian_kernel(r: float, const: float) -> float:
     """
-    This function calculates the Gaussian kernel for the given distance and constant.
+    Calculate the Gaussian kernel value for the given distance and constant.

     Parameters
     ----------
     r : float
-        The distance.
+        The distance between the data points.
     const : float
-        The constant (default name is usually sigma for gaussian kernel).
+        The constant (usually called sigma for the Gaussian kernel).

     Returns
     -------
     float
-        The Gaussian kernel value.
+        The value of the Gaussian kernel.

     Notes
     -----
-    - The Gaussian kernel is defined as:
-      K(r) = exp(r^2 / 2 * const^2) (https://en.wikipedia.org/wiki/Gaussian_function)
+    - K(r) = exp(-0.5 * (r / const)**2) (https://en.wikipedia.org/wiki/Gaussian_function)
     - Here, we are assuming the mean is 0.
     """

@@ -38,24 +47,88 @@

 def multiquadratic_kernel(r, const):
+    """
+    Calculate the multiquadratic kernel value.
+
+    Parameters
+    ----------
+    r : float
+        The distance between the data points.
+    const : float
+        The constant parameter.
+
+    Returns
+    -------
+    float
+        The value of the multiquadratic kernel.
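+
+    Notes
+    -----
+    - K(r) = sqrt(1 + (r / const)**2)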
+    """
+
     return np.sqrt(1 + (r / const) ** 2)


 def inverse_kernel(r, const):
+    """
+    Calculate the inverse multiquadratic kernel value.
+
+    Parameters
+    ----------
+    r : float
+        The distance between the data points.
+    const : float
+        The constant parameter.
+
+    Returns
+    -------
+    float
+        The value of the inverse multiquadratic kernel.
+    """
+
     return 1 / np.sqrt(1 + (r / const) ** 2)


 def cubic_kernel(r, const):
+    """
+    Calculate the cubic kernel value.
+
+    Parameters
+    ----------
+    r : float
+        The distance between the data points.
+    const : float
+        The constant parameter (not used in cubic kernel).
+
+    Returns
+    -------
+    float
+        The value of the cubic kernel.
+    """
+
     return r**3


 def thin_plate_kernel(r, const):
+    """
+    Calculate the thin plate spline kernel value.
+
+    Parameters
+    ----------
+    r : float
+        The distance between the data points.
+    const : float
+        The constant parameter.
+
+    Returns
+    -------
+    float
+        The value of the thin plate spline kernel.
+    """
+
     return r**2 * np.log(r / const)


 class RBFError(Exception):
     """
-    Custom exception for RBF class.
+    Custom exception for RBF interpolation model.
     """

     def __init__(self, message: str = "RBF error occurred."):
@@ -76,8 +149,9 @@ class RBF(BaseInterpolation):
     """
     Radial Basis Function (RBF) interpolation model.

     Attributes
     ----------
     sigma_min : float
         The minimum value for the sigma parameter.
         This value might change in the optimization process.
     sigma_max : float
         The maximum value for the sigma parameter.
         This value might change in the optimization process.
     sigma_diff : float
-        The difference between the optimal bounded sigma and the minimum and maximum sigma values.
-        If the difference is less than this value, the optimization process continues.
+        The difference between the optimal bounded sigma and the minimum and
+        maximum sigma values. If the difference is less than this value, the
+        optimization process continues.
     kernel : str
         The kernel to use for the RBF model.
         The available kernels are:
@@ -101,13 +175,13 @@
     normalized_target_data : pd.DataFrame
         The normalized target data used to fit the model.
         This attribute is only set if normalize_target_data is True in the fit method.
-    subset_directional_variables : List[str]
+    subset_directional_variables : list[str]
         The subset directional variables.
-    target_directional_variables : List[str]
+    target_directional_variables : list[str]
         The target directional variables.
-    subset_processed_variables : List[str]
+    subset_processed_variables : list[str]
         The subset processed variables.
-    target_processed_variables : List[str]
+    target_processed_variables : list[str]
         The target processed variables.
     subset_custom_scale_factor : dict
         The custom scale factor for the subset data.
@@ -130,12 +204,16 @@
         Predicts the data for the provided dataset.
     fit_predict -> pd.DataFrame
         Fits the model to the subset and predicts the interpolated dataset.
+    explain -> dict or shap.Explanation
+        Explains predictions using SHAP values to show feature importance
+        and contributions.

     Notes
     -----
-    TODO: For the moment, this class only supports optimization for one parameter kernels.
-        For this reason, we only have sigma as the parameter to optimize.
-        This sigma refers to the sigma parameter in the Gaussian kernel (but is used for all kernels).
+    TODO: For the moment, this class only supports optimization for one
+        parameter kernels. For this reason, we only have sigma as the
+        parameter to optimize. This sigma refers to the sigma parameter
+        in the Gaussian kernel (but is used for all kernels).
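+
+    In practice, sigma is optimized with ``fminbound`` between ``sigma_min``
+    and ``sigma_max``; when the bounded optimum falls within ``sigma_diff``
+    of either bound, the bounds are adjusted and the optimization continues
+    (see ``_calc_opt_sigma``).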
     Main reference for sigma optimization:
     https://link.springer.com/article/10.1023/A:1018975909870
@@ -201,6 +279,8 @@ def __init__(
         smooth: float = 1e-5,
     ):
         """
+        Initialize RBF interpolation model.
+
         Raises
         ------
         ValueError
@@ -264,10 +344,10 @@
         self._normalized_subset_data: pd.DataFrame = pd.DataFrame()
         self._target_data: pd.DataFrame = pd.DataFrame()
         self._normalized_target_data: pd.DataFrame = pd.DataFrame()
-        self._subset_directional_variables: List[str] = []
-        self._target_directional_variables: List[str] = []
-        self._subset_processed_variables: List[str] = []
-        self._target_processed_variables: List[str] = []
+        self._subset_directional_variables: list[str] = []
+        self._target_directional_variables: list[str] = []
+        self._subset_processed_variables: list[str] = []
+        self._target_processed_variables: list[str] = []
         self._subset_custom_scale_factor: dict = {}
         self._target_custom_scale_factor: dict = {}
         self._subset_scale_factor: dict = {}
@@ -283,95 +363,116 @@

     @property
     def sigma_min(self) -> float:
+        """Return the minimum sigma value."""
         return self._sigma_min

     @property
     def sigma_max(self) -> float:
+        """Return the maximum sigma value."""
         return self._sigma_max

     @property
     def sigma_diff(self) -> float:
+        """Return the sigma difference threshold."""
         return self._sigma_diff

     @property
     def sigma_opt(self) -> float:
+        """Return the optimal sigma value."""
         return self._sigma_opt

     @property
     def kernel(self) -> str:
+        """Return the kernel name."""
         return self._kernel

     @property
     def kernel_func(self) -> Callable:
+        """Return the kernel function."""
         return self._kernel_func

     @property
     def smooth(self) -> float:
+        """Return the smoothness parameter."""
         return self._smooth

     @property
     def subset_data(self) -> pd.DataFrame:
+        """Return the subset data."""
         return self._subset_data

     @property
     def normalized_subset_data(self) -> pd.DataFrame:
+        """Return the normalized subset data."""
         return self._normalized_subset_data

     @property
     def target_data(self) -> pd.DataFrame:
+        """Return the target data."""
         return self._target_data

     @property
     def normalized_target_data(self) -> pd.DataFrame:
+        """Return the normalized target data."""
         if self._normalized_target_data.empty:
             raise ValueError("Target data is not normalized.")
         return self._normalized_target_data

     @property
-    def subset_directional_variables(self) -> List[str]:
+    def subset_directional_variables(self) -> list[str]:
+        """Return the subset directional variables."""
         return self._subset_directional_variables

     @property
-    def target_directional_variables(self) -> List[str]:
+    def target_directional_variables(self) -> list[str]:
+        """Return the target directional variables."""
         return self._target_directional_variables

     @property
-    def subset_processed_variables(self) -> List[str]:
+    def subset_processed_variables(self) -> list[str]:
+        """Return the subset processed variables."""
         return self._subset_processed_variables

     @property
-    def target_processed_variables(self) -> List[str]:
+    def target_processed_variables(self) -> list[str]:
+        """Return the target processed variables."""
         return self._target_processed_variables

     @property
     def subset_custom_scale_factor(self) -> dict:
+        """Return the subset custom scale factor."""
         return self._subset_custom_scale_factor

     @property
     def target_custom_scale_factor(self) -> dict:
+        """Return the target custom scale factor."""
         return self._target_custom_scale_factor

     @property
     def subset_scale_factor(self) -> dict:
+        """Return the subset scale factor."""
         return self._subset_scale_factor

     @property
     def target_scale_factor(self) -> dict:
+        """Return the target scale factor."""
         return self._target_scale_factor

     @property
     def rbf_coeffs(self) -> pd.DataFrame:
+        """Return the RBF coefficients."""
         return self._rbf_coeffs

     @property
     def opt_sigmas(self) -> dict:
+        """Return the optimal sigmas."""
         return self._opt_sigmas

     def _preprocess_subset_data(
         self, subset_data: pd.DataFrame, is_fit: bool = True
     ) -> pd.DataFrame:
         """
-        This function preprocesses the subset data.
+        Preprocess the subset data.

         Parameters
         ----------
@@ -392,7 +493,7 @@

         Notes
         -----
-        - This function preprocesses the subset data by:
+        - Preprocesses the subset data by:
             - Checking for NaNs.
             - Preprocessing directional variables.
             - Normalizing the data.
@@ -442,7 +543,7 @@
         normalize_target_data: bool = True,
     ) -> pd.DataFrame:
         """
-        This function preprocesses the target data.
+        Preprocess the target data.

         Parameters
         ----------
@@ -463,7 +564,7 @@

         Notes
         -----
-        - This function preprocesses the target data by:
+        - Preprocesses the target data by:
            - Checking for NaNs.
            - Preprocessing directional variables.
            - Normalizing the data.
@@ -546,9 +647,9 @@ def _rbf_assemble(self, x, sigma):

     def _calc_rbf_coeff(
         self, sigma: float, x: np.ndarray, y: np.ndarray
-    ) -> Tuple[np.ndarray, np.ndarray]:
+    ) -> tuple[np.ndarray, np.ndarray]:
         """
-        This function calculates the RBF coefficients for the given data.
+        Calculate the RBF coefficients for the given data.

         Parameters
         ----------
         sigma : float
         x : np.ndarray
         y : np.ndarray

         Returns
         -------
-        Tuple[np.ndarray, np.ndarray]
+        tuple[np.ndarray, np.ndarray]
             The RBF coefficients and the A matrix.
         """

@@ -581,7 +682,7 @@

     def _cost_sigma(self, sigma: float, x: np.ndarray, y: np.ndarray) -> float:
         """
-        This function is called by fminbound to minimize the cost function.
+        Evaluate the cost for a given sigma (called by fminbound during minimization).

         Parameters
         ----------
@@ -618,7 +719,8 @@
         #     kk = kk - rbf_coeff[m1 - m + i] * x[i, :]
         kk -= np.dot(rbf_coeff[m1 - m :].T, x).reshape(-1)

-        # Calculate the cost by multiplying invA with kk and normalizing by the diagonal elements of invA
+        # Calculate the cost by multiplying invA with kk and normalizing
+        # by the diagonal elements of invA
         ceps = np.dot(invA, kk) / np.diagonal(invA)

         # Return the norm of ceps, representing the cost
@@ -633,7 +735,7 @@
         iteratively_update_sigma: bool = False,
     ) -> float:
         """
-        This function calculates the optimal sigma for the given target variable.
+        Calculate the optimal sigma for the given target variable.

         Parameters
         ----------
@@ -770,7 +872,7 @@
         self, dataset: pd.DataFrame, num_workers: int = None
     ) -> pd.DataFrame:
         """
-        This function interpolates the dataset.
+        Interpolate the dataset.
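+
+        For each target variable, the fitted kernel is evaluated between the
+        subset points and the dataset points, and the stored RBF coefficients
+        and optimal sigma are applied to obtain the interpolated values.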

         Parameters
         ----------
@@ -801,7 +903,8 @@
         # Loop through the target variables
         if num_workers > 1:
             self.logger.info(
-                f"Interpolating target variables using parallel execution and num_workers={num_workers}"
+                f"Interpolating target variables using parallel execution "
+                f"and num_workers={num_workers}"
             )
             rbf_interpolated_vars = self.parallel_execute(
                 func=self._rbf_variable_interpolation,
@@ -841,8 +944,8 @@
     def fit(
         self,
         subset_data: pd.DataFrame,
         target_data: pd.DataFrame,
-        subset_directional_variables: List[str] = [],
-        target_directional_variables: List[str] = [],
+        subset_directional_variables: list[str] = [],
+        target_directional_variables: list[str] = [],
         subset_custom_scale_factor: dict = {},
         normalize_target_data: bool = True,
         target_custom_scale_factor: dict = {},
@@ -858,9 +961,9 @@
             The subset data used to fit the model.
         target_data : pd.DataFrame
             The target data used to fit the model.
-        subset_directional_variables : List[str], optional
+        subset_directional_variables : list[str], optional
             The subset directional variables. Default is [].
-        target_directional_variables : List[str], optional
+        target_directional_variables : list[str], optional
             The target directional variables. Default is [].
         subset_custom_scale_factor : dict, optional
             The custom scale factor for the subset data. Default is {}.
@@ -901,7 +1004,8 @@

         if num_workers > 1:
             self.logger.info(
-                f"Fitting RBF model using parallel execution and num_workers={num_workers}"
+                f"Fitting RBF model using parallel execution "
+                f"and num_workers={num_workers}"
             )
             rbf_coeffs_and_sigmas = self.parallel_execute(
                 func=self._calc_opt_sigma,
@@ -994,8 +1098,8 @@
         subset_data: pd.DataFrame,
         target_data: pd.DataFrame,
         dataset: pd.DataFrame,
-        subset_directional_variables: List[str] = [],
-        target_directional_variables: List[str] = [],
+        subset_directional_variables: list[str] = [],
+        target_directional_variables: list[str] = [],
         subset_custom_scale_factor: dict = {},
         normalize_target_data: bool = True,
         target_custom_scale_factor: dict = {},
@@ -1013,9 +1117,9 @@
             The target data used to fit the model.
         dataset : pd.DataFrame
             The dataset to predict (must have same variables than subset).
-        subset_directional_variables : List[str], optional
+        subset_directional_variables : list[str], optional
             The subset directional variables. Default is [].
-        target_directional_variables : List[str], optional
+        target_directional_variables : list[str], optional
             The target directional variables. Default is [].
         subset_custom_scale_factor : dict, optional
             The custom scale factor for the subset data. Default is {}.
@@ -1035,7 +1139,7 @@

         Notes
         -----
-        - This function fits the model to the subset and predicts the interpolated dataset.
+        - Fits the model to the subset and predicts the interpolated dataset.
         """

         if num_workers is None:
@@ -1055,6 +1159,211 @@

         return self.predict(dataset=dataset, num_workers=num_workers)

+    def explain(
+        self,
+        dataset: pd.DataFrame,
+        target_variable: str = None,
+        num_samples: int = 100,
+        max_background_samples: int = 100,
+    ):
+        """
+        Explain RBF predictions using SHAP (SHapley Additive exPlanations) values.
+
+        This method provides comprehensive model interpretability by automatically
+        generating interactive SHAP visualizations for each target variable. It uses
+        the training subset data as background.
+
+        Parameters
+        ----------
+        dataset : pd.DataFrame
+            The test dataset to explain predictions for.
+            Must have the same variables as the subset_data used for fitting.
+        target_variable : str, optional
+            The target variable to explain. If None, explains all target variables.
+            Default is None.
+        num_samples : int, optional
+            Number of samples to use for SHAP approximation. Higher values give
+            more accurate results but are slower. Default is 100.
+            Recommended: 100-500 for good balance between speed and accuracy.
+        max_background_samples : int, optional
+            Maximum number of background samples to use. The subset data will be
+            automatically summarized using k-means if it exceeds this value.
+            Default is 100.
+
+        Returns
+        -------
+        dict
+            Dictionary containing explanation results for each target variable:
+            - 'explanation': SHAP Explanation object
+            - 'shap_values': numpy array of SHAP values
+            - 'expected_value': base/expected prediction value
+            - 'feature_importance': pandas Series with mean absolute SHAP values
+            - 'summary_stats': dictionary with summary statistics
+
+        Raises
+        ------
+        ImportError
+            If SHAP is not installed.
+        RBFError
+            If the model is not fitted.
+        """
+
+        try:
+            import logging
+
+            import shap
+
+            # Suppress SHAP INFO logs (keep progress bars)
+            shap_logger = logging.getLogger("shap")
+            shap_logger.setLevel(logging.WARNING)
+
+            shap.initjs()  # Initialize JavaScript for interactive plots
+        except ImportError:
+            raise ImportError(
+                "SHAP is required for explain method. Install with: pip install shap"
+            )
+
+        if not self.is_fitted:
+            raise RBFError("RBF model must be fitted before explaining.")
+
+        # Determine which target variables to explain
+        if target_variable is None:
+            target_vars = self.target_processed_variables
+        else:
+            if target_variable not in self.target_processed_variables:
+                raise ValueError(
+                    f"target_variable '{target_variable}' not found in "
+                    f"target_processed_variables: {self.target_processed_variables}"
+                )
+            target_vars = [target_variable]
+
+        # Prepare background data from subset (automatic)
+        background = self.subset_data.copy()
+
+        # Summarize background data for efficiency if it's too large
+        if len(background) > max_background_samples:
+            self.logger.info(
+                f"Summarizing background data from {len(background)} "
+                f"to {max_background_samples} samples using k-means"
+            )
+            n_clusters = min(max_background_samples, len(background))
+            background_summary = shap.kmeans(background, n_clusters)
+        else:
+            n_clusters = len(background)
+            background_summary = background
+
+        explanations = {}
+
+        for target_var in target_vars:
+            self.logger.info(
+                f"Explaining predictions for target variable: {target_var}"
+            )
+
+            # Create a prediction function for this specific target variable
+            # This wrapper is needed because SHAP expects a function that takes
+            # numpy arrays and returns predictions
+            def predict_fn(X):
+                """
+                Predict the target variable for SHAP explanation.
+
+                Parameters
+                ----------
+                X : np.ndarray
+                    Input features in normalized space (shape: n_samples, n_features)
+
+                Returns
+                -------
+                np.ndarray
+                    Predictions for the target variable (shape: n_samples,)
+                """
+
+                # Convert to DataFrame with proper column names
+                dataset = pd.DataFrame(X, columns=self.subset_processed_variables)
+
+                # Get predictions for all targets
+                predictions = self.predict(dataset=dataset)
+
+                # Return only the target variable we're explaining
+                return predictions[target_var]
+
+            # Create SHAP explainer
+            self.logger.info(
+                f"Creating SHAP KernelExplainer with {n_clusters} "
+                f"background samples and {num_samples} evaluation samples"
+            )
+            explainer = shap.KernelExplainer(predict_fn, background_summary)
+
+            # Calculate SHAP values using normalized data
+            self.logger.info(f"Calculating SHAP values for {len(dataset)} samples...")
+            shap_values = explainer.shap_values(dataset.values, nsamples=num_samples)
+
+            # Ensure shap_values is 2D (handle both single and multiple samples)
+            shap_values = np.array(shap_values)
+            if shap_values.ndim == 1:
+                shap_values = shap_values.reshape(1, -1)
+
+            # Create SHAP Explanation object using original dataset for plotting
+            explanation = shap.Explanation(
+                values=shap_values,
+                base_values=explainer.expected_value,
+                data=dataset.values,
+                feature_names=dataset.columns.tolist(),
+            )
+
+            # Calculate feature importance (mean absolute SHAP values)
+            feature_importance = pd.Series(
+                np.abs(shap_values).mean(axis=0),
+                index=self.subset_processed_variables,
+                name="mean_abs_shap",
+            ).sort_values(ascending=False)
+
+            # Calculate summary statistics
+            summary_stats = {
+                "n_samples": len(dataset),
+                "n_features": len(self.subset_processed_variables),
+                "expected_value": float(explainer.expected_value),
+                "mean_prediction": float(
+                    shap_values.sum(axis=1).mean() + explainer.expected_value
+                ),
+                "shap_values_range": {
+                    "min": float(shap_values.min()),
+                    "max": float(shap_values.max()),
+                    "mean": float(shap_values.mean()),
+                    "std": float(shap_values.std()),
+                },
+                "top_features": feature_importance.head(10).to_dict(),
+            }
+
+            # Generate single comprehensive plot
+            self.logger.info(f"Generating SHAP summary plot for {target_var}")
+
+            # Print minimal essential statistics
+            print(f"SHAP Explanation: {target_var}")
+            print(f"  Baseline: {summary_stats['expected_value']:.4f}")
+            print(f"  Mean prediction: {summary_stats['mean_prediction']:.4f}")
+            top_features = ", ".join(feature_importance.head(5).index.tolist())
+            print(f"  Top 5 features: {top_features}")
+
+            # Single comprehensive summary plot using original dataset
+            shap.summary_plot(shap_values, dataset, show=True)
+
+            # Store comprehensive explanation
+            explanations[target_var] = {
+                "explanation": explanation,
+                "shap_values": shap_values,
+                "expected_value": explainer.expected_value,
+                "feature_importance": feature_importance,
+                "summary_stats": summary_stats,
+                "explainer": explainer,
+                "data": dataset,  # Store original dataset
+            }
+
+        # Return single explanation if only one target variable
+        if len(explanations) == 1:
+            return list(explanations.values())[0]
+
+        return explanations
+

 def basic_rbf_metric(df_true: pd.DataFrame, df_pred: pd.DataFrame) -> float:
     """
@@ -1079,8 +1388,8 @@
 def KFold_cross_validation_RBF(
     subset_data: pd.DataFrame,
     target_data: pd.DataFrame,
-    subset_directional_variables: List[str] = [],
-    target_directional_variables: List[str] = [],
+    subset_directional_variables: list[str] = [],
+    target_directional_variables: list[str] = [],
     subset_custom_scale_factor: dict = {},
     normalize_target_data: bool = True,
     target_custom_scale_factor: dict = {},

diff --git a/pyproject.toml b/pyproject.toml
index 713024a..7f6ce63 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ dependencies = [
     "zarr",
     "scipy",
     "scikit-learn",
+    "pyclustering",
     "matplotlib",
     "cartopy",
     "jinja2",
@@ -108,7 +109,7 @@ ignore = [
     "D202",  # No blank lines allowed after docstring (auto-fixable)
     # "D300",  # Use """triple double quotes""" (auto-fixable)
     "D400",  # First line should end with a period (can fix gradually)
-    # "D200",  # One-line docstring should fit on one line (auto-fixable)
+    "D200",  # One-line docstring should fit on one line (auto-fixable)
     # "D403",  # First word of the first line should be capitalized (auto-fixable)
     # "D404",  # First word of the docstring should not be This (auto-fixable)
     # "D409",  # Section underline length mismatch (auto-fixable)

From c4f40a812044a7af6bfff9e8aac775c0ecc94d06 Mon Sep 17 00:00:00 2001
From: Javier Tausia Hoyal
Date: Wed, 14 Jan 2026 19:59:28 +0100
Subject: [PATCH 3/4] [JTH] add tests for kma and consistency across docu

---
 bluemath_tk/core/decorators.py   |  82 +++--
 bluemath_tk/datamining/kma.py    |  75 ++++-
 bluemath_tk/interpolation/rbf.py | 142 ++++-----
 tests/datamining/test_kma.py     | 524 +++++++++++++++++++++++++++++--
 4 files changed, 673 insertions(+), 150 deletions(-)

diff --git a/bluemath_tk/core/decorators.py b/bluemath_tk/core/decorators.py
index 7b1c200..47a6ba2 100644
--- a/bluemath_tk/core/decorators.py
+++ b/bluemath_tk/core/decorators.py
@@ -1,5 +1,12 @@
+"""
+Validation decorators for BlueMath_tk classes.
+
+This module provides decorators to validate input data for various
+clustering, reduction, and analysis methods.
+"""
+
 import functools
-from typing import Any, Dict, List
+from typing import Any

 import pandas as pd
 import xarray as xr
@@ -7,7 +14,7 @@

 def validate_data_lhs(func):
     """
-    Decorator to validate data in LHS class fit method.
+    Validate data in LHS class fit method.

     Parameters
     ----------
@@ -23,9 +30,9 @@
     @functools.wraps(func)
     def wrapper(
         self,
-        dimensions_names: List[str],
-        lower_bounds: List[float],
-        upper_bounds: List[float],
+        dimensions_names: list[str],
+        lower_bounds: list[float],
+        upper_bounds: list[float],
         num_samples: int,
     ):
         if not isinstance(dimensions_names, list):
@@ -38,7 +45,8 @@
             upper_bounds
         ):
             raise ValueError(
-                "Dimensions names, lower bounds and upper bounds must have the same length"
+                "Dimensions names, lower bounds and upper bounds "
+                "must have the same length"
             )
         if not all(
             [lower <= upper for lower, upper in zip(lower_bounds, upper_bounds)]
@@ -53,7 +61,7 @@

 def validate_data_mda(func):
     """
-    Decorator to validate data in MDA class fit method.
+    Validate data in MDA class fit method.

     Parameters
     ----------
@@ -70,7 +78,7 @@
     def wrapper(
         self,
         data: pd.DataFrame,
-        directional_variables: List[str] = [],
+        directional_variables: list[str] = [],
         custom_scale_factor: dict = {},
         first_centroid_seed: int = None,
         normalize_data: bool = False,
@@ -90,7 +98,8 @@
             or first_centroid_seed > data.shape[0]
         ):
             raise ValueError(
-                "First centroid seed must be an integer >= 0 and < num of data points"
+                "First centroid seed must be an integer >= 0 "
+                "and < num of data points"
             )
         if not isinstance(normalize_data, bool):
             raise TypeError("Normalize data must be a boolean")
@@ -108,7 +117,7 @@

 def validate_data_kma(func):
     """
-    Decorator to validate data in KMA class fit method.
+    Validate data in KMA class fit method.

     Parameters
     ----------
@@ -125,12 +134,13 @@
     def wrapper(
         self,
         data: pd.DataFrame,
-        directional_variables: List[str] = [],
+        directional_variables: list[str] = [],
         custom_scale_factor: dict = {},
         min_number_of_points: int = None,
         max_number_of_iterations: int = 10,
         normalize_data: bool = False,
-        regression_guided: Dict[str, List] = {},
+        regression_guided: dict[str, list] = {},
+        init_mda_centroids: pd.DataFrame = None,
     ):
         if data is None:
             raise ValueError("data cannot be None")
@@ -157,7 +167,8 @@
             for var in regression_guided.get("vars", [])
         ):
             raise TypeError(
-                "regression_guided vars must be a list of strings and must exist in data"
+                "regression_guided vars must be a list of strings "
+                "and must exist in data"
            )
         if not all(
             isinstance(alpha, float) and alpha >= 0 and alpha <= 1
@@ -175,6 +186,7 @@
             max_number_of_iterations,
             normalize_data,
             regression_guided,
+            init_mda_centroids,
         )

     return wrapper
@@ -182,7 +194,7 @@

 def validate_data_som(func):
     """
-    Decorator to validate data in SOM class fit method.
+    Validate data in SOM class fit method.

     Parameters
     ----------
@@ -199,7 +211,7 @@
     def wrapper(
         self,
         data: pd.DataFrame,
-        directional_variables: List[str] = [],
+        directional_variables: list[str] = [],
         custom_scale_factor: dict = {},
         num_iteration: int = 1000,
         normalize_data: bool = False,
@@ -230,7 +242,7 @@

 def validate_data_pca(func):
     """
-    Decorator to validate data in PCA class fit method.
+    Validate data in PCA class fit method.

     Parameters
     ----------
@@ -247,8 +259,8 @@
     def wrapper(
         self,
         data: xr.Dataset,
-        vars_to_stack: List[str],
-        coords_to_stack: List[str],
+        vars_to_stack: list[str],
+        coords_to_stack: list[str],
         pca_dim_for_rows: str,
         windows_in_pca_dim_for_rows: dict = {},
         value_to_replace_nans: dict = {},
@@ -263,18 +275,22 @@
         for var in vars_to_stack:
             if var not in data.data_vars:
                 raise ValueError(f"Variable {var} not found in data")
-        # Check that all variables in vars_to_stack have the same coordinates and dimensions
+        # Check that all variables in vars_to_stack have the same
+        # coordinates and dimensions
         first_var = vars_to_stack[0]
         first_var_dims = list(data[first_var].dims)
         first_var_coords = list(data[first_var].coords)
         for var in vars_to_stack:
             if list(data[var].dims) != first_var_dims:
                 raise ValueError(
-                    f"All variables must have the same dimensions. Variable {var} does not match."
+                    f"All variables must have the same dimensions. "
+                    f"Variable {var} does not match."
                 )
             if list(data[var].coords) != first_var_coords:
                 raise ValueError(
-                    f"All variables must have the same coordinates. Variable {var} does not match."
+                    f"All variables must have the same coordinates. "
+                    f"Variable {var} does not match."
                 )
         # Check that all coords_to_stack are in the data
         if not isinstance(coords_to_stack, list) or len(coords_to_stack) == 0:
@@ -285,7 +301,8 @@
         # Check that pca_dim_for_rows is in the data, and window > 0 if provided
         if not isinstance(pca_dim_for_rows, str) or pca_dim_for_rows not in data.dims:
             raise ValueError(
-                "PCA dimension for rows must be a string and found in the data dimensions"
+                "PCA dimension for rows must be a string "
+                "and found in the data dimensions"
             )
         for variable, windows in windows_in_pca_dim_for_rows.items():
             if not isinstance(windows, list):
@@ -314,7 +331,7 @@

 def validate_data_rbf(func):
     """
-    Decorator to validate data in RBF class fit method.
+    Validate data in RBF class fit method.

     Parameters
     ----------
@@ -332,8 +349,8 @@
     def wrapper(
         self,
         subset_data: pd.DataFrame,
         target_data: pd.DataFrame,
-        subset_directional_variables: List[str] = [],
-        target_directional_variables: List[str] = [],
+        subset_directional_variables: list[str] = [],
+        target_directional_variables: list[str] = [],
         subset_custom_scale_factor: dict = {},
         normalize_target_data: bool = True,
         target_custom_scale_factor: dict = {},
@@ -353,14 +370,16 @@
         for directional_variable in subset_directional_variables:
             if directional_variable not in subset_data.columns:
                 raise ValueError(
-                    f"Directional variable {directional_variable} not found in subset data"
+                    f"Directional variable {directional_variable} "
+                    f"not found in subset data"
                 )
         if not isinstance(target_directional_variables, list):
             raise TypeError("Target directional variables must be a list")
         for directional_variable in target_directional_variables:
             if directional_variable not in target_data.columns:
                 raise ValueError(
-                    f"Directional variable {directional_variable} not found in target data"
+                    f"Directional variable {directional_variable} "
+                    f"not found in target data"
                 )
         if not isinstance(subset_custom_scale_factor, dict):
             raise TypeError("Subset custom scale factor must be a dict")
@@ -391,7 +410,7 @@

 def validate_data_xwt(func):
     """
-    Decorator to validate data in XWT class fit method.
+    Validate data in XWT class fit method.

     Parameters
     ----------
@@ -408,7 +427,7 @@
     def wrapper(
         self,
         data: xr.Dataset,
-        fit_params: Dict[str, Dict[str, Any]] = {},
+        fit_params: dict[str, dict[str, Any]] = {},
         variable_to_sort_bmus: str = None,
     ):
         if not isinstance(data, xr.Dataset):
@@ -427,7 +446,8 @@
             or variable_to_sort_bmus not in data.data_vars
         ):
             raise TypeError(
-                "variable_to_sort_bmus must be a string and must exist in data variables"
+                "variable_to_sort_bmus must be a string "
+                "and must exist in data variables"
             )
         return func(
             self,
@@ -441,7 +461,7 @@

 def validate_data_calval(func):
     """
-    Decorator to validate data in CalVal class fit method.
+    Validate data in CalVal class fit method.
Parameters ---------- diff --git a/bluemath_tk/datamining/kma.py b/bluemath_tk/datamining/kma.py index 165ce8b..e689a3a 100644 --- a/bluemath_tk/datamining/kma.py +++ b/bluemath_tk/datamining/kma.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd +from scipy.spatial.distance import cdist from sklearn.linear_model import LinearRegression from ..core.decorators import validate_data_kma @@ -105,6 +106,19 @@ def __init__( super().__init__() self.set_logger_name(name=self.__class__.__name__) + initial_msg = f""" + --------------------------------------------------------------------------------- + | Initializing KMA object with the following parameters: + | - num_clusters: {num_clusters} + | - seed: {seed} + | - algorithm: {algorithm} + | - distance_metric: {distance_metric} + | - distance_matrix: {distance_matrix} + | For more information, please refer to the documentation. + --------------------------------------------------------------------------------- + """ + self.logger.info(initial_msg) + if num_clusters > 0: self.num_clusters = int(num_clusters) else: @@ -182,20 +196,39 @@ def _get_initial_centers(self, data: list) -> list: list Initial centers/medians/medoids (indices if using distance matrix). """ + if self._init_centers is not None: # Use provided initial centers if isinstance(self._init_centers, pd.DataFrame): - if self.distance_matrix is not None: - # For distance matrix, we need indices, not actual centers + if ( + self.distance_matrix is not None + or self.algorithm_name == "kmedoids" + ): + # For distance matrix or kmedoids, we need indices, + # not actual centers # Find nearest data points to the provided centers - raise NotImplementedError( - "MDA initialization with distance matrix not yet supported" - ) - initial_centers = self._init_centers.values.tolist() + if self.distance_matrix is not None: + raise NotImplementedError( + "MDA initialization with distance matrix not yet supported" + ) + # For kmedoids without distance matrix, find nearest indices + init_values = self._init_centers.values + data_array = np.array(data) + distances = cdist(init_values, data_array) + nearest_indices = np.argmin(distances, axis=1) + initial_centers = [int(i) for i in nearest_indices] + else: + initial_centers = self._init_centers.values.tolist() else: - if self.distance_matrix is not None: - # For distance matrix, initial_centers should be indices - initial_centers = list(np.array(self._init_centers).flatten()) + if ( + self.distance_matrix is not None + or self.algorithm_name == "kmedoids" + ): + # For distance matrix or kmedoids, + # initial_centers should be indices + initial_centers = [ + int(i) for i in np.array(self._init_centers).flatten() + ] else: initial_centers = np.array(self._init_centers).tolist() else: @@ -206,12 +239,13 @@ def _get_initial_centers(self, data: list) -> list: initial_centers_idx = np.random.choice( n_samples, size=self.num_clusters, replace=False ) - if self.distance_matrix is not None: - # For distance matrix, return indices - initial_centers = list(initial_centers_idx) + if self.distance_matrix is not None or self.algorithm_name == "kmedoids": + # For distance matrix or kmedoids, return indices + initial_centers = [int(i) for i in initial_centers_idx] else: - # For raw data, return actual data points + # For raw data with kmeans/kmedians, return actual data points initial_centers = [data[i] for i in initial_centers_idx] + return initial_centers def _create_pyclustering_model( @@ -232,6 +266,7 @@ def _create_pyclustering_model( pyclustering model The 
initialized pyclustering clustering model. """ + # Build kwargs for pyclustering kwargs = {} if self.distance_metric is not None: @@ -280,6 +315,7 @@ def _fit_clustering(self, normalized_data: pd.DataFrame): normalized_data : pd.DataFrame Normalized data to fit (ignored if distance_matrix is provided). """ + try: # Use distance matrix if provided, otherwise use normalized data if self.distance_matrix is not None: @@ -313,11 +349,12 @@ def _fit_clustering(self, normalized_data: pd.DataFrame): elif self.algorithm_name == "kmedians": centers = self._model.get_medians() elif self.algorithm_name == "kmedoids": + # For kmedoids, get_medoids() always returns indices + medoid_indices = self._model.get_medoids() if self.distance_matrix is not None: # For distance matrix, medoids are indices, not actual points # We need to get the actual data points corresponding to medoid # indices - medoid_indices = self._model.get_medoids() # If we have normalized_data, use it to get actual centers if not normalized_data.empty: centers = normalized_data.iloc[medoid_indices].values @@ -330,7 +367,14 @@ def _fit_clustering(self, normalized_data: pd.DataFrame): "distance_matrix." ) else: - centers = self._model.get_medoids() + # For kmedoids without distance matrix, + # medoid_indices are still indices + # Convert to actual data points + if not normalized_data.empty: + centers = normalized_data.iloc[medoid_indices].values + else: + # Fallback: use indices (shouldn't happen) + centers = np.array(medoid_indices) self._cluster_centers = np.array(centers) @@ -354,6 +398,7 @@ def _predict_clustering(self, normalized_data: pd.DataFrame) -> np.ndarray: np.ndarray Cluster labels. """ + if self._model is None: raise KMAError("Model must be fitted before prediction.") diff --git a/bluemath_tk/interpolation/rbf.py b/bluemath_tk/interpolation/rbf.py index 5e6c90b..85625f4 100644 --- a/bluemath_tk/interpolation/rbf.py +++ b/bluemath_tk/interpolation/rbf.py @@ -46,7 +46,7 @@ def gaussian_kernel(r: float, const: float) -> float: return np.exp(-0.5 * r * r / (const * const)) -def multiquadratic_kernel(r, const): +def multiquadratic_kernel(r: float, const: float): """ Calculate the multiquadratic kernel value. @@ -66,7 +66,7 @@ def multiquadratic_kernel(r, const): return np.sqrt(1 + (r / const) ** 2) -def inverse_kernel(r, const): +def inverse_kernel(r: float, const: float): """ Calculate the inverse multiquadratic kernel value. @@ -86,7 +86,7 @@ def inverse_kernel(r, const): return 1 / np.sqrt(1 + (r / const) ** 2) -def cubic_kernel(r, const): +def cubic_kernel(r: float, const: float): """ Calculate the cubic kernel value. @@ -106,7 +106,7 @@ def cubic_kernel(r, const): return r**3 -def thin_plate_kernel(r, const): +def thin_plate_kernel(r: float, const: float): """ Calculate the thin plate spline kernel value. @@ -140,74 +140,6 @@ class RBF(BaseInterpolation): """ Radial Basis Function (RBF) interpolation model. - Attributes - ---------- - sigma_min : float - The minimum value for the sigma parameter. - This value might change in the optimization process. - sigma_max : float - The maximum value for the sigma parameter. - This value might change in the optimization process. - sigma_diff : float - The difference between the optimal bounded sigma and the minimum and - maximum sigma values. If the difference is less than this value, the - optimization process continues. - kernel : str - The kernel to use for the RBF model. 
- The available kernels are: - - - gaussian : ``exp(-1/2 * (r / const)**2)`` - - multiquadratic : ``sqrt(1 + (r / const)**2)`` - - inverse : ``1 / sqrt(1 + (r / const)**2)`` - - cubic : ``r**3`` - - thin_plate : ``r**2 * log(r / const)`` - - kernel_func : function - The kernel function to use for the RBF model. - smooth : float - The smoothness parameter. - subset_data : pd.DataFrame - The subset data used to fit the model. - normalized_subset_data : pd.DataFrame - The normalized subset data used to fit the model. - target_data : pd.DataFrame - The target data used to fit the model. - normalized_target_data : pd.DataFrame - The normalized target data used to fit the model. - This attribute is only set if normalize_target_data is True in the fit method. - subset_directional_variables : list[str] - The subset directional variables. - target_directional_variables : list[str] - The target directional variables. - subset_processed_variables : list[str] - The subset processed variables. - target_processed_variables : list[str] - The target processed variables. - subset_custom_scale_factor : dict - The custom scale factor for the subset data. - target_custom_scale_factor : dict - The custom scale factor for the target data. - subset_scale_factor : dict - The scale factor for the subset data. - target_scale_factor : dict - The scale factor for the target data. - rbf_coeffs : pd.DataFrame - The RBF coefficients for the target variables. - opt_sigmas : dict - The optimal sigmas for the target variables. - - Methods - ------- - fit -> None - Fits the model to the data. - predict -> pd.DataFrame - Predicts the data for the provided dataset. - fit_predict -> pd.DataFrame - Fits the model to the subset and predicts the interpolated dataset. - explain -> dict or shap.Explanation - Explains predictions using SHAP values to show feature importance - and contributions. - Notes ----- TODO: For the moment, this class only supports optimization for one @@ -215,8 +147,6 @@ class RBF(BaseInterpolation): parameter to optimize. This sigma refers to the sigma parameter in the Gaussian kernel (but is used for all kernels). - Main reference for sigma optimization: https://link.springer.com/article/10.1023/A:1018975909870 - Examples -------- .. jupyter-execute:: @@ -241,7 +171,6 @@ class RBF(BaseInterpolation): ) rbf = RBF() - predictions = rbf.fit_predict( subset_data=subset, subset_directional_variables=["Dir"], @@ -253,12 +182,13 @@ class RBF(BaseInterpolation): iteratively_update_sigma=True, ) print(predictions.head()) + rbf.explain(dataset=dataset, target_variable="HsPred") References ---------- - [1] https://en.wikipedia.org/wiki/Radial_basis_function - [2] https://en.wikipedia.org/wiki/Gaussian_function - [3] https://link.springer.com/article/10.1023/A:1018975909870 + [1] https://link.springer.com/article/10.1023/A:1018975909870 + [2] https://en.wikipedia.org/wiki/Radial_basis_function + [3] https://en.wikipedia.org/wiki/Gaussian_function """ rbf_kernels = { @@ -281,14 +211,20 @@ def __init__( """ Initialize RBF interpolation model. - Raises - ------ - ValueError - If the sigma_min is not a positive float. - If the sigma_max is not a positive float greater than sigma_min. - If the sigma_diff is not a positive float. - If the kernel is not a string and one of the available kernels. - If the smooth is not a positive float. + Parameters + ---------- + sigma_min : float, optional + The minimum value for the sigma parameter. Default is 0.001. + sigma_max : float, optional + The maximum value for the sigma parameter. 
Default is 0.1.
+    sigma_diff : float, optional
+        The difference between the optimal bounded sigma and the minimum and
+        maximum sigma values; if the difference is less than this value, the
+        optimization process continues. Default is 0.0001.
+    sigma_opt : float, optional
+        The optimal value for the sigma parameter. Default is None.
+    kernel : str, optional
+        The kernel to use for the interpolation. Default is "gaussian".
+    smooth : float, optional
+        The smoothness parameter. Default is 1e-5.
     """

         super().__init__()
@@ -1401,6 +1337,40 @@ def KFold_cross_validation_RBF(
 ):
     """
     Perform K-Fold cross-validation for the RBF model.
+
+    Parameters
+    ----------
+    subset_data : pd.DataFrame
+        The subset data used to fit the model.
+    target_data : pd.DataFrame
+        The target data used to fit the model.
+    subset_directional_variables : list[str], optional
+        The subset directional variables. Default is [].
+    target_directional_variables : list[str], optional
+        The target directional variables. Default is [].
+    subset_custom_scale_factor : dict, optional
+        The custom scale factor for the subset data. Default is {}.
+    normalize_target_data : bool, optional
+        Whether to normalize the target data. Default is True.
+    target_custom_scale_factor : dict, optional
+        The custom scale factor for the target data. Default is {}.
+    num_workers : int, optional
+        The number of workers to use for the optimization. Default is None.
+    iteratively_update_sigma : bool, optional
+        Whether to iteratively update the sigma parameter. Default is False.
+    rbf_model : RBF, optional
+        The RBF model to use for the cross-validation. Default is None.
+    n_splits : int, optional
+        The number of splits for the cross-validation. Default is 5.
+    metric : Callable, optional
+        The metric to use for the cross-validation. Default is basic_rbf_metric.
+
+    Returns
+    -------
+    dict
+        A dictionary containing the results of the cross-validation. The keys
+        are the fold indices, and the values are dictionaries containing the
+        train and test data, the predictions, and the metric.
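+
+    Examples
+    --------
+    A minimal sketch on synthetic dataframes (the variable names and data
+    here are illustrative only, not part of the function's API):
+
+    .. jupyter-execute::
+
+        import numpy as np
+        import pandas as pd
+
+        from bluemath_tk.interpolation.rbf import KFold_cross_validation_RBF
+
+        subset = pd.DataFrame(
+            {
+                "Hs": np.random.rand(100) * 7,
+                "Tp": np.random.rand(100) * 20,
+            }
+        )
+        target = pd.DataFrame({"HsPred": subset["Hs"] * 0.5 + 1.0})
+        folds = KFold_cross_validation_RBF(
+            subset_data=subset,
+            target_data=target,
+            n_splits=5,
+        )
+        # One entry per fold, keyed by fold index
+        print(list(folds.keys()))
+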
""" if rbf_model is None: diff --git a/tests/datamining/test_kma.py b/tests/datamining/test_kma.py index 8d5dfe9..e445625 100644 --- a/tests/datamining/test_kma.py +++ b/tests/datamining/test_kma.py @@ -3,11 +3,13 @@ import numpy as np import pandas as pd -from bluemath_tk.datamining.kma import KMA +from bluemath_tk.datamining.kma import KMA, KMAError class TestKMA(unittest.TestCase): def setUp(self): + """Set up test fixtures.""" + np.random.seed(42) # For reproducible tests self.df = pd.DataFrame( { "Hs": np.random.rand(1000) * 7, @@ -15,14 +17,264 @@ def setUp(self): "Dir": np.random.rand(1000) * 360, } ) - self.kma = KMA(num_clusters=10) + self.small_df = pd.DataFrame( + { + "Hs": np.random.rand(50) * 7, + "Tp": np.random.rand(50) * 20, + "Dir": np.random.rand(50) * 360, + } + ) + + # ==================== Basic Functionality Tests ==================== + + def test_init_basic(self): + """Test basic initialization.""" + kma = KMA(num_clusters=5) + self.assertEqual(kma.num_clusters, 5) + self.assertIsNone(kma.seed) + self.assertEqual(kma.algorithm_name, "kmeans") + self.assertFalse(kma.is_fitted) + + def test_init_with_seed(self): + """Test initialization with seed.""" + kma = KMA(num_clusters=5, seed=42) + self.assertEqual(kma.seed, 42) + + def test_init_invalid_num_clusters(self): + """Test initialization with invalid num_clusters.""" + with self.assertRaises(ValueError): + KMA(num_clusters=0) + with self.assertRaises(ValueError): + KMA(num_clusters=-1) + + def test_init_invalid_seed(self): + """Test initialization with invalid seed.""" + with self.assertRaises(ValueError): + KMA(num_clusters=5, seed=-1) + + def test_init_invalid_algorithm(self): + """Test initialization with invalid algorithm.""" + with self.assertRaises(ValueError): + KMA(num_clusters=5, algorithm="invalid") + + # ==================== Algorithm Tests ==================== + + def test_kmeans_algorithm(self): + """Test kmeans algorithm.""" + kma = KMA(num_clusters=5, algorithm="kmeans") + kma.fit(data=self.df) + self.assertEqual(kma.algorithm_name, "kmeans") + self.assertIsNotNone(kma._model) + self.assertEqual(kma.centroids.shape[0], 5) + self.assertTrue(kma.is_fitted) + + def test_kmedians_algorithm(self): + """Test kmedians algorithm.""" + kma = KMA(num_clusters=5, algorithm="kmedians") + kma.fit(data=self.df) + self.assertEqual(kma.algorithm_name, "kmedians") + self.assertIsNotNone(kma._model) + self.assertEqual(kma.centroids.shape[0], 5) + self.assertTrue(kma.is_fitted) + + def test_kmedoids_algorithm(self): + """Test kmedoids algorithm.""" + kma = KMA(num_clusters=5, algorithm="kmedoids") + kma.fit(data=self.df) + self.assertEqual(kma.algorithm_name, "kmedoids") + self.assertIsNotNone(kma._model) + self.assertEqual(kma.centroids.shape[0], 5) + self.assertTrue(kma.is_fitted) + + def test_algorithm_case_insensitive(self): + """Test that algorithm names are case-insensitive.""" + kma1 = KMA(num_clusters=5, algorithm="KMEANS") + kma2 = KMA(num_clusters=5, algorithm="kMeAnS") + self.assertEqual(kma1.algorithm_name, "kmeans") + self.assertEqual(kma2.algorithm_name, "kmeans") + + # ==================== Distance Matrix Tests ==================== + + def test_distance_matrix_kmedoids(self): + """Test distance matrix with kmedoids.""" + # Create a simple distance matrix + n_samples = 100 + data = self.df.iloc[:n_samples] + distances = np.random.rand(n_samples, n_samples) + distances = (distances + distances.T) / 2 # Make symmetric + np.fill_diagonal(distances, 0) # Zero diagonal + + kma = KMA(num_clusters=5, 
algorithm="kmedoids", distance_matrix=distances) + kma.fit(data=data) + self.assertTrue(kma.is_fitted) + self.assertEqual(kma.centroids.shape[0], 5) + + def test_distance_matrix_invalid_algorithm(self): + """Test that distance matrix only works with kmedoids.""" + distances = np.random.rand(100, 100) + distances = (distances + distances.T) / 2 + np.fill_diagonal(distances, 0) + + with self.assertRaises(ValueError): + KMA(num_clusters=5, algorithm="kmeans", distance_matrix=distances) + with self.assertRaises(ValueError): + KMA(num_clusters=5, algorithm="kmedians", distance_matrix=distances) + + def test_distance_matrix_invalid_shape(self): + """Test distance matrix validation.""" + # Non-square matrix + distances = np.random.rand(100, 50) + with self.assertRaises(ValueError): + KMA(num_clusters=5, algorithm="kmedoids", distance_matrix=distances) + + # Non-2D matrix + distances = np.random.rand(100) + with self.assertRaises(ValueError): + KMA(num_clusters=5, algorithm="kmedoids", distance_matrix=distances) + + # ==================== Fit Tests ==================== + + def test_fit_basic(self): + """Test basic fit.""" + kma = KMA(num_clusters=10) + kma.fit(data=self.df) + self.assertIsInstance(kma.centroids, pd.DataFrame) + self.assertEqual(kma.centroids.shape[0], 10) + self.assertTrue(kma.is_fitted) + self.assertEqual(len(kma.centroid_real_indices), len(self.df)) + + def test_fit_with_directional_variables(self): + """Test fit with directional variables.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df, directional_variables=["Dir"]) + self.assertTrue(kma.is_fitted) + # Check that Dir_u and Dir_v are in centroids + self.assertIn("Dir_u", kma.centroids.columns) + self.assertIn("Dir_v", kma.centroids.columns) + self.assertIn("Dir", kma.centroids.columns) + + def test_fit_with_custom_scale_factor(self): + """Test fit with custom scale factor.""" + kma = KMA(num_clusters=5) + custom_scale = {"Hs": [0, 10], "Tp": [0, 20]} + kma.fit( + data=self.df, + custom_scale_factor=custom_scale, + normalize_data=True, + ) + self.assertTrue(kma.is_fitted) + + def test_fit_with_normalize_data(self): + """Test fit with normalization.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df, normalize_data=True) + self.assertTrue(kma.is_fitted) + # Check that normalized data exists + self.assertFalse(kma.normalized_data.empty) + + def test_fit_without_normalize_data(self): + """Test fit without normalization.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df, normalize_data=False) + self.assertTrue(kma.is_fitted) + + def test_fit_with_min_number_of_points(self): + """Test fit with min_number_of_points constraint.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df, min_number_of_points=50) + self.assertTrue(kma.is_fitted) + # Verify all clusters have at least min_number_of_points + unique_labels, counts = np.unique(kma.centroid_real_indices, return_counts=True) + self.assertTrue(np.all(counts >= 50)) + + def test_fit_with_min_number_of_points_failure(self): + """Test fit with impossible min_number_of_points.""" + kma = KMA(num_clusters=5) + with self.assertRaises(ValueError): + kma.fit( + data=self.small_df, + min_number_of_points=20, + max_number_of_iterations=2, + ) - def test_fit(self): - self.kma.fit(data=self.df, min_number_of_points=50) - self.assertIsInstance(self.kma.centroids, pd.DataFrame) - self.assertEqual(self.kma.centroids.shape[0], 10) + def test_fit_with_max_iterations(self): + """Test fit with max_iterations.""" + kma = KMA(num_clusters=5) + kma.fit( + data=self.df, + 
min_number_of_points=50, + max_number_of_iterations=20, + ) + self.assertTrue(kma.is_fitted) + + def test_fit_with_regression_guided(self): + """Test fit with regression-guided clustering.""" + data = self.df.copy() + data["Fe"] = data["Hs"] ** 2 * data["Tp"] + kma = KMA(num_clusters=5) + kma.fit( + data=data, + regression_guided={"vars": ["Fe"], "alpha": [0.6]}, + ) + self.assertTrue(kma.is_fitted) + # Check that Fe is in the fitting variables + self.assertIn("Fe", kma.fitting_variables) + + def test_fit_with_regression_guided_multiple_vars(self): + """Test fit with multiple regression-guided variables.""" + data = self.df.copy() + data["Fe"] = data["Hs"] ** 2 * data["Tp"] + data["Fe2"] = data["Hs"] * data["Tp"] + kma = KMA(num_clusters=5) + kma.fit( + data=data, + regression_guided={"vars": ["Fe", "Fe2"], "alpha": [0.4, 0.3]}, + ) + self.assertTrue(kma.is_fitted) + self.assertIn("Fe", kma.fitting_variables) + self.assertIn("Fe2", kma.fitting_variables) - def test_predict(self): + def test_fit_with_init_mda_centroids(self): + """Test fit with MDA initialization.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df) + # Get normalized centroids for initialization + init_centroids = kma.normalized_centroids.copy() + + # Create new KMA instance with MDA initialization + kma2 = KMA(num_clusters=5) + kma2.fit(data=self.df, init_mda_centroids=init_centroids) + self.assertTrue(kma2.is_fitted) + self.assertEqual(kma2.centroids.shape[0], 5) + + def test_fit_init_mda_centroids_invalid_shape(self): + """Test fit with invalid MDA centroids shape.""" + kma = KMA(num_clusters=5) + invalid_centroids = pd.DataFrame( + np.random.rand(3, len(self.df.columns)) + ) # Wrong number of clusters + with self.assertRaises(ValueError): + kma.fit(data=self.df, init_mda_centroids=invalid_centroids) + + def test_fit_init_mda_centroids_invalid_columns(self): + """Test fit with invalid MDA centroids columns.""" + kma = KMA(num_clusters=5) + invalid_centroids = pd.DataFrame( + np.random.rand(5, 2), columns=["Wrong", "Columns"] + ) + with self.assertRaises(ValueError): + kma.fit(data=self.df, init_mda_centroids=invalid_centroids) + + def test_fit_init_mda_centroids_invalid_type(self): + """Test fit with invalid MDA centroids type.""" + kma = KMA(num_clusters=5) + with self.assertRaises(ValueError): + kma.fit(data=self.df, init_mda_centroids=np.array([[1, 2], [3, 4]])) + + # ==================== Predict Tests ==================== + + def test_predict_basic(self): + """Test basic prediction.""" data_sample = pd.DataFrame( { "Hs": np.random.rand(15) * 7, @@ -30,37 +282,273 @@ def test_predict(self): "Dir": np.random.rand(15) * 360, } ) - self.kma.fit(data=self.df) - nearest_centroids, nearest_centroid_df = self.kma.predict(data=data_sample) + kma = KMA(num_clusters=10) + kma.fit(data=self.df) + nearest_centroids, nearest_centroid_df = kma.predict(data=data_sample) self.assertIsInstance(nearest_centroids, pd.DataFrame) self.assertEqual(len(nearest_centroids), 15) + self.assertIn("kma_bmus", nearest_centroids.columns) self.assertIsInstance(nearest_centroid_df, pd.DataFrame) self.assertEqual(nearest_centroid_df.shape[0], 15) - def test_fit_predict(self): - predicted_labels, predicted_labels_df = self.kma.fit_predict( - data=self.df, min_number_of_points=50 + def test_predict_with_directional_variables(self): + """Test prediction with directional variables.""" + data_sample = pd.DataFrame( + { + "Hs": np.random.rand(15) * 7, + "Tp": np.random.rand(15) * 20, + "Dir": np.random.rand(15) * 360, + } ) - _unique_labels, counts = 
np.unique(predicted_labels, return_counts=True) - self.assertTrue(np.all(counts >= 50)) + kma = KMA(num_clusters=5) + kma.fit(data=self.df, directional_variables=["Dir"]) + nearest_centroids, nearest_centroid_df = kma.predict(data=data_sample) + self.assertEqual(len(nearest_centroids), 15) + # Check that Dir is in the output + self.assertIn("Dir", nearest_centroid_df.columns) + + def test_predict_not_fitted(self): + """Test prediction without fitting.""" + kma = KMA(num_clusters=5) + with self.assertRaises(KMAError): + kma.predict(data=self.df) + + def test_predict_all_algorithms(self): + """Test prediction for all algorithms.""" + data_sample = pd.DataFrame( + { + "Hs": np.random.rand(10) * 7, + "Tp": np.random.rand(10) * 20, + "Dir": np.random.rand(10) * 360, + } + ) + for algorithm in ["kmeans", "kmedians", "kmedoids"]: + kma = KMA(num_clusters=5, algorithm=algorithm) + kma.fit(data=self.df) + nearest_centroids, nearest_centroid_df = kma.predict(data=data_sample) + self.assertEqual(len(nearest_centroids), 10) + self.assertEqual(nearest_centroid_df.shape[0], 10) + + # ==================== Fit-Predict Tests ==================== + + def test_fit_predict_basic(self): + """Test basic fit_predict.""" + kma = KMA(num_clusters=10) + predicted_labels, predicted_labels_df = kma.fit_predict(data=self.df) self.assertIsInstance(predicted_labels, pd.DataFrame) self.assertEqual(len(predicted_labels), 1000) self.assertIsInstance(predicted_labels_df, pd.DataFrame) self.assertEqual(predicted_labels_df.shape[0], 1000) - def test_add_regression_guided(self): + def test_fit_predict_with_min_number_of_points(self): + """Test fit_predict with min_number_of_points.""" + kma = KMA(num_clusters=10) + predicted_labels, predicted_labels_df = kma.fit_predict( + data=self.df, min_number_of_points=50 + ) + _unique_labels, counts = np.unique(predicted_labels, return_counts=True) + self.assertTrue(np.all(counts >= 50)) + self.assertEqual(len(predicted_labels), 1000) + + def test_fit_predict_with_all_options(self): + """Test fit_predict with all options.""" data = self.df.copy() data["Fe"] = data["Hs"] ** 2 * data["Tp"] - predicted_labels, predicted_labels_df = self.kma.fit_predict( + kma = KMA(num_clusters=5) + predicted_labels, predicted_labels_df = kma.fit_predict( data=data, directional_variables=["Dir"], + custom_scale_factor={"Hs": [0, 10], "Tp": [0, 20]}, + normalize_data=True, regression_guided={"vars": ["Fe"], "alpha": [0.6]}, ) - self.assertIsInstance(predicted_labels, pd.DataFrame) self.assertEqual(len(predicted_labels), 1000) - self.assertIsInstance(predicted_labels_df, pd.DataFrame) self.assertEqual(predicted_labels_df.shape[0], 1000) + # ==================== Property Tests ==================== + + def test_properties(self): + """Test class properties.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df) + # Test data property + self.assertIsInstance(kma.data, pd.DataFrame) + self.assertEqual(len(kma.data), 1000) + # Test normalized_data property + self.assertIsInstance(kma.normalized_data, pd.DataFrame) + # Test data_to_fit property + self.assertIsInstance(kma.data_to_fit, pd.DataFrame) + # Test kma property (model) + self.assertIsNotNone(kma.kma) + + # ==================== Regression Guided Tests ==================== + + def test_add_regression_guided(self): + """Test add_regression_guided static method.""" + data = self.df.copy() + data["Fe"] = data["Hs"] ** 2 * data["Tp"] + result = KMA.add_regression_guided(data=data, vars=["Fe"], alpha=[0.6]) + self.assertIsInstance(result, pd.DataFrame) + 
self.assertIn("Fe", result.columns) + self.assertIn("Hs", result.columns) + self.assertIn("Tp", result.columns) + + def test_add_regression_guided_multiple_vars(self): + """Test add_regression_guided with multiple variables.""" + data = self.df.copy() + data["Fe"] = data["Hs"] ** 2 * data["Tp"] + data["Fe2"] = data["Hs"] * data["Tp"] + result = KMA.add_regression_guided( + data=data, vars=["Fe", "Fe2"], alpha=[0.4, 0.3] + ) + self.assertIn("Fe", result.columns) + self.assertIn("Fe2", result.columns) + + # ==================== Seed Tests ==================== + + def test_seed_reproducibility(self): + """Test that seed produces reproducible results.""" + kma1 = KMA(num_clusters=5, seed=42) + kma1.fit(data=self.df) + kma2 = KMA(num_clusters=5, seed=42) + kma2.fit(data=self.df) + # Centroids should be the same with same seed + np.testing.assert_array_almost_equal( + kma1.centroids.values, kma2.centroids.values, decimal=5 + ) + + def test_seed_none(self): + """Test that seed=None produces different results.""" + kma1 = KMA(num_clusters=5, seed=None) + kma1.fit(data=self.df) + kma2 = KMA(num_clusters=5, seed=None) + kma2.fit(data=self.df) + # With seed=None, results may differ (though they might be the same by chance) + + # ==================== Algorithm-Specific Tests ==================== + + def test_all_algorithms_produce_centroids(self): + """Test that all algorithms produce centroids.""" + for algorithm in ["kmeans", "kmedians", "kmedoids"]: + kma = KMA(num_clusters=5, algorithm=algorithm) + kma.fit(data=self.df) + self.assertEqual(kma.centroids.shape[0], 5) + self.assertTrue(kma.is_fitted) + + def test_all_algorithms_with_directional_variables(self): + """Test all algorithms with directional variables.""" + for algorithm in ["kmeans", "kmedians", "kmedoids"]: + kma = KMA(num_clusters=5, algorithm=algorithm) + kma.fit(data=self.df, directional_variables=["Dir"]) + self.assertIn("Dir", kma.centroids.columns) + self.assertTrue(kma.is_fitted) + + def test_all_algorithms_with_normalization(self): + """Test all algorithms with normalization.""" + for algorithm in ["kmeans", "kmedians", "kmedoids"]: + kma = KMA(num_clusters=5, algorithm=algorithm) + kma.fit(data=self.df, normalize_data=True) + self.assertTrue(kma.is_fitted) + + # ==================== Edge Cases and Error Tests ==================== + + def test_fit_empty_dataframe(self): + """Test fit with empty dataframe.""" + kma = KMA(num_clusters=5) + empty_df = pd.DataFrame() + # This should raise an error from the decorator or base class + with self.assertRaises((ValueError, TypeError)): + kma.fit(data=empty_df) + + def test_fit_single_cluster(self): + """Test fit with single cluster.""" + kma = KMA(num_clusters=1) + kma.fit(data=self.df) + self.assertEqual(kma.centroids.shape[0], 1) + self.assertTrue(kma.is_fitted) + + def test_fit_many_clusters(self): + """Test fit with many clusters.""" + kma = KMA(num_clusters=50) + kma.fit(data=self.df) + self.assertEqual(kma.centroids.shape[0], 50) + self.assertTrue(kma.is_fitted) + + def test_predict_different_columns(self): + """Test prediction with different columns (should fail).""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df) + different_df = pd.DataFrame({"X": np.random.rand(10), "Y": np.random.rand(10)}) + # This should raise an error + with self.assertRaises((ValueError, KeyError)): + kma.predict(data=different_df) + + # ==================== Combination Tests ==================== + + def test_combination_seed_and_algorithm(self): + """Test combination of seed and algorithm.""" + for 
algorithm in ["kmeans", "kmedians", "kmedoids"]: + kma = KMA(num_clusters=5, seed=42, algorithm=algorithm) + kma.fit(data=self.df) + self.assertTrue(kma.is_fitted) + + def test_combination_directional_and_normalize(self): + """Test combination of directional variables and normalization.""" + kma = KMA(num_clusters=5) + kma.fit( + data=self.df, + directional_variables=["Dir"], + normalize_data=True, + custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}, + ) + self.assertTrue(kma.is_fitted) + self.assertIn("Dir", kma.centroids.columns) + + def test_combination_regression_and_directional(self): + """Test combination of regression-guided and directional variables.""" + data = self.df.copy() + data["Fe"] = data["Hs"] ** 2 * data["Tp"] + kma = KMA(num_clusters=5) + kma.fit( + data=data, + directional_variables=["Dir"], + regression_guided={"vars": ["Fe"], "alpha": [0.6]}, + ) + self.assertTrue(kma.is_fitted) + + def test_combination_mda_init_and_min_points(self): + """Test combination of MDA initialization and min points.""" + kma = KMA(num_clusters=5) + kma.fit(data=self.df) + init_centroids = kma.normalized_centroids.copy() + + kma2 = KMA(num_clusters=5) + kma2.fit( + data=self.df, + init_mda_centroids=init_centroids, + min_number_of_points=50, + ) + self.assertTrue(kma2.is_fitted) + + def test_combination_all_options(self): + """Test combination of all options together.""" + data = self.df.copy() + data["Fe"] = data["Hs"] ** 2 * data["Tp"] + kma = KMA(num_clusters=5, seed=42, algorithm="kmeans") + kma.fit( + data=data, + directional_variables=["Dir"], + custom_scale_factor={"Hs": [0, 10], "Tp": [0, 20]}, + normalize_data=True, + regression_guided={"vars": ["Fe"], "alpha": [0.6]}, + min_number_of_points=50, + max_number_of_iterations=20, + ) + self.assertTrue(kma.is_fitted) + # Verify min points constraint + unique_labels, counts = np.unique(kma.centroid_real_indices, return_counts=True) + self.assertTrue(np.all(counts >= 50)) + if __name__ == "__main__": unittest.main() From 15f4ec44fdcc66b99662e6b625807a588bff3abe Mon Sep 17 00:00:00 2001 From: Javier Tausia Hoyal Date: Wed, 14 Jan 2026 20:11:06 +0100 Subject: [PATCH 4/4] [JTH] add MDA init in kma tests --- tests/datamining/test_kma.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/datamining/test_kma.py b/tests/datamining/test_kma.py index e445625..6025160 100644 --- a/tests/datamining/test_kma.py +++ b/tests/datamining/test_kma.py @@ -4,6 +4,7 @@ import pandas as pd from bluemath_tk.datamining.kma import KMA, KMAError +from bluemath_tk.datamining.mda import MDA class TestKMA(unittest.TestCase): @@ -236,16 +237,12 @@ def test_fit_with_regression_guided_multiple_vars(self): def test_fit_with_init_mda_centroids(self): """Test fit with MDA initialization.""" + mda = MDA(num_centers=5) + mda.fit(data=self.df) + init_centroids = mda.normalized_centroids.copy() kma = KMA(num_clusters=5) - kma.fit(data=self.df) - # Get normalized centroids for initialization - init_centroids = kma.normalized_centroids.copy() - - # Create new KMA instance with MDA initialization - kma2 = KMA(num_clusters=5) - kma2.fit(data=self.df, init_mda_centroids=init_centroids) - self.assertTrue(kma2.is_fitted) - self.assertEqual(kma2.centroids.shape[0], 5) + kma.fit(data=self.df, init_mda_centroids=init_centroids) + self.assertTrue(kma.is_fitted) def test_fit_init_mda_centroids_invalid_shape(self): """Test fit with invalid MDA centroids shape."""
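
For reference, a minimal sketch of the MDA-seeded workflow the updated test
exercises (the dataframe columns are illustrative; the MDA/KMA calls mirror
the test above):

    import numpy as np
    import pandas as pd

    from bluemath_tk.datamining.kma import KMA
    from bluemath_tk.datamining.mda import MDA

    df = pd.DataFrame(
        {
            "Hs": np.random.rand(1000) * 7,
            "Tp": np.random.rand(1000) * 20,
            "Dir": np.random.rand(1000) * 360,
        }
    )
    # MDA selects maximally dissimilar data points, so seeding KMA with its
    # normalized centroids gives the clustering a well-spread initialization
    # instead of a random one.
    mda = MDA(num_centers=5)
    mda.fit(data=df)
    kma = KMA(num_clusters=5)
    kma.fit(data=df, init_mda_centroids=mda.normalized_centroids)
    print(kma.centroids)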