diff --git a/README.md b/README.md index 45edbc3..b112b1c 100644 --- a/README.md +++ b/README.md @@ -77,16 +77,39 @@ Results: 0 errors, 0 warnings ## Validation Levels -The validator implements 4 of the 6 TDD cascade validation levels: +The validator implements all 6 TDD cascade validation levels: | Level | Name | What It Checks | |:------|:-----|:---------------| | 1 | Schema Validation | YAML files conform to JSON Schema (structure, types, required fields) | | 2 | Cross-Reference Integrity | `satisfies_signal` and `satisfies_experience` annotations reference valid targets | +| 3 | Contract Satisfaction | Every declared L0 `constraint_contracts` / `commitment_contracts` is satisfied: its `validated_by` provider exists, every `satisfies_constraint` / `satisfies_commitment` reference resolves to a declared contract of the matching kind, and no declared contract is left uncovered | | 4 | Signal Coverage | Every signal requirement in `perception/signal_requirements.yaml` has at least one satisfying specification | | 5 | Experience Traceability | Every product and process traces upward to an L0 customer experience goal | +| 6 | Waste Detection | Declared units that are never consumed: orphaned `organization.yaml` roles, unconsumed products (no `implemented_by`, no references), empty contracts (no `requires`), and duplicate `implemented_by` allocations | -Levels 3 (contract satisfaction) and 6 (waste detection) are planned for future releases. +Levels 1–3 emit **errors** (non-zero exit code); Levels 4–6 emit advisory **warnings**. + +## Deterministic Query CLI + +`orgschema-query` is a plain deterministic query interface over the Level 3 and Level 6 checks — there is **no natural-language / LLM layer**. OrgSchema's constraint space is boolean (a contract is satisfied or it is not; a unit is consumed or it is not), so the appropriate interface is a deterministic query rather than a grounded NL→DSL pipeline. Results are exactly the validator's, so the two tools never disagree. + +```bash +# Run all deterministic checks (contracts + waste) over a schema +orgschema-query --schema ./orgschema-demo + +# Run a single check +orgschema-query --schema ./orgschema-demo --check contracts +orgschema-query --schema ./orgschema-demo --check waste + +# Select by level number (3 = contracts, 6 = waste) +orgschema-query --schema ./orgschema-demo --level 6 + +# Machine-readable JSON output (for CI / tooling) +orgschema-query --schema ./orgschema-demo --format json +``` + +Exit code is `1` when an error-severity check (contracts) reports a violation, else `0` — usable in CI alongside `orgschema-validate`. ## Schemas @@ -117,7 +140,8 @@ All schemas use JSON Schema Draft 2020-12 and allow additional properties for ex orgschema-framework/ ├── orgschema_framework/ │ ├── __init__.py -│ ├── validate.py # CLI validator (4 validation levels) +│ ├── validate.py # CLI validator (6 validation levels) +│ ├── query.py # Deterministic query CLI (orgschema-query) │ └── schemas/ │ ├── compliance.json │ ├── organization.json @@ -296,6 +320,7 @@ Install with `uv sync` (runtime) or `uv sync --extra dev` (with dev tools). | Block | Entry point | Inputs | Outputs | |-------|-------------|--------|---------| | Validation CLI | `orgschema-validate ` | YAML specs at `` | stdout PASS/FAIL summary; exit code 0/1 | +| Query CLI | `orgschema-query --schema ` | YAML specs at `` | structural text/JSON of L3/L6 violations; exit code 0/1 | | Schema files | `orgschema_framework/schemas/*.json` | — | Reused by `validate.py` | | Test suite | `pytest` | `tests/` (if present) | `output/logs/master_run.log` | | Full reproduction | `./reproduce.sh` | repo root | `output/logs/master_run.log` | diff --git a/orgschema_framework/query.py b/orgschema_framework/query.py new file mode 100644 index 0000000..1a23112 --- /dev/null +++ b/orgschema_framework/query.py @@ -0,0 +1,177 @@ +"""Deterministic query interface over an Organizational Schema directory. + +This is a plain, deterministic query CLI — there is NO natural-language or +LLM layer. OrgSchema's constraint space is boolean (a contract is satisfied +or it is not; a unit is consumed or it is not), so the appropriate interface +is a deterministic query over the validator's existing level functions rather +than a grounded NL->DSL pipeline. + +The CLI re-uses the Level 3 (contract satisfaction) and Level 6 (waste +detection) functions from ``orgschema_framework.validate`` so the query +results are exactly the validator's results — single source of truth. + +Usage: + orgschema-query --schema ./orgschema-demo + orgschema-query --schema ./orgschema-demo --check contracts + orgschema-query --schema ./orgschema-demo --check waste --format json + orgschema-query --schema ./orgschema-demo --level 3 + +Exit code is 1 if any selected error-severity check (contracts) reports a +violation, else 0 — making it usable in CI alongside ``orgschema-validate``. +""" + +import argparse +import json +import sys +from pathlib import Path + +from orgschema_framework.validate import ( + validate_contract_satisfaction, + validate_waste, +) + +# Each query maps to (level, severity, function). +# "error" severity affects the exit code; "warning" is advisory. +CHECKS = { + "contracts": (3, "error", validate_contract_satisfaction), + "waste": (6, "warning", validate_waste), +} + +# Map --level N to the corresponding check name (deterministic queries only +# expose the levels that this module owns: 3 and 6). +LEVEL_TO_CHECK = {3: "contracts", 6: "waste"} + + +def run_query(root: Path, checks: list[str]) -> dict: + """Run the requested deterministic checks and return a structured result. + + The returned dict is stable and JSON-serializable: + + { + "schema": "", + "checks": [ + {"check": "contracts", "level": 3, "severity": "error", + "violation_count": N, "violations": [..]}, + ... + ], + "error_count": , + "warning_count": , + "ok": + } + """ + results = [] + error_count = 0 + warning_count = 0 + + for name in checks: + level, severity, func = CHECKS[name] + violations = func(root) + results.append( + { + "check": name, + "level": level, + "severity": severity, + "violation_count": len(violations), + "violations": violations, + } + ) + if severity == "error": + error_count += len(violations) + else: + warning_count += len(violations) + + return { + "schema": str(root), + "checks": results, + "error_count": error_count, + "warning_count": warning_count, + "ok": error_count == 0, + } + + +def format_text(result: dict) -> str: + """Render a query result as a human-readable structural report.""" + lines = [f"=== OrgSchema deterministic query: {result['schema']} ==="] + for check in result["checks"]: + sev = check["severity"].upper() + header = ( + f"\nLevel {check['level']} — {check['check']} " + f"({check['violation_count']} {sev} violations)" + ) + lines.append(header) + if not check["violations"]: + lines.append(" PASSED") + else: + for v in check["violations"]: + lines.append(f" [{sev}] {v}") + lines.append( + f"\nSummary: {result['error_count']} errors, " + f"{result['warning_count']} warnings" + ) + return "\n".join(lines) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="orgschema-query", + description=( + "Deterministic query interface over an OrgSchema directory " + "(no natural-language / LLM layer)." + ), + ) + parser.add_argument( + "--schema", + required=True, + type=Path, + help="Path to the orgschema specifications directory.", + ) + parser.add_argument( + "--check", + choices=sorted(CHECKS) + ["all"], + default="all", + help="Which deterministic check(s) to run (default: all).", + ) + parser.add_argument( + "--level", + type=int, + choices=sorted(LEVEL_TO_CHECK), + help="Run a single validation level by number (3=contracts, 6=waste). " + "Overrides --check when given.", + ) + parser.add_argument( + "--format", + choices=["text", "json"], + default="text", + help="Output format (default: text).", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + root = args.schema.resolve() + if not root.is_dir(): + print(f"Error: {root} is not a directory", file=sys.stderr) + return 2 + + if args.level is not None: + checks = [LEVEL_TO_CHECK[args.level]] + elif args.check == "all": + checks = sorted(CHECKS) + else: + checks = [args.check] + + result = run_query(root, checks) + + if args.format == "json": + print(json.dumps(result, indent=2)) + else: + print(format_text(result)) + + return 0 if result["ok"] else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/orgschema_framework/validate.py b/orgschema_framework/validate.py index 82beb5b..5df183f 100644 --- a/orgschema_framework/validate.py +++ b/orgschema_framework/validate.py @@ -1,12 +1,19 @@ """Organizational Schema Theory YAML schema validator. -Implements the first two levels of the 6-level CI/CD validation pipeline: +Implements the 6-level CI/CD validation pipeline: 1. Schema validation — "Is this valid YAML with required fields?" 2. Cross-reference integrity — "Do all linked specs exist and are consistent?" +3. Contract satisfaction — "Is every declared L0 contract actually satisfied?" +4. Signal coverage — "Does every signal requirement have a satisfying spec?" +5. Experience traceability — "Does every spec trace upward to L0?" +6. Waste detection — "Are any declared units/roles/contracts never consumed?" Usage: orgschema-validate /path/to/orgschema-demo python -m orgschema_framework.validate /path/to/orgschema-demo + +For a deterministic, machine-readable query over a single schema directory, +see orgschema_framework.query (the ``orgschema-query`` CLI). """ import datetime @@ -163,6 +170,147 @@ def validate_cross_references(root: Path) -> list[str]: return errors +def _load_yaml(filepath: Path): + """Safely load a YAML file, returning None on parse error or empty file.""" + try: + with open(filepath) as f: + return yaml.safe_load(f) + except yaml.YAMLError: + return None + + +def _load_experience_contract(root: Path) -> dict | None: + """Load the L0 customer experience contract, if present.""" + exp_path = root / "perception" / "customer_experience_contract.yaml" + if not exp_path.exists(): + return None + return _load_yaml(exp_path) + + +def _collect_declared_contracts(contract_data: dict) -> dict[str, dict]: + """Extract every declared L0 contract keyed by its id. + + Both ``constraint_contracts`` and ``commitment_contracts`` map a + human-readable name to a contract object carrying an ``id`` such as + ``L0_con_01`` / ``L0_com_01``. We index by that id and remember which + kind it is so contract references can be type-checked. + """ + contracts: dict[str, dict] = {} + for section, kind in ( + ("constraint_contracts", "constraint"), + ("commitment_contracts", "commitment"), + ): + block = contract_data.get(section) + if not isinstance(block, dict): + continue + for _name, contract in block.items(): + if isinstance(contract, dict) and "id" in contract: + entry = dict(contract) + entry["_kind"] = kind + contracts[contract["id"]] = entry + return contracts + + +def validate_contract_satisfaction(root: Path) -> list[str]: + """Level 3: Contract satisfaction. + + Every declared L0 contract (constraint or commitment) must actually be + satisfied, and every contract reference made by a spec must resolve to a + declared contract. Concretely this level enforces: + + 1. Provider exists — each contract's ``validated_by`` provider file (the + spec that discharges the contract) must exist on disk. A contract that + points at a missing provider is an unmet contract. + 2. No dangling reference — every ``satisfies_constraint`` / + ``satisfies_commitment`` annotation on any spec must point at a + contract id that is actually declared in the L0 contract, and must use + the matching reference key for the contract kind (a constraint id may + only appear under ``satisfies_constraint``, a commitment id only under + ``satisfies_commitment``). + 3. No unmet contract — every declared contract must be covered by at + least one satisfying spec, either via an existing ``validated_by`` + provider or via at least one spec that references it. + + Contracts are binding (regulatory / self-imposed obligations), so + failures here are errors, mirroring Level 2. + """ + errors: list[str] = [] + + contract_data = _load_experience_contract(root) + if not contract_data: + # No contract file => nothing to satisfy. Other levels report the + # missing file; Level 3 stays silent rather than double-reporting. + return errors + + declared = _collect_declared_contracts(contract_data) + if not declared: + return errors + + constraint_ids = {cid for cid, c in declared.items() if c["_kind"] == "constraint"} + commitment_ids = {cid for cid, c in declared.items() if c["_kind"] == "commitment"} + + # (1) Provider existence + collect which contracts have a provider. + contracts_with_provider: set[str] = set() + for cid, contract in sorted(declared.items()): + validated_by = contract.get("validated_by") + if not validated_by: + continue + # validated_by may carry a parenthetical note, e.g. + # "processes/opening_closing.yaml (safety checks)". + provider_file = str(validated_by).split("(")[0].strip() + provider_path = root / provider_file + if provider_path.exists(): + contracts_with_provider.add(cid) + else: + errors.append( + f"Unmet contract {cid}: validated_by provider " + f"'{provider_file}' does not exist" + ) + + # (2) Dangling / mistyped references across all specs, and collect which + # contracts are referenced by at least one spec. + referenced: set[str] = set() + for filepath in sorted(root.glob("**/*.yaml")): + if ".github" in str(filepath): + continue + data = _load_yaml(filepath) + if not isinstance(data, dict): + continue + rel = str(filepath.relative_to(root)) + + for key, valid_ids, kind in ( + ("satisfies_constraint", constraint_ids, "constraint"), + ("satisfies_commitment", commitment_ids, "commitment"), + ): + refs = data.get(key) + if not refs: + continue + for ref in refs: + if ref in valid_ids: + referenced.add(ref) + elif ref in declared: + other = declared[ref]["_kind"] + errors.append( + f"Contract type mismatch in {rel}: {key} references " + f"'{ref}' which is a {other} contract" + ) + else: + errors.append( + f"Unsatisfied contract reference in {rel}: {key} " + f"'{ref}' is not a declared {kind} contract" + ) + + # (3) Every declared contract must be covered by a provider or a reference. + covered = contracts_with_provider | referenced + for cid in sorted(set(declared) - covered): + errors.append( + f"Unmet contract {cid}: no satisfying spec " + f"(no existing validated_by provider and no spec references it)" + ) + + return errors + + def validate_signal_coverage(root: Path) -> list[str]: """Level 4: Signal coverage — are all signal requirements satisfied?""" warnings = [] @@ -251,6 +399,144 @@ def validate_experience_traceability(root: Path) -> list[str]: return warnings +def _walk_strings(node): + """Yield every string scalar anywhere inside a nested YAML structure.""" + if isinstance(node, str): + yield node + elif isinstance(node, dict): + for value in node.values(): + yield from _walk_strings(value) + elif isinstance(node, list): + for item in node: + yield from _walk_strings(item) + + +def validate_waste(root: Path) -> list[str]: + """Level 6: Waste detection. + + Surfaces specification waste — declared organizational units, roles, and + contracts that are never actually consumed, plus duplicated coverage. + Waste is advisory (it does not break a build), so this level emits + warnings, mirroring Levels 4 and 5. Detected forms: + + 1. Orphaned roles — a role declared in ``organization.yaml: roles`` that + is referenced by no process spec (no ``who``/``responsible`` field and + no mention anywhere in a process file). + 2. Unconsumed products — a product spec under ``products/`` that no signal + requirement lists in ``implemented_by`` and that no other spec + references by path or id. Such a product satisfies nothing upward. + 3. Empty contracts — a declared L0 contract whose ``requires`` obligation + list is missing or empty (a contract that demands nothing is waste). + 4. Duplicate coverage — a contract referenced identically by more units + than necessary is *not* flagged (redundancy can be intentional), but a + product appearing twice in a single signal's ``implemented_by`` is + flagged as duplicate allocation. + + Definitions are deliberately conservative: only entities that are + declared in one place and consumed nowhere are reported, to avoid noisy + false positives. + """ + warnings: list[str] = [] + + yaml_files = [f for f in sorted(root.glob("**/*.yaml")) if ".github" not in str(f)] + docs = {f: _load_yaml(f) for f in yaml_files} + + # --- (1) Orphaned roles ------------------------------------------------- + org_path = root / "organization.yaml" + org_data = docs.get(org_path) + declared_roles = [] + if isinstance(org_data, dict) and isinstance(org_data.get("roles"), list): + declared_roles = [r for r in org_data["roles"] if isinstance(r, str)] + + if declared_roles: + # A role counts as "referenced" if its name appears anywhere inside a + # process spec — as a structured ``who``/``responsible`` value or in + # prose. We join all process strings and test by substring, which is + # conservative (a declared role mentioned anywhere is not flagged) and + # avoids false positives from singular/plural or compound usages. + process_text_parts: list[str] = [] + for filepath, data in docs.items(): + rel = str(filepath.relative_to(root)) + if not rel.startswith("processes/") or not isinstance(data, dict): + continue + process_text_parts.extend(_walk_strings(data)) + process_text = "\n".join(process_text_parts) + for role in declared_roles: + if role not in process_text: + warnings.append( + f"Waste: role '{role}' declared in organization.yaml " + f"is never referenced by any process spec" + ) + + # --- (2) Unconsumed products ------------------------------------------- + product_files = { + filepath: data + for filepath, data in docs.items() + if str(filepath.relative_to(root)).startswith("products/") + and isinstance(data, dict) + } + + # Collect all implemented_by references and per-signal duplicates (4). + implemented_paths: set[str] = set() + sig_req_path = root / "perception" / "signal_requirements.yaml" + sig_data = docs.get(sig_req_path) + if isinstance(sig_data, dict): + for req in sig_data.get("signal_requirements", []) or []: + impls = req.get("implemented_by") if isinstance(req, dict) else None + if not impls: + continue + implemented_paths.update(impls) + seen: set[str] = set() + for impl in impls: + if impl in seen: + warnings.append( + f"Waste: duplicate allocation — signal " + f"{req.get('id', '?')} lists '{impl}' more than once " + f"in implemented_by" + ) + seen.add(impl) + + # Any string anywhere outside the product file itself that mentions its + # path or id counts as consumption. + for filepath, data in product_files.items(): + rel = str(filepath.relative_to(root)) + product_id = data.get("id") if isinstance(data, dict) else None + + if rel in implemented_paths: + continue + + consumed = False + for other_path, other_data in docs.items(): + if other_path == filepath: + continue + for s in _walk_strings(other_data): + if rel in s or (product_id and product_id == s): + consumed = True + break + if consumed: + break + + if not consumed: + warnings.append( + f"Waste: product {rel} is unconsumed — no signal " + f"implemented_by and no other spec references it" + ) + + # --- (3) Empty contracts ------------------------------------------------ + contract_data = _load_experience_contract(root) + if contract_data: + declared = _collect_declared_contracts(contract_data) + for cid, contract in sorted(declared.items()): + requires = contract.get("requires") + if not requires: + warnings.append( + f"Waste: contract {cid} declares no obligations " + f"(empty or missing 'requires')" + ) + + return warnings + + def main(): if len(sys.argv) < 2: print("Usage: orgschema-validate ") @@ -283,6 +569,16 @@ def main(): else: print(" PASSED") + # Level 3: Contract satisfaction + print("Level 3: Contract satisfaction...") + contract_errors = validate_contract_satisfaction(root) + for err in contract_errors: + print(f" ERROR: {err}") + if contract_errors: + exit_code = 1 + else: + print(" PASSED") + # Level 4: Signal coverage print("Level 4: Signal coverage...") coverage_warnings = validate_signal_coverage(root) @@ -299,9 +595,17 @@ def main(): if not trace_warnings: print(" PASSED") + # Level 6: Waste detection + print("Level 6: Waste detection...") + waste_warnings = validate_waste(root) + for warn in waste_warnings: + print(f" WARNING: {warn}") + if not waste_warnings: + print(" PASSED") + # Summary - total_errors = len(schema_errors) + len(xref_errors) - total_warnings = len(coverage_warnings) + len(trace_warnings) + total_errors = len(schema_errors) + len(xref_errors) + len(contract_errors) + total_warnings = len(coverage_warnings) + len(trace_warnings) + len(waste_warnings) print(f"\nSummary: {total_errors} errors, {total_warnings} warnings") sys.exit(exit_code) diff --git a/pyproject.toml b/pyproject.toml index 7dcbefa..5fb7a62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dev = [ [project.scripts] orgschema-validate = "orgschema_framework.validate:main" +orgschema-query = "orgschema_framework.query:main" [dependency-groups] dev = [ @@ -26,6 +27,13 @@ dev = [ "types-pyyaml>=6.0.12.20250915", ] +[tool.pytest.ini_options] +# The project is consumed as a source tree (no build-system); add the repo +# root to sys.path so tests can import the orgschema_framework package the +# same way `python -m orgschema_framework.validate` resolves it. +pythonpath = ["."] +testpaths = ["tests"] + [tool.mypy] ignore_missing_imports = true disable_error_code = ["import-untyped"] diff --git a/tests/test_query_cli.py b/tests/test_query_cli.py new file mode 100644 index 0000000..d6cd1d6 --- /dev/null +++ b/tests/test_query_cli.py @@ -0,0 +1,108 @@ +"""Smoke tests for the deterministic ``orgschema-query`` CLI.""" + +import json +from pathlib import Path + +import yaml + +from orgschema_framework import query + + +def _write(path: Path, data: dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.safe_dump(data, sort_keys=False)) + + +def _clean_schema(root: Path) -> None: + _write( + root / "organization.yaml", + { + "name": "Test Co", + "type": "coffee_shop", + "version": "1.0.0", + "roles": ["barista"], + }, + ) + _write( + root / "perception" / "customer_experience_contract.yaml", + { + "version": "1.0.0", + "experience_contract": {}, + "constraint_contracts": { + "food_safety": { + "id": "L0_con_01", + "type": "constraint", + "requires": ["HACCP documented"], + "validated_by": "compliance/food_safety.yaml", + } + }, + "commitment_contracts": {}, + }, + ) + _write(root / "compliance" / "food_safety.yaml", {"version": "1.0.0"}) + _write( + root / "processes" / "open.yaml", + { + "version": "1.0.0", + "satisfies_constraint": ["L0_con_01"], + "who": "barista", + }, + ) + + +def test_cli_runs_all_checks_clean(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main(["--schema", str(tmp_path)]) + out = capsys.readouterr().out + assert rc == 0 + assert "Level 3" in out + assert "contracts" in out + assert "Level 6" in out + assert "waste" in out + + +def test_cli_json_format_is_valid(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main(["--schema", str(tmp_path), "--format", "json"]) + out = capsys.readouterr().out + assert rc == 0 + payload = json.loads(out) + assert payload["ok"] is True + assert payload["error_count"] == 0 + assert {c["check"] for c in payload["checks"]} == {"contracts", "waste"} + + +def test_cli_check_contracts_only(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main( + ["--schema", str(tmp_path), "--check", "contracts", "--format", "json"] + ) + out = capsys.readouterr().out + assert rc == 0 + payload = json.loads(out) + assert {c["check"] for c in payload["checks"]} == {"contracts"} + + +def test_cli_level_flag_selects_waste(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main(["--schema", str(tmp_path), "--level", "6", "--format", "json"]) + out = capsys.readouterr().out + assert rc == 0 + payload = json.loads(out) + assert {c["check"] for c in payload["checks"]} == {"waste"} + + +def test_cli_nonzero_exit_on_contract_violation(tmp_path, capsys): + _clean_schema(tmp_path) + # Break the contract: delete provider and the referencing process. + (tmp_path / "compliance" / "food_safety.yaml").unlink() + (tmp_path / "processes" / "open.yaml").unlink() + rc = query.main(["--schema", str(tmp_path), "--check", "contracts"]) + out = capsys.readouterr().out + assert rc == 1 + assert "L0_con_01" in out + + +def test_cli_missing_dir_returns_2(tmp_path, capsys): + rc = query.main(["--schema", str(tmp_path / "nope")]) + assert rc == 2 diff --git a/tests/test_validate_l3_l6.py b/tests/test_validate_l3_l6.py new file mode 100644 index 0000000..8d9145d --- /dev/null +++ b/tests/test_validate_l3_l6.py @@ -0,0 +1,225 @@ +"""Tests for Level 3 (contract satisfaction) and Level 6 (waste detection). + +Fixtures are built on disk under ``tmp_path`` to mirror the orgschema-demo +layout (perception/, processes/, products/, organization.yaml) so the level +functions run against the same file shapes they see in production. +""" + +from pathlib import Path + +import yaml + +from orgschema_framework.validate import ( + validate_contract_satisfaction, + validate_waste, +) + + +def _write(path: Path, data: dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.safe_dump(data, sort_keys=False)) + + +def _experience_contract(constraints, commitments) -> dict: + return { + "version": "1.0.0", + "experience_contract": {}, + "constraint_contracts": constraints, + "commitment_contracts": commitments, + } + + +def _good_schema(root: Path) -> None: + """Build a minimal schema where all contracts are satisfied, no waste.""" + _write( + root / "organization.yaml", + { + "name": "Test Co", + "type": "coffee_shop", + "version": "1.0.0", + "roles": ["barista", "shift_lead"], + }, + ) + _write( + root / "perception" / "customer_experience_contract.yaml", + _experience_contract( + constraints={ + "food_safety": { + "id": "L0_con_01", + "type": "constraint", + "requires": ["HACCP plan documented"], + "validated_by": "compliance/food_safety.yaml", + }, + }, + commitments={ + "transparency": { + "id": "L0_com_01", + "type": "commitment", + "requires": ["Pricing visible"], + # No validated_by — must be covered by a referencing spec. + }, + }, + ), + ) + _write( + root / "perception" / "signal_requirements.yaml", + { + "version": "1.0.0", + "signal_requirements": [ + { + "id": "L1_exp_01", + "implemented_by": ["products/espresso.yaml"], + } + ], + }, + ) + _write(root / "compliance" / "food_safety.yaml", {"version": "1.0.0"}) + _write( + root / "products" / "espresso.yaml", + { + "version": "1.0.0", + "id": "espresso", + "name": "Espresso", + "category": "hot_beverage", + "available": True, + "satisfies_commitment": ["L0_com_01"], + }, + ) + _write( + root / "processes" / "opening_closing.yaml", + { + "version": "1.0.0", + "satisfies_constraint": ["L0_con_01"], + "opening": {"who": "shift_lead", "lead": "barista"}, + }, + ) + + +# -------------------------------------------------------------------------- +# Level 3: contract satisfaction +# -------------------------------------------------------------------------- + + +def test_l3_positive_all_contracts_satisfied(tmp_path): + _good_schema(tmp_path) + errors = validate_contract_satisfaction(tmp_path) + assert errors == [] + + +def test_l3_negative_missing_provider(tmp_path): + _good_schema(tmp_path) + # Remove the provider file the constraint's validated_by points at, and + # remove the spec that references it, so the contract becomes unmet. + (tmp_path / "compliance" / "food_safety.yaml").unlink() + (tmp_path / "processes" / "opening_closing.yaml").unlink() + errors = validate_contract_satisfaction(tmp_path) + assert any("L0_con_01" in e for e in errors) + + +def test_l3_negative_dangling_reference(tmp_path): + _good_schema(tmp_path) + # Reference a contract id that is not declared anywhere. + _write( + tmp_path / "products" / "ghost.yaml", + { + "version": "1.0.0", + "id": "ghost", + "name": "Ghost", + "category": "hot_beverage", + "available": True, + "satisfies_constraint": ["L0_con_99"], + }, + ) + errors = validate_contract_satisfaction(tmp_path) + assert any("L0_con_99" in e for e in errors) + + +def test_l3_negative_type_mismatch(tmp_path): + _good_schema(tmp_path) + # Reference a commitment id under satisfies_constraint (wrong key). + _write( + tmp_path / "products" / "espresso.yaml", + { + "version": "1.0.0", + "id": "espresso", + "name": "Espresso", + "category": "hot_beverage", + "available": True, + "satisfies_constraint": ["L0_com_01"], + "satisfies_commitment": ["L0_com_01"], + }, + ) + errors = validate_contract_satisfaction(tmp_path) + assert any("mismatch" in e.lower() and "L0_com_01" in e for e in errors) + + +def test_l3_no_contract_file_is_silent(tmp_path): + # An empty directory has nothing to satisfy. + errors = validate_contract_satisfaction(tmp_path) + assert errors == [] + + +# -------------------------------------------------------------------------- +# Level 6: waste detection +# -------------------------------------------------------------------------- + + +def test_l6_positive_no_waste(tmp_path): + _good_schema(tmp_path) + warnings = validate_waste(tmp_path) + assert warnings == [] + + +def test_l6_negative_orphaned_role(tmp_path): + _good_schema(tmp_path) + org_path = tmp_path / "organization.yaml" + org = yaml.safe_load(org_path.read_text()) + org["roles"].append("ghost_role") + _write(org_path, org) + warnings = validate_waste(tmp_path) + assert any("ghost_role" in w for w in warnings) + + +def test_l6_negative_unconsumed_product(tmp_path): + _good_schema(tmp_path) + # A product no signal implements and nothing references. + _write( + tmp_path / "products" / "orphan.yaml", + { + "version": "1.0.0", + "id": "orphan", + "name": "Orphan", + "category": "hot_beverage", + "available": True, + }, + ) + warnings = validate_waste(tmp_path) + assert any("orphan" in w for w in warnings) + + +def test_l6_negative_empty_contract(tmp_path): + _good_schema(tmp_path) + # Add a commitment contract with no obligations. + contract_path = tmp_path / "perception" / "customer_experience_contract.yaml" + data = yaml.safe_load(contract_path.read_text()) + data["commitment_contracts"]["empty_one"] = { + "id": "L0_com_09", + "type": "commitment", + "requires": [], + } + _write(contract_path, data) + warnings = validate_waste(tmp_path) + assert any("L0_com_09" in w for w in warnings) + + +def test_l6_negative_duplicate_allocation(tmp_path): + _good_schema(tmp_path) + sig_path = tmp_path / "perception" / "signal_requirements.yaml" + data = yaml.safe_load(sig_path.read_text()) + data["signal_requirements"][0]["implemented_by"] = [ + "products/espresso.yaml", + "products/espresso.yaml", + ] + _write(sig_path, data) + warnings = validate_waste(tmp_path) + assert any("duplicate" in w.lower() for w in warnings)