Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion artifacts/layered_admissibility_results.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"governance_score": 1.0,
"observed_admissible": false,
"operational_score": 1.0,
"overall_admissibility_score": 0.8333333333333333,
"overall_admissibility_score": 0.8333333333333334,
"passed_contracts": [
"no_orphan_tool_calls",
"pre_merge_review"
Expand Down
9 changes: 7 additions & 2 deletions src/validation/admissibility_scorer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from fractions import Fraction

from src.validation.contract_validator import Layer, ValidationResult

Expand Down Expand Up @@ -47,15 +48,18 @@ def score(self, results: list[ValidationResult], expected_admissible: bool | Non

layer_scores: list[LayerScore] = []
score_by_layer: dict[Layer, float] = {}
score_fraction_by_layer: dict[Layer, Fraction] = {}

for layer in self._LAYER_ORDER:
layer_results = [result for result in results if result.layer == layer]
passed_in_layer = tuple(sorted(result.contract_id for result in layer_results if result.passed))
failed_in_layer = tuple(sorted(result.contract_id for result in layer_results if not result.passed))
labels_in_layer = tuple(sorted({result.failure_label for result in layer_results if result.failure_label is not None}))
total_contracts = len(layer_results)
layer_score = 1.0 if total_contracts == 0 else len(passed_in_layer) / total_contracts
layer_score_fraction = Fraction(1, 1) if total_contracts == 0 else Fraction(len(passed_in_layer), total_contracts)
layer_score = float(layer_score_fraction)
score_by_layer[layer] = layer_score
score_fraction_by_layer[layer] = layer_score_fraction
layer_scores.append(
LayerScore(
layer=layer,
Expand All @@ -66,7 +70,8 @@ def score(self, results: list[ValidationResult], expected_admissible: bool | Non
)
)

overall_admissibility_score = sum(score_by_layer[layer] for layer in self._LAYER_ORDER) / len(self._LAYER_ORDER)
overall_score_fraction = sum(score_fraction_by_layer[layer] for layer in self._LAYER_ORDER) / len(self._LAYER_ORDER)
overall_admissibility_score = float(overall_score_fraction)

return AdmissibilityScore(
structural_score=score_by_layer[Layer.STRUCTURAL],
Expand Down
33 changes: 32 additions & 1 deletion src/validation/degradation_curve_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
from src.validation.contract_validator import ContractValidator


# Default manifest location used by the manifest-reading methods of the generator.
# NOTE(review): this is a relative path — presumably resolved against the
# current working directory; confirm against how ``_load_json`` opens it.
MANIFEST_PATH = Path("fixtures/manifest.json")
# Manifest entries must carry this "family" value to be part of the layered curve.
LAYERED_CURVE_FAMILY = "coding_workflow_pr_review"
# Accepted "degradation_level" values; the tuple order is also the order in
# which the selected fixture paths are returned.
LAYERED_CURVE_LEVELS = ("baseline", "mild", "moderate", "severe")


@dataclass(frozen=True, slots=True)
class FixtureScorePoint:
fixture_id: str
Expand Down Expand Up @@ -106,7 +111,33 @@ def evaluate_fixture(self, fixture_path: Path) -> FixtureScorePoint:
failure_labels=tuple(sorted(score.failure_labels)),
)

def generate(self, fixtures: list[Path], curve_id: str) -> DegradationCurve:
def _load_fixture_manifest(self, manifest_path: Path = MANIFEST_PATH) -> tuple[dict[str, Any], ...]:
    """Load the fixture manifest and return its ``fixtures`` entries.

    Args:
        manifest_path: Location of the manifest document to load.

    Returns:
        The entries of the manifest's ``fixtures`` list, in document order,
        as an immutable tuple.

    Raises:
        ValueError: If the manifest is not a JSON object, its ``fixtures``
            value is not a list, or any entry in that list is not an object.
    """
    manifest = self._load_json(manifest_path)
    # A top-level array/scalar/null has no ``.get``; surface it as the same
    # format error instead of leaking an AttributeError to the caller.
    fixtures = manifest.get("fixtures") if isinstance(manifest, dict) else None
    if not isinstance(fixtures, list):
        raise ValueError(f"invalid fixture manifest format: {manifest_path}")
    # The declared return type promises dict entries and callers use
    # ``entry.get(...)``, so reject anything else here with a clear error.
    if not all(isinstance(entry, dict) for entry in fixtures):
        raise ValueError(f"invalid fixture manifest format: {manifest_path}")
    return tuple(fixtures)

def fixtures_for_layered_admissibility_curve(self, manifest_path: Path = MANIFEST_PATH) -> tuple[Path, ...]:
    """Select the layered-curve fixture paths from the manifest.

    Only manifest entries whose ``family`` equals ``LAYERED_CURVE_FAMILY`` and
    whose ``degradation_level`` is one of ``LAYERED_CURVE_LEVELS`` are used.
    When several entries share a level, the last one in the manifest wins.

    Args:
        manifest_path: Location of the fixture manifest.

    Returns:
        One path per level, ordered as in ``LAYERED_CURVE_LEVELS``.

    Raises:
        ValueError: If a selected entry has no ``path`` or if any level has
            no matching entry.
    """
    paths_by_level: dict[str, Path] = {}

    for fixture in self._load_fixture_manifest(manifest_path):
        if fixture.get("family") != LAYERED_CURVE_FAMILY:
            continue
        degradation = fixture.get("degradation_level")
        if degradation not in LAYERED_CURVE_LEVELS:
            continue
        raw_path = fixture.get("path")
        if not raw_path:
            raise ValueError(f"missing path for fixture in manifest: {fixture.get('fixture_id')}")
        paths_by_level[str(degradation)] = Path(raw_path)

    absent = [name for name in LAYERED_CURVE_LEVELS if name not in paths_by_level]
    if absent:
        raise ValueError(f"missing layered admissibility fixtures for levels: {absent}")

    # Emit paths in the canonical level order, not manifest order.
    return tuple(paths_by_level[name] for name in LAYERED_CURVE_LEVELS)

def generate(self, fixtures: list[Path] | tuple[Path, ...], curve_id: str) -> DegradationCurve:
    """Evaluate every fixture in order and bundle the results into a curve.

    Args:
        fixtures: Fixture paths to evaluate; their order is the point order.
        curve_id: Identifier stored on the resulting curve.

    Returns:
        A ``DegradationCurve`` stamped with this generator's ``VERSION`` and
        class name, holding one point per fixture.
    """
    scored_points = tuple(self.evaluate_fixture(fixture_path) for fixture_path in fixtures)
    return DegradationCurve(
        curve_id=curve_id,
        version=self.VERSION,
        generated_by=self.__class__.__name__,
        points=scored_points,
    )

Expand Down
21 changes: 16 additions & 5 deletions tests/test_degradation_curve_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,26 @@ def test_evaluate_negative_fixture_detects_expected_failures() -> None:

def test_generate_curve_is_deterministic() -> None:
generator = DegradationCurveGenerator()
fixtures = [POS_FIXTURE, MILD_FIXTURE, MODERATE_FIXTURE, NEG_FIXTURE]
fixtures = generator.fixtures_for_layered_admissibility_curve()
assert generator.to_dict(generator.generate(fixtures, curve_id=CURVE_ID)) == generator.to_dict(
generator.generate(fixtures, curve_id=CURVE_ID)
)




def test_layered_curve_fixtures_are_loaded_from_manifest_order() -> None:
    """The manifest-derived fixture list matches the four known fixtures, in order."""
    loaded = DegradationCurveGenerator().fixtures_for_layered_admissibility_curve()
    actual = [path.as_posix() for path in loaded]
    expected = [
        path.as_posix()
        for path in (POS_FIXTURE, MILD_FIXTURE, MODERATE_FIXTURE, NEG_FIXTURE)
    ]
    assert actual == expected

def test_to_dict_is_json_compatible_and_sorted() -> None:
generator = DegradationCurveGenerator()
curve = generator.generate([POS_FIXTURE, MILD_FIXTURE, MODERATE_FIXTURE, NEG_FIXTURE], curve_id=CURVE_ID)
curve = generator.generate(generator.fixtures_for_layered_admissibility_curve(), curve_id=CURVE_ID)
curve_dict = generator.to_dict(curve)
json.dumps(curve_dict, sort_keys=True)
assert [point["fixture_path"] for point in curve_dict["points"]] == [
Expand All @@ -61,7 +72,7 @@ def test_to_dict_is_json_compatible_and_sorted() -> None:

def test_write_json_matches_committed_artifact(tmp_path: Path) -> None:
generator = DegradationCurveGenerator()
curve = generator.generate([POS_FIXTURE, MILD_FIXTURE, MODERATE_FIXTURE, NEG_FIXTURE], curve_id=CURVE_ID)
curve = generator.generate(generator.fixtures_for_layered_admissibility_curve(), curve_id=CURVE_ID)
generated_path = tmp_path / "layered_admissibility_results.json"
generator.write_json(curve, generated_path)

Expand All @@ -72,7 +83,7 @@ def test_write_json_matches_committed_artifact(tmp_path: Path) -> None:

def test_write_markdown_contains_fixture_rows(tmp_path: Path) -> None:
generator = DegradationCurveGenerator()
curve = generator.generate([POS_FIXTURE, MILD_FIXTURE, MODERATE_FIXTURE, NEG_FIXTURE], curve_id=CURVE_ID)
curve = generator.generate(generator.fixtures_for_layered_admissibility_curve(), curve_id=CURVE_ID)
markdown_path = tmp_path / "layered_admissibility.md"
generator.write_markdown(curve, markdown_path)

Expand Down Expand Up @@ -116,7 +127,7 @@ def test_disallowed_failure_label_raises_clear_error() -> None:

def test_progressive_curve_scores_are_monotonic_or_non_increasing() -> None:
generator = DegradationCurveGenerator()
curve = generator.generate([POS_FIXTURE, MILD_FIXTURE, MODERATE_FIXTURE, NEG_FIXTURE], curve_id=CURVE_ID)
curve = generator.generate(generator.fixtures_for_layered_admissibility_curve(), curve_id=CURVE_ID)
points = {point.fixture_id: point for point in curve.points}

assert points["coding_workflow_pr_review_v1"].overall_admissibility_score == 1.0
Expand Down
Loading