diff --git a/refactron/cli/main.py b/refactron/cli/main.py index 93a777a..d3f516f 100644 --- a/refactron/cli/main.py +++ b/refactron/cli/main.py @@ -89,8 +89,10 @@ def main(ctx: click.Context) -> None: # Register subcommands -# We import them here to ensure they are registered with the main group -# Using a try-except block to allow partial loading during refactoring +# We import them here to ensure they are registered with the main group. +# Each block is isolated so a single broken subcommand — or a broken optional +# dependency it pulls in (e.g. a failed native-library load raising OSError, +# not ImportError) — degrades gracefully instead of taking down the whole CLI. try: from refactron.cli.auth import auth, login, logout, telemetry @@ -98,7 +100,7 @@ def main(ctx: click.Context) -> None: main.add_command(logout) main.add_command(auth) main.add_command(telemetry) -except ImportError: +except Exception: pass try: @@ -111,7 +113,7 @@ def main(ctx: click.Context) -> None: main.add_command(metrics) main.add_command(serve_metrics) main.add_command(suggest) -except ImportError: +except Exception: pass try: @@ -121,28 +123,35 @@ def main(ctx: click.Context) -> None: main.add_command(autofix) main.add_command(rollback) main.add_command(document) -except ImportError: +except Exception: + pass + +try: + from refactron.cli.verify import verify + + main.add_command(verify) +except Exception: pass try: from refactron.cli.patterns import patterns main.add_command(patterns) -except ImportError: +except Exception: pass try: from refactron.cli.repo import repo main.add_command(repo) -except ImportError: +except Exception: pass try: from refactron.cli.rag import rag main.add_command(rag) -except ImportError: +except Exception: pass try: @@ -151,5 +160,5 @@ def main(ctx: click.Context) -> None: main.add_command(generate_cicd) main.add_command(feedback) main.add_command(init) -except ImportError: +except Exception: pass diff --git a/refactron/cli/verify.py b/refactron/cli/verify.py index c173eb3..53ab53c 100644 --- a/refactron/cli/verify.py +++ b/refactron/cli/verify.py @@ -1,3 +1,89 @@ +""" +Refactron CLI - Verification Module. +Command to run the Verification Engine on a code change and emit either a +human-readable report or a stable, machine-readable JSON object for CI gates. +""" + +from pathlib import Path +from typing import Optional + +import click + +from refactron.cli.ui import _auth_banner, console +from refactron.verification.engine import VerificationEngine +from refactron.verification.report import ( + format_verification_result, + format_verification_result_json, +) + + +@click.command() +@click.argument("target", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--against", + "-a", + "candidate", + type=click.Path(exists=True, dir_okay=False), + default=None, + help=( + "Path to the proposed/modified version of TARGET. " + "If omitted, TARGET is verified against itself." + ), +) +@click.option( + "--project-root", + type=click.Path(exists=True, file_okay=False), + default=None, + help="Project root used by the test-suite gate. Defaults to the current directory.", +) +@click.option( + "--all-checks", + is_flag=True, + default=False, + help=( + "Run every check even after one fails (no short-circuit), so all " + "failure categories surface in a single run." + ), +) +@click.option( + "--json", + "as_json", + is_flag=True, + default=False, + help="Emit a stable, machine-readable JSON report instead of formatted text.", +) +def verify( + target: str, + candidate: Optional[str], + project_root: Optional[str], + all_checks: bool, + as_json: bool, +) -> None: + """ + Verify that a code change is safe to apply. + + TARGET is the file as it currently lives in the project. With --against, + the proposed new content is checked against it (syntax, import integrity, + test suite). Exit code is 0 when safe to apply, 1 when blocked — and with + --json a versioned JSON object is printed for CI dashboards and bots. + """ + target_path = Path(target) + original = target_path.read_text(encoding="utf-8") + transformed = Path(candidate).read_text(encoding="utf-8") if candidate else original + + root = Path(project_root) if project_root else Path.cwd() + engine = VerificationEngine(project_root=root) + result = engine.verify( + original, transformed, target_path, short_circuit=not all_checks + ) + + if as_json: + # JSON mode prints only the JSON object so consumers can parse stdout. + click.echo(format_verification_result_json(result)) + else: + console.print() + _auth_banner("Verification") + format_verification_result(result, console) """refactron verify --against — standalone verification command.""" from pathlib import Path diff --git a/refactron/verification/checks/imports.py b/refactron/verification/checks/imports.py index e5eba8a..f795b65 100644 --- a/refactron/verification/checks/imports.py +++ b/refactron/verification/checks/imports.py @@ -4,17 +4,59 @@ import importlib.util import time from pathlib import Path -from typing import Any, Dict, Set +from typing import Any, Dict, List, Optional, Set from refactron.verification.engine import BaseCheck from refactron.verification.result import CheckResult +# Guardrails for project-wide import-graph construction (Step 4: cycle +# detection). The graph is intentionally bounded — cycle detection is a +# best-effort confidence boost, never a correctness guarantee, so when a +# limit is hit we skip the check rather than block a transform. +_MAX_GRAPH_FILES = 2000 +_GRAPH_TIME_BUDGET_S = 5.0 +_SKIP_DIRS = { + ".git", + ".hg", + ".svn", + ".tox", + ".venv", + "venv", + "env", + "__pycache__", + "build", + "dist", + "node_modules", + ".mypy_cache", + ".pytest_cache", +} + class ImportIntegrityVerifier(BaseCheck): - """Validates import integrity after a transform.""" + """Validates import integrity after a transform. + + Steps 1-3 (removed-but-used imports, new-import resolvability) work on the + single file. Step 4 (cycle detection) builds a bounded project-wide import + graph and reports cycles *introduced* by the transform. + + Complexity limits for Step 4: + - At most ``_MAX_GRAPH_FILES`` (2000) files are scanned. + - Graph construction is abandoned after ``_GRAPH_TIME_BUDGET_S`` (5s). + - The graph is cached for the lifetime of this verifier instance. + - Only intra-project (first-party) imports become edges; third-party + and stdlib imports are ignored. + When any limit is hit, or no ``project_root`` was supplied, cycle + detection is silently skipped — never blocking. + """ name = "import_integrity" + def __init__(self, project_root: Optional[Path] = None) -> None: + self.project_root = project_root + # Base import graph (module -> set of imported project modules), + # cached per resolved project root for the verifier's lifetime. + self._graph_cache: Dict[str, Optional[Dict[str, Set[str]]]] = {} + def verify(self, original: str, transformed: str, file_path: Path) -> CheckResult: start = time.monotonic() details: Dict[str, Any] = {} @@ -36,8 +78,6 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul ) # Step 3: new imports resolvable - # TODO: Step 4 (cycle detection) deferred — requires project-wide import graph - # which is expensive for MVP. Will add in Phase 3 if needed. added = trans_imports - orig_imports if added: orig_modules = self._extract_module_names(original) @@ -53,6 +93,16 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul details, ) + # Step 4: cycle detection — report cycles introduced by the transform. + cycle = self._detect_new_cycle(original, transformed, file_path, details) + if cycle is not None: + details["import_cycle"] = cycle + return self._fail( + "Transform introduces an import cycle: " + " -> ".join(cycle), + start, + details, + ) + elapsed = int((time.monotonic() - start) * 1000) return CheckResult( check_name=self.name, @@ -74,6 +124,199 @@ def _fail(self, reason: str, start: float, details: Dict[str, Any]) -> CheckResu details=details, ) + # ----- Step 4: import-cycle detection --------------------------------- + + def _detect_new_cycle( + self, + original: str, + transformed: str, + file_path: Path, + details: Dict[str, Any], + ) -> Optional[List[str]]: + """Return a cycle path the transform introduces, or None. + + A cycle is reported only when it runs through the changed module and + did *not* exist before the transform — pre-existing cycles are left + for a dedicated lint pass, not this verifier. + """ + if self.project_root is None: + details["cycle_check"] = "skipped: no project_root" + return None + + root = self.project_root.resolve() + changed_mod = self._module_name(file_path, root) + if changed_mod is None: + details["cycle_check"] = "skipped: file outside project_root" + return None + + base_graph = self._build_base_graph(root) + if base_graph is None: + details["cycle_check"] = "skipped: graph budget exceeded" + return None + + is_pkg = Path(file_path).name == "__init__.py" + orig_edges = self._module_edges(original, changed_mod, is_pkg, root) + trans_edges = self._module_edges(transformed, changed_mod, is_pkg, root) + if orig_edges == trans_edges: + details["cycle_check"] = "ran: no project import edges changed" + return None + + graph_trans = dict(base_graph) + graph_trans[changed_mod] = trans_edges + cyc_trans = self._find_cycle_through(graph_trans, changed_mod) + if cyc_trans is None: + details["cycle_check"] = "ran: no cycle" + return None + + graph_orig = dict(base_graph) + graph_orig[changed_mod] = orig_edges + if self._find_cycle_through(graph_orig, changed_mod) is not None: + # Cycle already existed; the transform did not introduce it. + details["cycle_check"] = "ran: pre-existing cycle (not introduced)" + return None + + details["cycle_check"] = "ran: new cycle detected" + return cyc_trans + + def _build_base_graph(self, root: Path) -> Optional[Dict[str, Set[str]]]: + """Build (and cache) the project's module import graph. + + Returns None if a guardrail (file count / time budget) is hit. + """ + key = str(root) + if key in self._graph_cache: + return self._graph_cache[key] + + graph: Dict[str, Set[str]] = {} + deadline = time.monotonic() + _GRAPH_TIME_BUDGET_S + scanned = 0 + for path in root.rglob("*.py"): + if any(part in _SKIP_DIRS for part in path.relative_to(root).parts): + continue + scanned += 1 + if scanned > _MAX_GRAPH_FILES or time.monotonic() > deadline: + self._graph_cache[key] = None + return None + module_name = self._module_name(path, root) + if module_name is None: + continue + try: + code = path.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + continue + is_pkg = path.name == "__init__.py" + graph[module_name] = self._module_edges(code, module_name, is_pkg, root) + + self._graph_cache[key] = graph + return graph + + @staticmethod + def _module_name(file_path: Path, root: Path) -> Optional[str]: + """Map a file path to its dotted module name relative to ``root``.""" + try: + rel = Path(file_path).resolve().relative_to(root) + except ValueError: + return None + parts = list(rel.with_suffix("").parts) + if parts and parts[-1] == "__init__": + parts = parts[:-1] + return ".".join(parts) if parts else None + + @classmethod + def _module_edges( + cls, code: str, module_name: str, is_pkg: bool, root: Path + ) -> Set[str]: + """Intra-project modules imported by ``code`` (graph out-edges).""" + edges: Set[str] = set() + try: + tree = ast.parse(code) + except SyntaxError: + return edges + + # The package a relative import resolves against. + if is_pkg: + package = module_name + elif "." in module_name: + package = module_name.rsplit(".", 1)[0] + else: + package = "" + + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + target = cls._resolve(alias.name, root) + if target and target != module_name: + edges.add(target) + elif isinstance(node, ast.ImportFrom): + base = cls._relative_base(node.level, package) + if node.level and node.module: + base = f"{base}.{node.module}" if base else node.module + elif node.level == 0: + base = node.module or "" + if base: + resolved = cls._resolve(base, root) + if resolved and resolved != module_name: + edges.add(resolved) + # `from pkg import sub` may import a submodule, not a name. + for alias in node.names: + candidate = f"{base}.{alias.name}" if base else alias.name + resolved = cls._resolve(candidate, root) + if resolved and resolved != module_name: + edges.add(resolved) + return edges + + @staticmethod + def _relative_base(level: int, package: str) -> str: + """Resolve the package a relative import (``level`` dots) points at.""" + if level <= 0: + return package + parts = package.split(".") if package else [] + # level 1 == current package, level 2 == its parent, ... + keep = len(parts) - (level - 1) + if keep < 0: + return "" + return ".".join(parts[:keep]) + + @staticmethod + def _resolve(dotted: str, root: Path) -> Optional[str]: + """Return ``dotted`` if it names a file/package under ``root``.""" + if not dotted: + return None + parts = dotted.split(".") + as_module = root.joinpath(*parts).with_suffix(".py") + as_package = root.joinpath(*parts, "__init__.py") + if as_module.is_file() or as_package.is_file(): + return dotted + return None + + @staticmethod + def _find_cycle_through( + graph: Dict[str, Set[str]], start: str + ) -> Optional[List[str]]: + """Return a cycle ``start -> ... -> start``, or None. + + A cycle exists iff ``start`` is reachable from one of its own + successors. Uses bounded BFS per successor — O(V+E) each. + """ + for first in sorted(graph.get(start, set())): + parent: Dict[str, Optional[str]] = {first: None} + queue: List[str] = [first] + while queue: + node = queue.pop(0) + if node == start: + chain: List[str] = [] + cur: Optional[str] = node + while cur is not None: + chain.append(cur) + cur = parent[cur] + chain.reverse() + return [start] + chain + for nxt in sorted(graph.get(node, set())): + if nxt not in parent: + parent[nxt] = node + queue.append(nxt) + return None + @staticmethod def _extract_import_names(code: str) -> Set[str]: """Extract all locally-bound names from import statements.""" diff --git a/refactron/verification/checks/test_gate.py b/refactron/verification/checks/test_gate.py index 3f8327f..6186b05 100644 --- a/refactron/verification/checks/test_gate.py +++ b/refactron/verification/checks/test_gate.py @@ -4,6 +4,7 @@ import os import shutil import subprocess +import sys import time from pathlib import Path from typing import Any, Dict, List, Optional @@ -20,6 +21,8 @@ class TestSuiteGate(BaseCheck): def __init__(self, project_root: Optional[Path] = None): self.project_root = project_root + self._test_file_cache: Optional[Dict[str, List[Path]]] = None + self._all_test_files: Optional[List[Path]] = None def verify(self, original: str, transformed: str, file_path: Path) -> CheckResult: start = time.monotonic() @@ -54,16 +57,30 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul # Delete .pyc cache self._clear_pycache(file_path) - # Run pytest - cmd = ["python3", "-m", "pytest", "-x", "-q"] + # Run pytest from the project root with the host interpreter so + # the repo's pyproject.toml / pytest.ini / conftest.py are picked + # up and the same venv as the host process is used. + run_cwd = (self.project_root or file_path.parent).resolve() + + # Make the project root importable for edge cases where the layout + # relies on PYTHONPATH rather than an installed package. + env = {**os.environ, "PYTHONDONTWRITEBYTECODE": "1"} + existing_pythonpath = env.get("PYTHONPATH", "") + env["PYTHONPATH"] = ( + os.pathsep.join([str(run_cwd), existing_pythonpath]) + if existing_pythonpath + else str(run_cwd) + ) + + cmd = [sys.executable, "-m", "pytest", "-x", "-q"] cmd += [str(f) for f in test_files] result = subprocess.run( cmd, timeout=45, capture_output=True, text=True, - env={**os.environ, "PYTHONDONTWRITEBYTECODE": "1"}, - cwd=str(file_path.parent), + env=env, + cwd=str(run_cwd), ) elapsed = int((time.monotonic() - start) * 1000) @@ -110,11 +127,27 @@ def _find_relevant_tests(self, file_path: Path) -> List[Path]: module_name = file_path.stem search_root = self.project_root or file_path.parent + if self._test_file_cache is None: + self._test_file_cache = {} + self._all_test_files = [] + + test_dirs = [d for d in [search_root / "tests", search_root / "test"] if d.is_dir()] + search_dirs = test_dirs if test_dirs else [search_root] + excluded_dirs = {".git", ".rag", "__pycache__", "venv", ".venv", "env", "node_modules"} + + for root_dir in search_dirs: + for py_file in root_dir.rglob("*.py"): + if any(excluded in py_file.parts for excluded in excluded_dirs): + continue + name = py_file.name + if name.startswith("test_") or name.endswith("_test.py"): + self._all_test_files.append(py_file) + + if module_name in self._test_file_cache: + return self._test_file_cache[module_name] + test_files: List[Path] = [] - for py_file in search_root.rglob("*.py"): - name = py_file.name - if not (name.startswith("test_") or name.endswith("_test.py")): - continue + for py_file in self._all_test_files: # type: ignore if py_file == file_path: continue try: @@ -123,6 +156,8 @@ def _find_relevant_tests(self, file_path: Path) -> List[Path]: test_files.append(py_file) except Exception: continue + + self._test_file_cache[module_name] = test_files return test_files @staticmethod diff --git a/refactron/verification/engine.py b/refactron/verification/engine.py index 3e3771a..0030af2 100644 --- a/refactron/verification/engine.py +++ b/refactron/verification/engine.py @@ -21,7 +21,7 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul class VerificationEngine: - """Orchestrates verification checks in a short-circuit pipeline.""" + """Orchestrates verification checks, short-circuiting by default.""" def __init__( self, @@ -39,12 +39,29 @@ def __init__( self.checks = [ SyntaxVerifier(), - ImportIntegrityVerifier(), + ImportIntegrityVerifier(project_root=project_root), TestSuiteGate(project_root=project_root), ] - def verify(self, original: str, transformed: str, file_path: Path) -> VerificationResult: - """Run all checks in order, short-circuiting on first failure.""" + def verify( + self, + original: str, + transformed: str, + file_path: Path, + short_circuit: bool = True, + ) -> VerificationResult: + """Run the verification checks in order and aggregate the results. + + Args: + original: Source code before the transform. + transformed: Source code after the transform. + file_path: Path of the file within the project. + short_circuit: When True (default), stop after the first failing + check and record the rest in ``skipped_checks`` — fastest, but + hides stacked problems. When False, run every check so the + caller sees all failure categories in a single run; nothing is + skipped. ``blocking_reason`` is always the first failure. + """ start = time.monotonic() check_results: List[CheckResult] = [] checks_run: List[str] = [] @@ -77,12 +94,14 @@ def verify(self, original: str, transformed: str, file_path: Path) -> Verificati checks_failed.append(check.name) if blocking_reason is None: blocking_reason = cr.blocking_reason - # Short-circuit: skip remaining checks - for remaining in self.checks[i + 1 :]: - skipped_checks.append( - (remaining.name, f"Short-circuited after {check.name} failed") - ) - break + if short_circuit: + # Skip remaining checks and stop the pipeline. + for remaining in self.checks[i + 1 :]: + skipped_checks.append( + (remaining.name, f"Short-circuited after {check.name} failed") + ) + break + # short_circuit=False: keep going so every failure surfaces. elapsed_ms = int((time.monotonic() - start) * 1000) confidence = self._compute_confidence(check_results) diff --git a/refactron/verification/report.py b/refactron/verification/report.py index 5ae0807..e245597 100644 --- a/refactron/verification/report.py +++ b/refactron/verification/report.py @@ -1,4 +1,6 @@ -"""Rich CLI output formatting for VerificationResult.""" +"""CLI output formatting for VerificationResult (Rich and JSON).""" + +import json from rich.console import Console @@ -29,3 +31,13 @@ def format_verification_result(result: VerificationResult, console: Console) -> for name, reason in result.skipped_checks: console.print(f" [dim]- {name}: {reason}[/dim]") console.print(f"\n [bold red]Blocked.[/bold red] {result.blocking_reason}") + + +def format_verification_result_json(result: VerificationResult, indent: int = 2) -> str: + """Render a VerificationResult as a stable, machine-readable JSON string. + + Intended for CI gates, bots, and parent tools that need structured data + rather than scraping terminal text. The schema is versioned — see + ``VerificationResult.to_json_dict``. + """ + return json.dumps(result.to_json_dict(), indent=indent, sort_keys=True) diff --git a/refactron/verification/result.py b/refactron/verification/result.py index c37b7ea..fe5aef5 100644 --- a/refactron/verification/result.py +++ b/refactron/verification/result.py @@ -6,6 +6,10 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple +# Bumped whenever the JSON shape produced by ``to_json_dict`` changes in a +# way consumers must adapt to. Additive fields do not require a bump. +JSON_SCHEMA_VERSION = 1 + @dataclass(frozen=True) class CheckResult: @@ -18,6 +22,17 @@ class CheckResult: duration_ms: int details: Dict[str, Any] + def to_json_dict(self) -> Dict[str, Any]: + """Return a JSON-serializable dict of this check's result.""" + return { + "check_name": self.check_name, + "passed": self.passed, + "blocking_reason": self.blocking_reason, + "confidence": self.confidence, + "duration_ms": self.duration_ms, + "details": self.details, + } + @dataclass(frozen=True) class VerificationResult: @@ -39,3 +54,28 @@ class VerificationResult: confidence_score: float verification_ms: int check_results: List[CheckResult] + + def to_json_dict(self) -> Dict[str, Any]: + """Return a stable, JSON-serializable dict of the full result. + + The shape is versioned via ``schema_version`` so machine consumers + (CI gates, bots, parent tools) can evolve safely. Adding new keys is + backwards-compatible; removing or renaming keys bumps the version. + """ + return { + "schema_version": JSON_SCHEMA_VERSION, + "status": "safe" if self.safe_to_apply else "blocked", + "safe_to_apply": self.safe_to_apply, + "passed": self.passed, + "blocking_reason": self.blocking_reason, + "confidence_score": self.confidence_score, + "verification_ms": self.verification_ms, + "checks_run": list(self.checks_run), + "checks_passed": list(self.checks_passed), + "checks_failed": list(self.checks_failed), + "skipped_checks": [ + {"check_name": name, "reason": reason} + for name, reason in self.skipped_checks + ], + "checks": [cr.to_json_dict() for cr in self.check_results], + } diff --git a/tests/test_import_verifier.py b/tests/test_import_verifier.py index be173f3..ca90950 100644 --- a/tests/test_import_verifier.py +++ b/tests/test_import_verifier.py @@ -71,3 +71,68 @@ def test_duration_populated(self, verifier): code = "x = 1\n" cr = verifier.verify(code, code, Path("/tmp/t.py")) assert cr.duration_ms >= 0 + + +def _make_pkg(root: Path) -> Path: + """Create a minimal `pkg` package: a.py (no imports) + b.py -> a.""" + pkg = root / "pkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("", encoding="utf-8") + (pkg / "a.py").write_text("VALUE = 1\n", encoding="utf-8") + (pkg / "b.py").write_text("from . import a\n\nprint(a.VALUE)\n", encoding="utf-8") + return pkg + + +class TestImportCycleDetection: + """Step 4 — project-wide import-cycle detection.""" + + def test_transform_introducing_cycle_blocks(self, tmp_path): + pkg = _make_pkg(tmp_path) + verifier = ImportIntegrityVerifier(project_root=tmp_path) + original = (pkg / "a.py").read_text(encoding="utf-8") + # a.py now imports b, while b.py already imports a -> cycle. + transformed = "from . import b\n\nVALUE = b\n" + cr = verifier.verify(original, transformed, pkg / "a.py") + assert cr.passed is False + assert "cycle" in cr.blocking_reason.lower() + assert "pkg.a" in cr.blocking_reason and "pkg.b" in cr.blocking_reason + assert cr.details["import_cycle"][0] == "pkg.a" + assert cr.details["import_cycle"][-1] == "pkg.a" + + def test_transform_without_cycle_passes(self, tmp_path): + pkg = _make_pkg(tmp_path) + (pkg / "c.py").write_text("C = 1\n", encoding="utf-8") + verifier = ImportIntegrityVerifier(project_root=tmp_path) + original = (pkg / "a.py").read_text(encoding="utf-8") + transformed = "from . import c\n\nVALUE = c.C\n" + cr = verifier.verify(original, transformed, pkg / "a.py") + assert cr.passed is True + assert cr.details["cycle_check"] == "ran: no cycle" + + def test_preexisting_cycle_not_blocked(self, tmp_path): + pkg = tmp_path / "pkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("", encoding="utf-8") + # a <-> b are already cyclic on disk. + (pkg / "a.py").write_text("from . import b\n", encoding="utf-8") + (pkg / "b.py").write_text("from . import a\n", encoding="utf-8") + (pkg / "c.py").write_text("C = 1\n", encoding="utf-8") + verifier = ImportIntegrityVerifier(project_root=tmp_path) + original = (pkg / "a.py").read_text(encoding="utf-8") + transformed = "from . import b\nfrom . import c\n" + cr = verifier.verify(original, transformed, pkg / "a.py") + assert cr.passed is True + assert cr.details["cycle_check"].startswith("ran: pre-existing") + + def test_cycle_check_skipped_without_project_root(self): + verifier = ImportIntegrityVerifier() + cr = verifier.verify("VALUE = 1\n", "from . import b\n", Path("/tmp/x.py")) + assert cr.passed is True + assert cr.details["cycle_check"] == "skipped: no project_root" + + def test_file_outside_project_root_skips_cycle_check(self, tmp_path): + _make_pkg(tmp_path) + verifier = ImportIntegrityVerifier(project_root=tmp_path) + cr = verifier.verify("VALUE = 1\n", "from . import b\n", Path("/elsewhere/x.py")) + assert cr.passed is True + assert cr.details["cycle_check"] == "skipped: file outside project_root" diff --git a/tests/test_test_gate.py b/tests/test_test_gate.py index defd8d3..fa0f09a 100644 --- a/tests/test_test_gate.py +++ b/tests/test_test_gate.py @@ -1,9 +1,12 @@ """Unit tests for TestSuiteGate (Check 3).""" +import os +import sys from pathlib import Path import pytest +from refactron.verification.checks import test_gate as test_gate_module from refactron.verification.checks.test_gate import TestSuiteGate FIXTURES_DIR = Path(__file__).parent / "fixtures" @@ -71,3 +74,61 @@ def test_confidence_is_0_9(self, gate): original = file_path.read_text(encoding="utf-8") cr = gate.verify(original, original, file_path) assert cr.confidence == 0.9 + + +class TestPytestInvocation: + """The pytest subprocess must use the host interpreter and project root.""" + + def _capture_run(self, monkeypatch): + """Patch subprocess.run to capture its arguments without running pytest.""" + captured = {} + + def fake_run(cmd, **kwargs): + captured["cmd"] = cmd + captured["cwd"] = kwargs.get("cwd") + captured["env"] = kwargs.get("env") + + class _Completed: + returncode = 0 + stdout = "" + stderr = "" + + return _Completed() + + monkeypatch.setattr(test_gate_module.subprocess, "run", fake_run) + return captured + + def test_uses_host_interpreter(self, monkeypatch): + """pytest must run via sys.executable, not a hard-coded python3.""" + captured = self._capture_run(monkeypatch) + gate = TestSuiteGate(project_root=FIXTURES_DIR) + file_path = FIXTURES_DIR / "fixture_test_break.py" + original = file_path.read_text(encoding="utf-8") + + gate.verify(original, original, file_path) + + assert captured["cmd"][0] == sys.executable + assert captured["cmd"][1:4] == ["-m", "pytest", "-x"] + + def test_cwd_is_resolved_project_root(self, monkeypatch): + """pytest must run from the resolved project root, not file_path.parent.""" + captured = self._capture_run(monkeypatch) + gate = TestSuiteGate(project_root=FIXTURES_DIR) + file_path = FIXTURES_DIR / "fixture_test_break.py" + original = file_path.read_text(encoding="utf-8") + + gate.verify(original, original, file_path) + + assert captured["cwd"] == str(FIXTURES_DIR.resolve()) + + def test_project_root_added_to_pythonpath(self, monkeypatch): + """The project root must be present on PYTHONPATH for the subprocess.""" + captured = self._capture_run(monkeypatch) + gate = TestSuiteGate(project_root=FIXTURES_DIR) + file_path = FIXTURES_DIR / "fixture_test_break.py" + original = file_path.read_text(encoding="utf-8") + + gate.verify(original, original, file_path) + + pythonpath = captured["env"]["PYTHONPATH"].split(os.pathsep) + assert str(FIXTURES_DIR.resolve()) in pythonpath diff --git a/tests/test_verification_engine.py b/tests/test_verification_engine.py index 4caba8f..643e2c4 100644 --- a/tests/test_verification_engine.py +++ b/tests/test_verification_engine.py @@ -71,6 +71,28 @@ def test_first_fail_short_circuits(self): assert tracker.called is False # short-circuited assert ("tracker", "Short-circuited after always_fail failed") in result.skipped_checks + def test_all_checks_runs_every_check_after_failure(self): + tracker = _TrackingCheck() + engine = VerificationEngine(checks=[_FailingCheck(), tracker]) + result = engine.verify( + "a = 1", "a = 1", Path("/tmp/test.py"), short_circuit=False + ) + assert result.safe_to_apply is False + assert tracker.called is True # ran despite the earlier failure + assert result.checks_run == ["always_fail", "tracker"] + assert result.checks_failed == ["always_fail"] + assert result.checks_passed == ["tracker"] + assert result.skipped_checks == [] # nothing skipped when not short-circuiting + + def test_all_checks_aggregates_multiple_failures(self): + engine = VerificationEngine(checks=[_FailingCheck(), _PassingCheck(), _FailingCheck()]) + result = engine.verify( + "a = 1", "a = 1", Path("/tmp/test.py"), short_circuit=False + ) + assert result.checks_failed == ["always_fail", "always_fail"] + assert result.checks_passed == ["always_pass"] + assert result.blocking_reason == "Intentional failure" # first failure + def test_confidence_is_geometric_mean(self): engine = VerificationEngine(checks=[_PassingCheck(), _TrackingCheck()]) result = engine.verify("a = 1", "a = 1", Path("/tmp/test.py")) diff --git a/tests/test_verification_result.py b/tests/test_verification_result.py index 5c7d44b..68cad62 100644 --- a/tests/test_verification_result.py +++ b/tests/test_verification_result.py @@ -1,8 +1,15 @@ """Contract tests for VerificationResult and CheckResult.""" +import json + import pytest -from refactron.verification.result import CheckResult, VerificationResult +from refactron.verification.report import format_verification_result_json +from refactron.verification.result import ( + JSON_SCHEMA_VERSION, + CheckResult, + VerificationResult, +) class TestCheckResult: @@ -95,3 +102,75 @@ def test_confidence_zero_when_no_checks_passed(self): check_results=[], ) assert vr.confidence_score == 0.0 + + +def _sample_result(safe: bool) -> VerificationResult: + cr = CheckResult( + check_name="syntax", + passed=safe, + blocking_reason="" if safe else "SyntaxError on line 5", + confidence=1.0 if safe else 0.0, + duration_ms=12, + details={"line": 5}, + ) + return VerificationResult( + safe_to_apply=safe, + passed=True, + checks_run=["syntax"], + checks_passed=["syntax"] if safe else [], + checks_failed=[] if safe else ["syntax"], + skipped_checks=[] if safe else [("test_suite", "Short-circuited after syntax failed")], + blocking_reason=None if safe else "SyntaxError on line 5", + confidence_score=1.0 if safe else 0.0, + verification_ms=42, + check_results=[cr], + ) + + +class TestCheckResultJson: + def test_to_json_dict_has_all_fields(self): + cr = CheckResult( + check_name="syntax", + passed=False, + blocking_reason="boom", + confidence=0.0, + duration_ms=7, + details={"k": "v"}, + ) + d = cr.to_json_dict() + assert d == { + "check_name": "syntax", + "passed": False, + "blocking_reason": "boom", + "confidence": 0.0, + "duration_ms": 7, + "details": {"k": "v"}, + } + + +class TestVerificationResultJson: + def test_to_json_dict_safe(self): + d = _sample_result(safe=True).to_json_dict() + assert d["schema_version"] == JSON_SCHEMA_VERSION + assert d["status"] == "safe" + assert d["safe_to_apply"] is True + assert d["blocking_reason"] is None + assert d["checks_run"] == ["syntax"] + assert d["skipped_checks"] == [] + assert d["checks"][0]["check_name"] == "syntax" + + def test_to_json_dict_blocked(self): + d = _sample_result(safe=False).to_json_dict() + assert d["status"] == "blocked" + assert d["safe_to_apply"] is False + assert d["blocking_reason"] == "SyntaxError on line 5" + assert d["checks_failed"] == ["syntax"] + assert d["skipped_checks"] == [ + {"check_name": "test_suite", "reason": "Short-circuited after syntax failed"} + ] + + def test_format_json_is_parseable(self): + text = format_verification_result_json(_sample_result(safe=True)) + parsed = json.loads(text) + assert parsed["schema_version"] == JSON_SCHEMA_VERSION + assert parsed["status"] == "safe"