From efbdfb8212b65b52cac15e5f9257b3353115b860 Mon Sep 17 00:00:00 2001 From: Dmitry <69451861+viberesearch@users.noreply.github.com> Date: Sat, 6 Jun 2026 01:21:08 +0300 Subject: [PATCH] feat(validate): implement L3 contract satisfaction + L6 waste detection + query CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the two previously-planned validator levels and a deterministic query interface, completing the 6-level TDD cascade. Level 3 — Contract satisfaction (errors): - Each declared L0 constraint_contracts / commitment_contracts must be satisfied: its validated_by provider file must exist, every satisfies_constraint / satisfies_commitment reference must resolve to a declared contract of the matching kind (constraint vs commitment), and no declared contract may be left uncovered (no provider and no referencing spec). Mirrors the binding semantics of Level 2. Level 6 — Waste detection (warnings): - Orphaned organization.yaml roles never referenced in any process spec - Unconsumed products (no signal implemented_by, no other reference) - Empty contracts (missing/empty requires obligations) - Duplicate implemented_by allocation within a single signal - Conservative definitions to avoid false positives. Deterministic query CLI (orgschema-query): - Plain deterministic interface over L3/L6 (NO natural-language / LLM layer); OrgSchema's constraint space is boolean, so a deterministic query is the correct interface rather than a grounded NL->DSL pipeline. - --schema FILE [--check contracts|waste|all] [--level 3|6] [--format text|json] - Re-uses the validator level functions so results never diverge. - console_scripts entry point added in pyproject.toml. Tests: positive + negative cases for L3 and L6, plus CLI smoke tests (16 tests). pytest pythonpath configured so the source tree imports. Verified against orgschema-demo: all 6 levels pass, 0 false positives. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 31 +++- orgschema_framework/query.py | 177 ++++++++++++++++++ orgschema_framework/validate.py | 310 +++++++++++++++++++++++++++++++- pyproject.toml | 8 + tests/test_query_cli.py | 108 +++++++++++ tests/test_validate_l3_l6.py | 225 +++++++++++++++++++++++ 6 files changed, 853 insertions(+), 6 deletions(-) create mode 100644 orgschema_framework/query.py create mode 100644 tests/test_query_cli.py create mode 100644 tests/test_validate_l3_l6.py diff --git a/README.md b/README.md index 45edbc3..b112b1c 100644 --- a/README.md +++ b/README.md @@ -77,16 +77,39 @@ Results: 0 errors, 0 warnings ## Validation Levels -The validator implements 4 of the 6 TDD cascade validation levels: +The validator implements all 6 TDD cascade validation levels: | Level | Name | What It Checks | |:------|:-----|:---------------| | 1 | Schema Validation | YAML files conform to JSON Schema (structure, types, required fields) | | 2 | Cross-Reference Integrity | `satisfies_signal` and `satisfies_experience` annotations reference valid targets | +| 3 | Contract Satisfaction | Every declared L0 `constraint_contracts` / `commitment_contracts` is satisfied: its `validated_by` provider exists, every `satisfies_constraint` / `satisfies_commitment` reference resolves to a declared contract of the matching kind, and no declared contract is left uncovered | | 4 | Signal Coverage | Every signal requirement in `perception/signal_requirements.yaml` has at least one satisfying specification | | 5 | Experience Traceability | Every product and process traces upward to an L0 customer experience goal | +| 6 | Waste Detection | Declared units that are never consumed: orphaned `organization.yaml` roles, unconsumed products (no `implemented_by`, no references), empty contracts (no `requires`), and duplicate `implemented_by` allocations | -Levels 3 (contract satisfaction) and 6 (waste detection) are planned for future releases. +Levels 1–3 emit **errors** (non-zero exit code); Levels 4–6 emit advisory **warnings**. + +## Deterministic Query CLI + +`orgschema-query` is a plain deterministic query interface over the Level 3 and Level 6 checks — there is **no natural-language / LLM layer**. OrgSchema's constraint space is boolean (a contract is satisfied or it is not; a unit is consumed or it is not), so the appropriate interface is a deterministic query rather than a grounded NL→DSL pipeline. Results are exactly the validator's, so the two tools never disagree. + +```bash +# Run all deterministic checks (contracts + waste) over a schema +orgschema-query --schema ./orgschema-demo + +# Run a single check +orgschema-query --schema ./orgschema-demo --check contracts +orgschema-query --schema ./orgschema-demo --check waste + +# Select by level number (3 = contracts, 6 = waste) +orgschema-query --schema ./orgschema-demo --level 6 + +# Machine-readable JSON output (for CI / tooling) +orgschema-query --schema ./orgschema-demo --format json +``` + +Exit code is `1` when an error-severity check (contracts) reports a violation, else `0` — usable in CI alongside `orgschema-validate`. ## Schemas @@ -117,7 +140,8 @@ All schemas use JSON Schema Draft 2020-12 and allow additional properties for ex orgschema-framework/ ├── orgschema_framework/ │ ├── __init__.py -│ ├── validate.py # CLI validator (4 validation levels) +│ ├── validate.py # CLI validator (6 validation levels) +│ ├── query.py # Deterministic query CLI (orgschema-query) │ └── schemas/ │ ├── compliance.json │ ├── organization.json @@ -296,6 +320,7 @@ Install with `uv sync` (runtime) or `uv sync --extra dev` (with dev tools). | Block | Entry point | Inputs | Outputs | |-------|-------------|--------|---------| | Validation CLI | `orgschema-validate ` | YAML specs at `` | stdout PASS/FAIL summary; exit code 0/1 | +| Query CLI | `orgschema-query --schema ` | YAML specs at `` | structural text/JSON of L3/L6 violations; exit code 0/1 | | Schema files | `orgschema_framework/schemas/*.json` | — | Reused by `validate.py` | | Test suite | `pytest` | `tests/` (if present) | `output/logs/master_run.log` | | Full reproduction | `./reproduce.sh` | repo root | `output/logs/master_run.log` | diff --git a/orgschema_framework/query.py b/orgschema_framework/query.py new file mode 100644 index 0000000..1a23112 --- /dev/null +++ b/orgschema_framework/query.py @@ -0,0 +1,177 @@ +"""Deterministic query interface over an Organizational Schema directory. + +This is a plain, deterministic query CLI — there is NO natural-language or +LLM layer. OrgSchema's constraint space is boolean (a contract is satisfied +or it is not; a unit is consumed or it is not), so the appropriate interface +is a deterministic query over the validator's existing level functions rather +than a grounded NL->DSL pipeline. + +The CLI re-uses the Level 3 (contract satisfaction) and Level 6 (waste +detection) functions from ``orgschema_framework.validate`` so the query +results are exactly the validator's results — single source of truth. + +Usage: + orgschema-query --schema ./orgschema-demo + orgschema-query --schema ./orgschema-demo --check contracts + orgschema-query --schema ./orgschema-demo --check waste --format json + orgschema-query --schema ./orgschema-demo --level 3 + +Exit code is 1 if any selected error-severity check (contracts) reports a +violation, else 0 — making it usable in CI alongside ``orgschema-validate``. +""" + +import argparse +import json +import sys +from pathlib import Path + +from orgschema_framework.validate import ( + validate_contract_satisfaction, + validate_waste, +) + +# Each query maps to (level, severity, function). +# "error" severity affects the exit code; "warning" is advisory. +CHECKS = { + "contracts": (3, "error", validate_contract_satisfaction), + "waste": (6, "warning", validate_waste), +} + +# Map --level N to the corresponding check name (deterministic queries only +# expose the levels that this module owns: 3 and 6). +LEVEL_TO_CHECK = {3: "contracts", 6: "waste"} + + +def run_query(root: Path, checks: list[str]) -> dict: + """Run the requested deterministic checks and return a structured result. + + The returned dict is stable and JSON-serializable: + + { + "schema": "", + "checks": [ + {"check": "contracts", "level": 3, "severity": "error", + "violation_count": N, "violations": [..]}, + ... + ], + "error_count": , + "warning_count": , + "ok": + } + """ + results = [] + error_count = 0 + warning_count = 0 + + for name in checks: + level, severity, func = CHECKS[name] + violations = func(root) + results.append( + { + "check": name, + "level": level, + "severity": severity, + "violation_count": len(violations), + "violations": violations, + } + ) + if severity == "error": + error_count += len(violations) + else: + warning_count += len(violations) + + return { + "schema": str(root), + "checks": results, + "error_count": error_count, + "warning_count": warning_count, + "ok": error_count == 0, + } + + +def format_text(result: dict) -> str: + """Render a query result as a human-readable structural report.""" + lines = [f"=== OrgSchema deterministic query: {result['schema']} ==="] + for check in result["checks"]: + sev = check["severity"].upper() + header = ( + f"\nLevel {check['level']} — {check['check']} " + f"({check['violation_count']} {sev} violations)" + ) + lines.append(header) + if not check["violations"]: + lines.append(" PASSED") + else: + for v in check["violations"]: + lines.append(f" [{sev}] {v}") + lines.append( + f"\nSummary: {result['error_count']} errors, " + f"{result['warning_count']} warnings" + ) + return "\n".join(lines) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="orgschema-query", + description=( + "Deterministic query interface over an OrgSchema directory " + "(no natural-language / LLM layer)." + ), + ) + parser.add_argument( + "--schema", + required=True, + type=Path, + help="Path to the orgschema specifications directory.", + ) + parser.add_argument( + "--check", + choices=sorted(CHECKS) + ["all"], + default="all", + help="Which deterministic check(s) to run (default: all).", + ) + parser.add_argument( + "--level", + type=int, + choices=sorted(LEVEL_TO_CHECK), + help="Run a single validation level by number (3=contracts, 6=waste). " + "Overrides --check when given.", + ) + parser.add_argument( + "--format", + choices=["text", "json"], + default="text", + help="Output format (default: text).", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + root = args.schema.resolve() + if not root.is_dir(): + print(f"Error: {root} is not a directory", file=sys.stderr) + return 2 + + if args.level is not None: + checks = [LEVEL_TO_CHECK[args.level]] + elif args.check == "all": + checks = sorted(CHECKS) + else: + checks = [args.check] + + result = run_query(root, checks) + + if args.format == "json": + print(json.dumps(result, indent=2)) + else: + print(format_text(result)) + + return 0 if result["ok"] else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/orgschema_framework/validate.py b/orgschema_framework/validate.py index 82beb5b..5df183f 100644 --- a/orgschema_framework/validate.py +++ b/orgschema_framework/validate.py @@ -1,12 +1,19 @@ """Organizational Schema Theory YAML schema validator. -Implements the first two levels of the 6-level CI/CD validation pipeline: +Implements the 6-level CI/CD validation pipeline: 1. Schema validation — "Is this valid YAML with required fields?" 2. Cross-reference integrity — "Do all linked specs exist and are consistent?" +3. Contract satisfaction — "Is every declared L0 contract actually satisfied?" +4. Signal coverage — "Does every signal requirement have a satisfying spec?" +5. Experience traceability — "Does every spec trace upward to L0?" +6. Waste detection — "Are any declared units/roles/contracts never consumed?" Usage: orgschema-validate /path/to/orgschema-demo python -m orgschema_framework.validate /path/to/orgschema-demo + +For a deterministic, machine-readable query over a single schema directory, +see orgschema_framework.query (the ``orgschema-query`` CLI). """ import datetime @@ -163,6 +170,147 @@ def validate_cross_references(root: Path) -> list[str]: return errors +def _load_yaml(filepath: Path): + """Safely load a YAML file, returning None on parse error or empty file.""" + try: + with open(filepath) as f: + return yaml.safe_load(f) + except yaml.YAMLError: + return None + + +def _load_experience_contract(root: Path) -> dict | None: + """Load the L0 customer experience contract, if present.""" + exp_path = root / "perception" / "customer_experience_contract.yaml" + if not exp_path.exists(): + return None + return _load_yaml(exp_path) + + +def _collect_declared_contracts(contract_data: dict) -> dict[str, dict]: + """Extract every declared L0 contract keyed by its id. + + Both ``constraint_contracts`` and ``commitment_contracts`` map a + human-readable name to a contract object carrying an ``id`` such as + ``L0_con_01`` / ``L0_com_01``. We index by that id and remember which + kind it is so contract references can be type-checked. + """ + contracts: dict[str, dict] = {} + for section, kind in ( + ("constraint_contracts", "constraint"), + ("commitment_contracts", "commitment"), + ): + block = contract_data.get(section) + if not isinstance(block, dict): + continue + for _name, contract in block.items(): + if isinstance(contract, dict) and "id" in contract: + entry = dict(contract) + entry["_kind"] = kind + contracts[contract["id"]] = entry + return contracts + + +def validate_contract_satisfaction(root: Path) -> list[str]: + """Level 3: Contract satisfaction. + + Every declared L0 contract (constraint or commitment) must actually be + satisfied, and every contract reference made by a spec must resolve to a + declared contract. Concretely this level enforces: + + 1. Provider exists — each contract's ``validated_by`` provider file (the + spec that discharges the contract) must exist on disk. A contract that + points at a missing provider is an unmet contract. + 2. No dangling reference — every ``satisfies_constraint`` / + ``satisfies_commitment`` annotation on any spec must point at a + contract id that is actually declared in the L0 contract, and must use + the matching reference key for the contract kind (a constraint id may + only appear under ``satisfies_constraint``, a commitment id only under + ``satisfies_commitment``). + 3. No unmet contract — every declared contract must be covered by at + least one satisfying spec, either via an existing ``validated_by`` + provider or via at least one spec that references it. + + Contracts are binding (regulatory / self-imposed obligations), so + failures here are errors, mirroring Level 2. + """ + errors: list[str] = [] + + contract_data = _load_experience_contract(root) + if not contract_data: + # No contract file => nothing to satisfy. Other levels report the + # missing file; Level 3 stays silent rather than double-reporting. + return errors + + declared = _collect_declared_contracts(contract_data) + if not declared: + return errors + + constraint_ids = {cid for cid, c in declared.items() if c["_kind"] == "constraint"} + commitment_ids = {cid for cid, c in declared.items() if c["_kind"] == "commitment"} + + # (1) Provider existence + collect which contracts have a provider. + contracts_with_provider: set[str] = set() + for cid, contract in sorted(declared.items()): + validated_by = contract.get("validated_by") + if not validated_by: + continue + # validated_by may carry a parenthetical note, e.g. + # "processes/opening_closing.yaml (safety checks)". + provider_file = str(validated_by).split("(")[0].strip() + provider_path = root / provider_file + if provider_path.exists(): + contracts_with_provider.add(cid) + else: + errors.append( + f"Unmet contract {cid}: validated_by provider " + f"'{provider_file}' does not exist" + ) + + # (2) Dangling / mistyped references across all specs, and collect which + # contracts are referenced by at least one spec. + referenced: set[str] = set() + for filepath in sorted(root.glob("**/*.yaml")): + if ".github" in str(filepath): + continue + data = _load_yaml(filepath) + if not isinstance(data, dict): + continue + rel = str(filepath.relative_to(root)) + + for key, valid_ids, kind in ( + ("satisfies_constraint", constraint_ids, "constraint"), + ("satisfies_commitment", commitment_ids, "commitment"), + ): + refs = data.get(key) + if not refs: + continue + for ref in refs: + if ref in valid_ids: + referenced.add(ref) + elif ref in declared: + other = declared[ref]["_kind"] + errors.append( + f"Contract type mismatch in {rel}: {key} references " + f"'{ref}' which is a {other} contract" + ) + else: + errors.append( + f"Unsatisfied contract reference in {rel}: {key} " + f"'{ref}' is not a declared {kind} contract" + ) + + # (3) Every declared contract must be covered by a provider or a reference. + covered = contracts_with_provider | referenced + for cid in sorted(set(declared) - covered): + errors.append( + f"Unmet contract {cid}: no satisfying spec " + f"(no existing validated_by provider and no spec references it)" + ) + + return errors + + def validate_signal_coverage(root: Path) -> list[str]: """Level 4: Signal coverage — are all signal requirements satisfied?""" warnings = [] @@ -251,6 +399,144 @@ def validate_experience_traceability(root: Path) -> list[str]: return warnings +def _walk_strings(node): + """Yield every string scalar anywhere inside a nested YAML structure.""" + if isinstance(node, str): + yield node + elif isinstance(node, dict): + for value in node.values(): + yield from _walk_strings(value) + elif isinstance(node, list): + for item in node: + yield from _walk_strings(item) + + +def validate_waste(root: Path) -> list[str]: + """Level 6: Waste detection. + + Surfaces specification waste — declared organizational units, roles, and + contracts that are never actually consumed, plus duplicated coverage. + Waste is advisory (it does not break a build), so this level emits + warnings, mirroring Levels 4 and 5. Detected forms: + + 1. Orphaned roles — a role declared in ``organization.yaml: roles`` that + is referenced by no process spec (no ``who``/``responsible`` field and + no mention anywhere in a process file). + 2. Unconsumed products — a product spec under ``products/`` that no signal + requirement lists in ``implemented_by`` and that no other spec + references by path or id. Such a product satisfies nothing upward. + 3. Empty contracts — a declared L0 contract whose ``requires`` obligation + list is missing or empty (a contract that demands nothing is waste). + 4. Duplicate coverage — a contract referenced identically by more units + than necessary is *not* flagged (redundancy can be intentional), but a + product appearing twice in a single signal's ``implemented_by`` is + flagged as duplicate allocation. + + Definitions are deliberately conservative: only entities that are + declared in one place and consumed nowhere are reported, to avoid noisy + false positives. + """ + warnings: list[str] = [] + + yaml_files = [f for f in sorted(root.glob("**/*.yaml")) if ".github" not in str(f)] + docs = {f: _load_yaml(f) for f in yaml_files} + + # --- (1) Orphaned roles ------------------------------------------------- + org_path = root / "organization.yaml" + org_data = docs.get(org_path) + declared_roles = [] + if isinstance(org_data, dict) and isinstance(org_data.get("roles"), list): + declared_roles = [r for r in org_data["roles"] if isinstance(r, str)] + + if declared_roles: + # A role counts as "referenced" if its name appears anywhere inside a + # process spec — as a structured ``who``/``responsible`` value or in + # prose. We join all process strings and test by substring, which is + # conservative (a declared role mentioned anywhere is not flagged) and + # avoids false positives from singular/plural or compound usages. + process_text_parts: list[str] = [] + for filepath, data in docs.items(): + rel = str(filepath.relative_to(root)) + if not rel.startswith("processes/") or not isinstance(data, dict): + continue + process_text_parts.extend(_walk_strings(data)) + process_text = "\n".join(process_text_parts) + for role in declared_roles: + if role not in process_text: + warnings.append( + f"Waste: role '{role}' declared in organization.yaml " + f"is never referenced by any process spec" + ) + + # --- (2) Unconsumed products ------------------------------------------- + product_files = { + filepath: data + for filepath, data in docs.items() + if str(filepath.relative_to(root)).startswith("products/") + and isinstance(data, dict) + } + + # Collect all implemented_by references and per-signal duplicates (4). + implemented_paths: set[str] = set() + sig_req_path = root / "perception" / "signal_requirements.yaml" + sig_data = docs.get(sig_req_path) + if isinstance(sig_data, dict): + for req in sig_data.get("signal_requirements", []) or []: + impls = req.get("implemented_by") if isinstance(req, dict) else None + if not impls: + continue + implemented_paths.update(impls) + seen: set[str] = set() + for impl in impls: + if impl in seen: + warnings.append( + f"Waste: duplicate allocation — signal " + f"{req.get('id', '?')} lists '{impl}' more than once " + f"in implemented_by" + ) + seen.add(impl) + + # Any string anywhere outside the product file itself that mentions its + # path or id counts as consumption. + for filepath, data in product_files.items(): + rel = str(filepath.relative_to(root)) + product_id = data.get("id") if isinstance(data, dict) else None + + if rel in implemented_paths: + continue + + consumed = False + for other_path, other_data in docs.items(): + if other_path == filepath: + continue + for s in _walk_strings(other_data): + if rel in s or (product_id and product_id == s): + consumed = True + break + if consumed: + break + + if not consumed: + warnings.append( + f"Waste: product {rel} is unconsumed — no signal " + f"implemented_by and no other spec references it" + ) + + # --- (3) Empty contracts ------------------------------------------------ + contract_data = _load_experience_contract(root) + if contract_data: + declared = _collect_declared_contracts(contract_data) + for cid, contract in sorted(declared.items()): + requires = contract.get("requires") + if not requires: + warnings.append( + f"Waste: contract {cid} declares no obligations " + f"(empty or missing 'requires')" + ) + + return warnings + + def main(): if len(sys.argv) < 2: print("Usage: orgschema-validate ") @@ -283,6 +569,16 @@ def main(): else: print(" PASSED") + # Level 3: Contract satisfaction + print("Level 3: Contract satisfaction...") + contract_errors = validate_contract_satisfaction(root) + for err in contract_errors: + print(f" ERROR: {err}") + if contract_errors: + exit_code = 1 + else: + print(" PASSED") + # Level 4: Signal coverage print("Level 4: Signal coverage...") coverage_warnings = validate_signal_coverage(root) @@ -299,9 +595,17 @@ def main(): if not trace_warnings: print(" PASSED") + # Level 6: Waste detection + print("Level 6: Waste detection...") + waste_warnings = validate_waste(root) + for warn in waste_warnings: + print(f" WARNING: {warn}") + if not waste_warnings: + print(" PASSED") + # Summary - total_errors = len(schema_errors) + len(xref_errors) - total_warnings = len(coverage_warnings) + len(trace_warnings) + total_errors = len(schema_errors) + len(xref_errors) + len(contract_errors) + total_warnings = len(coverage_warnings) + len(trace_warnings) + len(waste_warnings) print(f"\nSummary: {total_errors} errors, {total_warnings} warnings") sys.exit(exit_code) diff --git a/pyproject.toml b/pyproject.toml index 7dcbefa..5fb7a62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dev = [ [project.scripts] orgschema-validate = "orgschema_framework.validate:main" +orgschema-query = "orgschema_framework.query:main" [dependency-groups] dev = [ @@ -26,6 +27,13 @@ dev = [ "types-pyyaml>=6.0.12.20250915", ] +[tool.pytest.ini_options] +# The project is consumed as a source tree (no build-system); add the repo +# root to sys.path so tests can import the orgschema_framework package the +# same way `python -m orgschema_framework.validate` resolves it. +pythonpath = ["."] +testpaths = ["tests"] + [tool.mypy] ignore_missing_imports = true disable_error_code = ["import-untyped"] diff --git a/tests/test_query_cli.py b/tests/test_query_cli.py new file mode 100644 index 0000000..d6cd1d6 --- /dev/null +++ b/tests/test_query_cli.py @@ -0,0 +1,108 @@ +"""Smoke tests for the deterministic ``orgschema-query`` CLI.""" + +import json +from pathlib import Path + +import yaml + +from orgschema_framework import query + + +def _write(path: Path, data: dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.safe_dump(data, sort_keys=False)) + + +def _clean_schema(root: Path) -> None: + _write( + root / "organization.yaml", + { + "name": "Test Co", + "type": "coffee_shop", + "version": "1.0.0", + "roles": ["barista"], + }, + ) + _write( + root / "perception" / "customer_experience_contract.yaml", + { + "version": "1.0.0", + "experience_contract": {}, + "constraint_contracts": { + "food_safety": { + "id": "L0_con_01", + "type": "constraint", + "requires": ["HACCP documented"], + "validated_by": "compliance/food_safety.yaml", + } + }, + "commitment_contracts": {}, + }, + ) + _write(root / "compliance" / "food_safety.yaml", {"version": "1.0.0"}) + _write( + root / "processes" / "open.yaml", + { + "version": "1.0.0", + "satisfies_constraint": ["L0_con_01"], + "who": "barista", + }, + ) + + +def test_cli_runs_all_checks_clean(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main(["--schema", str(tmp_path)]) + out = capsys.readouterr().out + assert rc == 0 + assert "Level 3" in out + assert "contracts" in out + assert "Level 6" in out + assert "waste" in out + + +def test_cli_json_format_is_valid(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main(["--schema", str(tmp_path), "--format", "json"]) + out = capsys.readouterr().out + assert rc == 0 + payload = json.loads(out) + assert payload["ok"] is True + assert payload["error_count"] == 0 + assert {c["check"] for c in payload["checks"]} == {"contracts", "waste"} + + +def test_cli_check_contracts_only(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main( + ["--schema", str(tmp_path), "--check", "contracts", "--format", "json"] + ) + out = capsys.readouterr().out + assert rc == 0 + payload = json.loads(out) + assert {c["check"] for c in payload["checks"]} == {"contracts"} + + +def test_cli_level_flag_selects_waste(tmp_path, capsys): + _clean_schema(tmp_path) + rc = query.main(["--schema", str(tmp_path), "--level", "6", "--format", "json"]) + out = capsys.readouterr().out + assert rc == 0 + payload = json.loads(out) + assert {c["check"] for c in payload["checks"]} == {"waste"} + + +def test_cli_nonzero_exit_on_contract_violation(tmp_path, capsys): + _clean_schema(tmp_path) + # Break the contract: delete provider and the referencing process. + (tmp_path / "compliance" / "food_safety.yaml").unlink() + (tmp_path / "processes" / "open.yaml").unlink() + rc = query.main(["--schema", str(tmp_path), "--check", "contracts"]) + out = capsys.readouterr().out + assert rc == 1 + assert "L0_con_01" in out + + +def test_cli_missing_dir_returns_2(tmp_path, capsys): + rc = query.main(["--schema", str(tmp_path / "nope")]) + assert rc == 2 diff --git a/tests/test_validate_l3_l6.py b/tests/test_validate_l3_l6.py new file mode 100644 index 0000000..8d9145d --- /dev/null +++ b/tests/test_validate_l3_l6.py @@ -0,0 +1,225 @@ +"""Tests for Level 3 (contract satisfaction) and Level 6 (waste detection). + +Fixtures are built on disk under ``tmp_path`` to mirror the orgschema-demo +layout (perception/, processes/, products/, organization.yaml) so the level +functions run against the same file shapes they see in production. +""" + +from pathlib import Path + +import yaml + +from orgschema_framework.validate import ( + validate_contract_satisfaction, + validate_waste, +) + + +def _write(path: Path, data: dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.safe_dump(data, sort_keys=False)) + + +def _experience_contract(constraints, commitments) -> dict: + return { + "version": "1.0.0", + "experience_contract": {}, + "constraint_contracts": constraints, + "commitment_contracts": commitments, + } + + +def _good_schema(root: Path) -> None: + """Build a minimal schema where all contracts are satisfied, no waste.""" + _write( + root / "organization.yaml", + { + "name": "Test Co", + "type": "coffee_shop", + "version": "1.0.0", + "roles": ["barista", "shift_lead"], + }, + ) + _write( + root / "perception" / "customer_experience_contract.yaml", + _experience_contract( + constraints={ + "food_safety": { + "id": "L0_con_01", + "type": "constraint", + "requires": ["HACCP plan documented"], + "validated_by": "compliance/food_safety.yaml", + }, + }, + commitments={ + "transparency": { + "id": "L0_com_01", + "type": "commitment", + "requires": ["Pricing visible"], + # No validated_by — must be covered by a referencing spec. + }, + }, + ), + ) + _write( + root / "perception" / "signal_requirements.yaml", + { + "version": "1.0.0", + "signal_requirements": [ + { + "id": "L1_exp_01", + "implemented_by": ["products/espresso.yaml"], + } + ], + }, + ) + _write(root / "compliance" / "food_safety.yaml", {"version": "1.0.0"}) + _write( + root / "products" / "espresso.yaml", + { + "version": "1.0.0", + "id": "espresso", + "name": "Espresso", + "category": "hot_beverage", + "available": True, + "satisfies_commitment": ["L0_com_01"], + }, + ) + _write( + root / "processes" / "opening_closing.yaml", + { + "version": "1.0.0", + "satisfies_constraint": ["L0_con_01"], + "opening": {"who": "shift_lead", "lead": "barista"}, + }, + ) + + +# -------------------------------------------------------------------------- +# Level 3: contract satisfaction +# -------------------------------------------------------------------------- + + +def test_l3_positive_all_contracts_satisfied(tmp_path): + _good_schema(tmp_path) + errors = validate_contract_satisfaction(tmp_path) + assert errors == [] + + +def test_l3_negative_missing_provider(tmp_path): + _good_schema(tmp_path) + # Remove the provider file the constraint's validated_by points at, and + # remove the spec that references it, so the contract becomes unmet. + (tmp_path / "compliance" / "food_safety.yaml").unlink() + (tmp_path / "processes" / "opening_closing.yaml").unlink() + errors = validate_contract_satisfaction(tmp_path) + assert any("L0_con_01" in e for e in errors) + + +def test_l3_negative_dangling_reference(tmp_path): + _good_schema(tmp_path) + # Reference a contract id that is not declared anywhere. + _write( + tmp_path / "products" / "ghost.yaml", + { + "version": "1.0.0", + "id": "ghost", + "name": "Ghost", + "category": "hot_beverage", + "available": True, + "satisfies_constraint": ["L0_con_99"], + }, + ) + errors = validate_contract_satisfaction(tmp_path) + assert any("L0_con_99" in e for e in errors) + + +def test_l3_negative_type_mismatch(tmp_path): + _good_schema(tmp_path) + # Reference a commitment id under satisfies_constraint (wrong key). + _write( + tmp_path / "products" / "espresso.yaml", + { + "version": "1.0.0", + "id": "espresso", + "name": "Espresso", + "category": "hot_beverage", + "available": True, + "satisfies_constraint": ["L0_com_01"], + "satisfies_commitment": ["L0_com_01"], + }, + ) + errors = validate_contract_satisfaction(tmp_path) + assert any("mismatch" in e.lower() and "L0_com_01" in e for e in errors) + + +def test_l3_no_contract_file_is_silent(tmp_path): + # An empty directory has nothing to satisfy. + errors = validate_contract_satisfaction(tmp_path) + assert errors == [] + + +# -------------------------------------------------------------------------- +# Level 6: waste detection +# -------------------------------------------------------------------------- + + +def test_l6_positive_no_waste(tmp_path): + _good_schema(tmp_path) + warnings = validate_waste(tmp_path) + assert warnings == [] + + +def test_l6_negative_orphaned_role(tmp_path): + _good_schema(tmp_path) + org_path = tmp_path / "organization.yaml" + org = yaml.safe_load(org_path.read_text()) + org["roles"].append("ghost_role") + _write(org_path, org) + warnings = validate_waste(tmp_path) + assert any("ghost_role" in w for w in warnings) + + +def test_l6_negative_unconsumed_product(tmp_path): + _good_schema(tmp_path) + # A product no signal implements and nothing references. + _write( + tmp_path / "products" / "orphan.yaml", + { + "version": "1.0.0", + "id": "orphan", + "name": "Orphan", + "category": "hot_beverage", + "available": True, + }, + ) + warnings = validate_waste(tmp_path) + assert any("orphan" in w for w in warnings) + + +def test_l6_negative_empty_contract(tmp_path): + _good_schema(tmp_path) + # Add a commitment contract with no obligations. + contract_path = tmp_path / "perception" / "customer_experience_contract.yaml" + data = yaml.safe_load(contract_path.read_text()) + data["commitment_contracts"]["empty_one"] = { + "id": "L0_com_09", + "type": "commitment", + "requires": [], + } + _write(contract_path, data) + warnings = validate_waste(tmp_path) + assert any("L0_com_09" in w for w in warnings) + + +def test_l6_negative_duplicate_allocation(tmp_path): + _good_schema(tmp_path) + sig_path = tmp_path / "perception" / "signal_requirements.yaml" + data = yaml.safe_load(sig_path.read_text()) + data["signal_requirements"][0]["implemented_by"] = [ + "products/espresso.yaml", + "products/espresso.yaml", + ] + _write(sig_path, data) + warnings = validate_waste(tmp_path) + assert any("duplicate" in w.lower() for w in warnings)