Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions src/vowl/contracts/check_reference_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
class CheckResultMetadata(TypedDict, total=False):
"""Stable metadata derived from a check reference and its contract.

Computed fields are derived by the check reference; ``contract_definition``
carries the raw ODCS quality entry so that every contract field (tags,
customProperties, authoritativeDefinitions, …) is available to consumers
without manual forwarding.
Computed fields are derived by the check reference; ``check_definition``
carries the resolved/generated check definition (which may be synthesised
for generated checks), and ``contract_definition`` carries the raw ODCS
contract content at the check's JSONPath so that every contract field
(tags, customProperties, authoritativeDefinitions, …) is available to
consumers without manual forwarding.
"""

check_path: str
Expand All @@ -37,6 +39,7 @@ class CheckResultMetadata(TypedDict, total=False):
multi_source: bool
aggregation_type: str
engine: str
check_definition: dict[str, Any]
contract_definition: dict[str, Any]


Expand Down Expand Up @@ -99,25 +102,49 @@ def unit(self) -> str | None:
unit = self.get_check().get("unit")
return unit if isinstance(unit, str) else None

def get_raw_contract_definition(self) -> dict[str, Any] | Any | None:
"""Return the raw contract content at this check's JSONPath.

For user-authored quality checks this returns the original ODCS
quality entry dict. For generated checks (e.g. logicalType,
logicalTypeOptions) it returns the raw value at the path (which
may be a scalar, dict, or ``None``).
"""
if self._contract is None:
return None
raw = self._contract.resolve(self._path)
if isinstance(raw, dict):
return dict(raw) # shallow copy
return raw

def get_result_metadata(self) -> CheckResultMetadata:
"""Build stable CheckResult metadata from the contract context.

Computed fields (``check_path``, ``target``, ``operator``, …) are
derived by the check reference. The raw ODCS quality entry is
attached as ``contract_definition`` so that every contract field
automatically flows through without manual forwarding.
derived by the check reference. ``check_definition`` contains
the resolved check dict (which may be generated/synthesised for
auto-generated checks). ``contract_definition`` contains the
raw ODCS contract content at the check's JSONPath.
"""
check = self.get_check()
schema_name = self.get_schema_name()
operator, _expected = self.get_expected_value()
raw_def = self.get_raw_contract_definition()
check_def = dict(check)
if self.is_generated():
tags = list(check_def.get("tags", []))
if "vowl_generated_check" not in tags:
tags.append("vowl_generated_check")
check_def["tags"] = tags
metadata: CheckResultMetadata = {
"check_path": self.path,
"check_ref_type": type(self).__name__,
"schema_name": schema_name,
"operator": operator,
"is_generated": self.is_generated(),
"engine": self.get_execution_engine(),
"contract_definition": dict(check),
"check_definition": check_def,
"contract_definition": raw_def if isinstance(raw_def, dict) else {"value": raw_def} if raw_def is not None else {},
}

column_name = self.get_column_name()
Expand Down
8 changes: 4 additions & 4 deletions src/vowl/contracts/check_reference_library_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ def _auto_description(self) -> str:

def get_result_metadata(self):
    """Return the parent's result metadata with a check description ensured.

    The metadata is built by the parent implementation. If its
    ``check_definition`` entry has no description (missing or empty), one
    is filled in from ``self._auto_description()``. The
    ``contract_definition`` entry is left exactly as the parent produced
    it; only ``check_definition`` is written here.
    """
    metadata = super().get_result_metadata()
    check_def = metadata.get("check_definition", {})
    if not check_def.get("description"):
        check_def["description"] = self._auto_description()
    # Re-assign so the key exists even when the parent did not supply it.
    metadata["check_definition"] = check_def
    return metadata


Expand Down Expand Up @@ -308,10 +308,10 @@ def _auto_description(self) -> str:

def get_result_metadata(self):
    """Return the parent's result metadata with a check description ensured.

    The metadata is built by the parent implementation. If its
    ``check_definition`` entry has no description (missing or empty), one
    is filled in from ``self._auto_description()``. The
    ``contract_definition`` entry is left exactly as the parent produced
    it; only ``check_definition`` is written here.
    """
    metadata = super().get_result_metadata()
    check_def = metadata.get("check_definition", {})
    if not check_def.get("description"):
        check_def["description"] = self._auto_description()
    # Re-assign so the key exists even when the parent did not supply it.
    metadata["check_definition"] = check_def
    return metadata


Expand Down
41 changes: 33 additions & 8 deletions src/vowl/validation/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ def _consolidate_grouped_output(combined: nw.DataFrame) -> nw.DataFrame:
'check_ref_type',
'logical_type',
'is_generated',
'check_definition',
'contract_definition',
]

@staticmethod
Expand All @@ -558,18 +560,30 @@ def _arrow_safe(value):
return None
return str(value)

def get_check_results_df(self) -> nw.DataFrame:
def get_check_results_df(
self,
*,
include_check_definition: bool = False,
include_contract_definition: bool = False,
) -> nw.DataFrame:
"""Return a DataFrame with one row per check result.

Args:
include_check_definition: When *True* a
``check_definition`` column is appended containing the
resolved/generated check definition serialised as JSON.
include_contract_definition: When *True* a
``contract_definition`` column is appended containing the
raw ODCS contract content at the check's JSONPath.
"""
_safe = self._arrow_safe
data = []
extra_keys: list[str] = []
for cr in self.check_results:
# Flatten contract_definition into top-level columns so every
# contract field gets its own column in the output. Computed
# metadata fields are overlaid last so they take precedence.
raw_meta = dict(cr.metadata)
check_def = raw_meta.pop("check_definition", {})
contract_def = raw_meta.pop("contract_definition", {})
flat_meta = {k: _safe(v) for k, v in contract_def.items()}
flat_meta.update({k: _safe(v) for k, v in raw_meta.items()})
flat_meta = {k: _safe(v) for k, v in raw_meta.items()}
row = {
'check_name': cr.check_name,
'status': cr.status,
Expand All @@ -580,6 +594,10 @@ def get_check_results_df(self) -> nw.DataFrame:
'execution_time_ms': cr.execution_time_ms,
**flat_meta,
}
if include_check_definition:
row['check_definition'] = json.dumps(check_def, default=str) if check_def else None
if include_contract_definition:
row['contract_definition'] = json.dumps(contract_def, default=str) if contract_def else None
data.append(row)
for key in row:
if key not in self._CHECK_RESULTS_COLUMN_ORDER and key not in extra_keys:
Expand All @@ -593,12 +611,19 @@ def get_check_results_df(self) -> nw.DataFrame:
eager_only=True,
)

def save(self, output_dir: str = ".", prefix: str = "vowl_results") -> ValidationResult:
def save(
self,
output_dir: str = ".",
prefix: str = "vowl_results",
*,
include_check_definition: bool = False,
include_contract_definition: bool = False,
) -> ValidationResult:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)

check_csv = output_path / f"{prefix}_check_results.csv"
_pa_csv.write_csv(self.get_check_results_df().to_arrow(), str(check_csv))
_pa_csv.write_csv(self.get_check_results_df(include_check_definition=include_check_definition, include_contract_definition=include_contract_definition).to_arrow(), str(check_csv))

consolidated = self.get_consolidated_output_dfs()
saved_files = [str(check_csv)]
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _compare_or_update_single(df_to_compare: Any, expected_path: Path) -> None:

def _is_expected_error(cr: Any) -> bool:
"""Return True if this ERROR check result is expected and should not fail the test."""
if cr.metadata.get("contract_definition", {}).get("type") == "text":
if cr.metadata.get("check_definition", {}).get("type") == "text":
return True
for substring in _ALLOWED_ERROR_SUBSTRINGS.get():
if substring in (cr.details or ""):
Expand Down
Loading
Loading