71 changes: 54 additions & 17 deletions src/intugle/analysis/models.py
@@ -322,49 +322,86 @@ def identify_datatypes_l2(self) -> "DataSet":
self.columns[col_l2.column_name].category = col_l2.datatype_l2
return self


def identify_keys(self, save: bool = False) -> 'DataSet':
"""
Identifies potential primary keys in the dataset based on column profiles.
This method relies on the 'column_profiles' result.
"""
self._validate_datatypes_ready_for_key_identification()

column_profiles_df = self._build_column_profiles_df(include_samples=True)

ki_result = self._run_key_identification(column_profiles_df)
self._apply_primary_key_result(ki_result)

self._maybe_save_yaml(save)
return self
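# Hypothetical usage sketch (only identify_keys, identify_datatypes_l2 and profile are
# visible in this diff; the L1 step's method name is assumed from the error message below):
#   ds.profile()                   # populate profiling_metrics for every column
#   ds.identify_datatypes_l1()     # assumed name for the L1 datatype step
#   ds.identify_datatypes_l2()     # L2 datatype step (see the top of this hunk)
#   ds.identify_keys(save=True)    # validate -> build profiles df -> agent -> apply key -> save YAML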

def _validate_datatypes_ready_for_key_identification(self) -> None:
"""Ensure L1/L2 datatypes are identified before key identification."""
if not self.source.table.columns or any(
c.type is None or c.category is None for c in self.source.table.columns
):
raise RuntimeError("DataTypeIdentifierL1 and L2 must be run before KeyIdentifier.")

def _build_column_profiles_df(self, include_samples: bool = True) -> pd.DataFrame:
"""
Build a dataframe of profiling + datatype metadata for each column.

Parameters
----------
include_samples : bool
Whether to include each column's sample_data in the output (needed by
downstream consumers such as the key identification agent).

Returns
-------
pd.DataFrame
A dataframe containing profiling and datatype information per column.
"""
column_profiles_data = []
for column in self.source.table.columns:
metrics = column.profiling_metrics

count = metrics.count if metrics.count is not None else 0
null_count = metrics.null_count if metrics.null_count is not None else 0
distinct_count = metrics.distinct_count if metrics.distinct_count is not None else 0
row = {
"column_name": column.name,
"table_name": self.name,
"datatype_l1": column.type,
"datatype_l2": column.category,
"count": count,
"null_count": null_count,
"distinct_count": distinct_count,
"uniqueness": distinct_count / count if count > 0 else 0.0,
"completeness": (count - null_count) / count if count > 0 else 0.0,
}

if include_samples:
row["sample_data"] = metrics.sample_data

column_profiles_data.append(row)

return pd.DataFrame(column_profiles_data)
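# Worked example (illustrative numbers only): a column with count=1000, null_count=20
# and distinct_count=1000 yields uniqueness = 1000 / 1000 = 1.0 and
# completeness = (1000 - 20) / 1000 = 0.98, the kind of profile a primary-key
# candidate would typically show.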

def _run_key_identification(self, column_profiles_df: pd.DataFrame) -> Optional[dict]:
"""Run the KeyIdentificationAgent and return its result."""
ki_agent = KeyIdentificationAgent(
profiling_data=column_profiles_df, adapter=self.adapter, dataset_data=self.data
)
return ki_agent()
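# Note (assumption based on _apply_primary_key_result below): when the agent finds a key,
# the returned dict is expected to unpack cleanly into PrimaryKey(**ki_result); otherwise
# None is returned and the table's key is left untouched.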

def _apply_primary_key_result(self, ki_result: Optional[dict]) -> None:
"""Apply the key identification result to the dataset."""
if ki_result:
self.source.table.key = PrimaryKey(**ki_result)

def _maybe_save_yaml(self, save: bool) -> None:
"""Persist the YAML representation if requested."""
if save:
self.save_yaml()

def profile(self, save: bool = False) -> 'DataSet':
"""