From 0d0f02ac5ffa0b4297ebb76997d24038acdc0ca3 Mon Sep 17 00:00:00 2001
From: AlexQuinn <1527962551@qq.com>
Date: Wed, 7 Jan 2026 19:54:06 +0800
Subject: [PATCH] Refactor DataSet.identify_keys into helper methods

---
Reviewer notes (placed between the "---" line and the diffstat, so they are
not part of the commit message):

* identify_keys() keeps its def line and docstring (unchanged context) and
  its body now only calls, in order:
  _validate_datatypes_ready_for_key_identification(),
  _build_column_profiles_df(include_samples=True),
  _run_key_identification(), _apply_primary_key_result() and
  _maybe_save_yaml(save), then returns self.
* The validation condition, the RuntimeError message, the
  KeyIdentificationAgent construction, the PrimaryKey assignment and the
  save_yaml() call are unchanged context lines; they only move under the new
  method headers.
* _build_column_profiles_df() adds an include_samples flag (default True).
  identify_keys() passes include_samples=True, so the "sample_data" entry is
  still added to every row on this path.
* NOTE(review): Optional appears only in lines added by this hunk; confirm
  that typing.Optional is already imported in models.py.
* NOTE(review): column.profiling_metrics is dereferenced without a None
  check (context line, not introduced here); confirm profiling always runs
  before identify_keys().
* NOTE(review): the first added line is a blank line directly above
  "def identify_keys"; presumably this leaves two blank lines between the
  methods. Confirm against the target file.
* NOTE(review): line breaks and indentation in this copy were restored from
  a whitespace-collapsed version; run `git apply --check` before applying.

 src/intugle/analysis/models.py | 71 ++++++++++++++++++++++++++--------
 1 file changed, 54 insertions(+), 17 deletions(-)

diff --git a/src/intugle/analysis/models.py b/src/intugle/analysis/models.py
index 294b918..a8b7204 100644
--- a/src/intugle/analysis/models.py
+++ b/src/intugle/analysis/models.py
@@ -322,49 +322,86 @@ def identify_datatypes_l2(self) -> "DataSet":
             self.columns[col_l2.column_name].category = col_l2.datatype_l2
         return self
 
+
     def identify_keys(self, save: bool = False) -> 'DataSet':
         """
         Identifies potential primary keys in the dataset based on column profiles.
         This method relies on the 'column_profiles' result.
         """
+        self._validate_datatypes_ready_for_key_identification()
+
+        column_profiles_df = self._build_column_profiles_df(include_samples=True)
+
+        ki_result = self._run_key_identification(column_profiles_df)
+        self._apply_primary_key_result(ki_result)
+
+        self._maybe_save_yaml(save)
+        return self
+
+    def _validate_datatypes_ready_for_key_identification(self) -> None:
+        """Ensure L1/L2 datatypes are identified before key identification."""
         if not self.source.table.columns or any(
             c.type is None or c.category is None for c in self.source.table.columns
         ):
             raise RuntimeError("DataTypeIdentifierL1 and L2 must be run before KeyIdentifier.")
 
+    def _build_column_profiles_df(self, include_samples: bool = True) -> pd.DataFrame:
+        """
+        Build a dataframe of profiling + datatype metadata for each column.
+
+        Parameters
+        ----------
+        include_samples : bool
+            Whether to include sample_data in the output (needed for some downstream steps).
+
+        Returns
+        -------
+        pd.DataFrame
+            A dataframe containing profiling and datatype information per column.
+        """
         column_profiles_data = []
         for column in self.source.table.columns:
             metrics = column.profiling_metrics
+
             count = metrics.count if metrics.count is not None else 0
             null_count = metrics.null_count if metrics.null_count is not None else 0
             distinct_count = metrics.distinct_count if metrics.distinct_count is not None else 0
 
-            column_profiles_data.append(
-                {
-                    "column_name": column.name,
-                    "table_name": self.name,
-                    "datatype_l1": column.type,
-                    "datatype_l2": column.category,
-                    "count": count,
-                    "null_count": null_count,
-                    "distinct_count": distinct_count,
-                    "uniqueness": distinct_count / count if count > 0 else 0.0,
-                    "completeness": (count - null_count) / count if count > 0 else 0.0,
-                    "sample_data": metrics.sample_data,
-                }
-            )
-        column_profiles_df = pd.DataFrame(column_profiles_data)
+            row = {
+                "column_name": column.name,
+                "table_name": self.name,
+                "datatype_l1": column.type,
+                "datatype_l2": column.category,
+                "count": count,
+                "null_count": null_count,
+                "distinct_count": distinct_count,
+                "uniqueness": distinct_count / count if count > 0 else 0.0,
+                "completeness": (count - null_count) / count if count > 0 else 0.0,
+            }
+
+            if include_samples:
+                row["sample_data"] = metrics.sample_data
+
+            column_profiles_data.append(row)
+
+        return pd.DataFrame(column_profiles_data)
+
+    def _run_key_identification(self, column_profiles_df: pd.DataFrame) -> Optional[dict]:
+        """Run the KeyIdentificationAgent and return its result."""
         ki_agent = KeyIdentificationAgent(
             profiling_data=column_profiles_df, adapter=self.adapter, dataset_data=self.data
         )
-        ki_result = ki_agent()
+        return ki_agent()
 
+    def _apply_primary_key_result(self, ki_result: Optional[dict]) -> None:
+        """Apply the key identification result to the dataset."""
         if ki_result:
             self.source.table.key = PrimaryKey(**ki_result)
 
+    def _maybe_save_yaml(self, save: bool) -> None:
+        """Persist the YAML representation if requested."""
         if save:
             self.save_yaml()
-        return self
 
     def profile(self, save: bool = False) -> 'DataSet':
         """