diff --git a/docs/essentials/configuration.mdx b/docs/essentials/configuration.mdx index be08de0..58eacad 100644 --- a/docs/essentials/configuration.mdx +++ b/docs/essentials/configuration.mdx @@ -163,6 +163,61 @@ export REFACTRON_LOG_LEVEL=DEBUG export REFACTRON_CONFIG=/path/to/.refactron.yaml ``` +## Verification Pipeline + +The verification pipeline (`refactron verify`, and `refactron autofix --verify`) +runs safety checks before a change is applied. Its behavior is configurable +through a `verification:` section so teams can trade safety for speed based on +repo size, CI tier, or policy. + +```yaml +verification: + # Checks to run, in execution order. Any subset/ordering of: + # syntax, import_integrity, test_gate + enabled_checks: [syntax, import_integrity, test_gate] + + # Stop after the first failing check (true) or run them all (false). + short_circuit: true + + # Subprocess timeout for the pytest-based test gate, in seconds. + test_gate_timeout_sec: 45 + + # Extra arguments appended to the pytest command line. + pytest_extra_args: [] +``` + +### Defaults + +| Key | Default | Meaning | +|-----|---------|---------| +| `enabled_checks` | `[syntax, import_integrity, test_gate]` | All three checks, in this order | +| `short_circuit` | `true` | Stop on first failure (fastest) | +| `test_gate_timeout_sec` | `45` | pytest subprocess timeout | +| `pytest_extra_args` | `[]` | No extra pytest arguments | + +### Common recipes + +```yaml +# Fast PR check — skip the (slow) test gate +verification: + enabled_checks: [syntax, import_integrity] + +# Slow CI tier — give the test gate more time +verification: + test_gate_timeout_sec: 300 + +# Full failure report instead of stop-on-first-failure +verification: + short_circuit: false +``` + +### Migration + +The `verification:` section is **optional**. Existing config files without it +keep the exact behavior of earlier releases — the defaults above reproduce the +previously hard-coded values (all checks, 45s test-gate timeout, short-circuit +on first failure). No changes are required to upgrade. + ## Per-File Ignores Ignore specific issues in code using comments: diff --git a/refactron/cli/main.py b/refactron/cli/main.py index 93a777a..d3f516f 100644 --- a/refactron/cli/main.py +++ b/refactron/cli/main.py @@ -89,8 +89,10 @@ def main(ctx: click.Context) -> None: # Register subcommands -# We import them here to ensure they are registered with the main group -# Using a try-except block to allow partial loading during refactoring +# We import them here to ensure they are registered with the main group. +# Each block is isolated so a single broken subcommand — or a broken optional +# dependency it pulls in (e.g. a failed native-library load raising OSError, +# not ImportError) — degrades gracefully instead of taking down the whole CLI. try: from refactron.cli.auth import auth, login, logout, telemetry @@ -98,7 +100,7 @@ def main(ctx: click.Context) -> None: main.add_command(logout) main.add_command(auth) main.add_command(telemetry) -except ImportError: +except Exception: pass try: @@ -111,7 +113,7 @@ def main(ctx: click.Context) -> None: main.add_command(metrics) main.add_command(serve_metrics) main.add_command(suggest) -except ImportError: +except Exception: pass try: @@ -121,28 +123,35 @@ def main(ctx: click.Context) -> None: main.add_command(autofix) main.add_command(rollback) main.add_command(document) -except ImportError: +except Exception: + pass + +try: + from refactron.cli.verify import verify + + main.add_command(verify) +except Exception: pass try: from refactron.cli.patterns import patterns main.add_command(patterns) -except ImportError: +except Exception: pass try: from refactron.cli.repo import repo main.add_command(repo) -except ImportError: +except Exception: pass try: from refactron.cli.rag import rag main.add_command(rag) -except ImportError: +except Exception: pass try: @@ -151,5 +160,5 @@ def main(ctx: click.Context) -> None: main.add_command(generate_cicd) main.add_command(feedback) main.add_command(init) -except ImportError: +except Exception: pass diff --git a/refactron/cli/verify.py b/refactron/cli/verify.py index c173eb3..1bd29d6 100644 --- a/refactron/cli/verify.py +++ b/refactron/cli/verify.py @@ -1,3 +1,110 @@ +""" +Refactron CLI - Verification Module. +Command to run the Verification Engine on a code change and emit either a +human-readable report or a stable, machine-readable JSON object for CI gates. +""" + +from pathlib import Path +from typing import Optional + +import click + +from refactron.cli.ui import _auth_banner, console +from refactron.verification.engine import VerificationEngine +from refactron.verification.report import ( + format_verification_result, + format_verification_result_json, +) + + +@click.command() +@click.argument("target", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--against", + "-a", + "candidate", + type=click.Path(exists=True, dir_okay=False), + default=None, + help=( + "Path to the proposed/modified version of TARGET. " + "If omitted, TARGET is verified against itself." + ), +) +@click.option( + "--project-root", + type=click.Path(exists=True, file_okay=False), + default=None, + help="Project root used by the test-suite gate. Defaults to the current directory.", +) +@click.option( + "--config", + "-c", + type=click.Path(exists=True, dir_okay=False), + default=None, + help="Refactron config file; its `verification:` section tunes the pipeline " + "(enabled checks, ordering, test-gate timeout, pytest args).", +) +@click.option( + "--all-checks", + is_flag=True, + default=False, + help=( + "Run every check even after one fails (no short-circuit), so all " + "failure categories surface in a single run. Overrides config." + ), +) +@click.option( + "--json", + "as_json", + is_flag=True, + default=False, + help="Emit a stable, machine-readable JSON report instead of formatted text.", +) +def verify( + target: str, + candidate: Optional[str], + project_root: Optional[str], + config: Optional[str], + all_checks: bool, + as_json: bool, +) -> None: + """ + Verify that a code change is safe to apply. + + TARGET is the file as it currently lives in the project. With --against, + the proposed new content is checked against it (syntax, import integrity, + test suite). Exit code is 0 when safe to apply, 1 when blocked — and with + --json a versioned JSON object is printed for CI dashboards and bots. + """ + target_path = Path(target) + original = target_path.read_text(encoding="utf-8") + transformed = Path(candidate).read_text(encoding="utf-8") if candidate else original + + # A config file's `verification:` section tunes the pipeline; without one + # the engine falls back to its built-in defaults. + verification_cfg = None + if config: + from refactron.core.config import RefactronConfig + + verification_cfg = RefactronConfig.from_file(Path(config)).verification + + root = Path(project_root) if project_root else Path.cwd() + engine = VerificationEngine(project_root=root, config=verification_cfg) + # --all-checks forces a full run; otherwise honour the configured value. + result = engine.verify( + original, + transformed, + target_path, + short_circuit=False if all_checks else None, + ) + + if as_json: + # JSON mode prints only the JSON object so consumers can parse stdout. + click.echo(format_verification_result_json(result)) + else: + console.print() + _auth_banner("Verification") + format_verification_result(result, console) """refactron verify --against — standalone verification command.""" from pathlib import Path diff --git a/refactron/core/config.py b/refactron/core/config.py index 4f0c830..72845f2 100644 --- a/refactron/core/config.py +++ b/refactron/core/config.py @@ -11,6 +11,38 @@ from refactron.core.exceptions import ConfigError +# Valid verification check names, also the default execution order. +DEFAULT_VERIFICATION_CHECKS = ["syntax", "import_integrity", "test_gate"] + + +@dataclass +class VerificationConfig: + """Settings for the verification pipeline (`refactron verify`, `--verify`). + + Every default reproduces the historical hard-coded behaviour, so existing + projects need no migration: a config file with no ``verification:`` section + behaves exactly as before. + + Fields: + enabled_checks: Checks to run, in execution order. Any subset (and any + ordering) of ``syntax``, ``import_integrity``, ``test_gate``. For + fast PR checks a team might use ``[syntax, import_integrity]`` only. + short_circuit: Stop after the first failing check (fast) when True; + run every check for a complete failure report when False. + test_gate_timeout_sec: Subprocess timeout for the pytest-based test + gate, in seconds. + pytest_extra_args: Extra arguments appended to the pytest command line + (e.g. ``["-p", "no:cacheprovider"]``). + """ + + enabled_checks: List[str] = field( + default_factory=lambda: list(DEFAULT_VERIFICATION_CHECKS) + ) + short_circuit: bool = True + test_gate_timeout_sec: int = 45 + pytest_extra_args: List[str] = field(default_factory=list) + + @dataclass class RefactronConfig: """Configuration for Refactron analysis and refactoring.""" @@ -130,6 +162,9 @@ class RefactronConfig: pattern_learning_enabled: bool = True # Enable learning from feedback pattern_ranking_enabled: bool = True # Enable ranking based on learned patterns + # Verification pipeline settings (refactron verify / autofix --verify) + verification: VerificationConfig = field(default_factory=VerificationConfig) + @classmethod def from_file( cls, @@ -168,6 +203,22 @@ def from_file( ): config_dict[path_field] = Path(config_dict[path_field]) + # Convert the nested `verification:` mapping into its dataclass. + if isinstance(config_dict.get("verification"), dict): + try: + config_dict["verification"] = VerificationConfig( + **config_dict["verification"] + ) + except TypeError as e: + raise ConfigError( + f"Invalid 'verification' configuration: {e}", + config_path=config_path, + recovery_suggestion=( + "Valid keys: enabled_checks, short_circuit, " + "test_gate_timeout_sec, pytest_extra_args." + ), + ) from e + try: return cls(**config_dict) except TypeError as e: @@ -243,6 +294,12 @@ def to_file(self, config_path: Path) -> None: ), "pattern_learning_enabled": self.pattern_learning_enabled, "pattern_ranking_enabled": self.pattern_ranking_enabled, + "verification": { + "enabled_checks": self.verification.enabled_checks, + "short_circuit": self.verification.short_circuit, + "test_gate_timeout_sec": self.verification.test_gate_timeout_sec, + "pytest_extra_args": self.verification.pytest_extra_args, + }, } try: diff --git a/refactron/verification/checks/imports.py b/refactron/verification/checks/imports.py index e5eba8a..f795b65 100644 --- a/refactron/verification/checks/imports.py +++ b/refactron/verification/checks/imports.py @@ -4,17 +4,59 @@ import importlib.util import time from pathlib import Path -from typing import Any, Dict, Set +from typing import Any, Dict, List, Optional, Set from refactron.verification.engine import BaseCheck from refactron.verification.result import CheckResult +# Guardrails for project-wide import-graph construction (Step 4: cycle +# detection). The graph is intentionally bounded — cycle detection is a +# best-effort confidence boost, never a correctness guarantee, so when a +# limit is hit we skip the check rather than block a transform. +_MAX_GRAPH_FILES = 2000 +_GRAPH_TIME_BUDGET_S = 5.0 +_SKIP_DIRS = { + ".git", + ".hg", + ".svn", + ".tox", + ".venv", + "venv", + "env", + "__pycache__", + "build", + "dist", + "node_modules", + ".mypy_cache", + ".pytest_cache", +} + class ImportIntegrityVerifier(BaseCheck): - """Validates import integrity after a transform.""" + """Validates import integrity after a transform. + + Steps 1-3 (removed-but-used imports, new-import resolvability) work on the + single file. Step 4 (cycle detection) builds a bounded project-wide import + graph and reports cycles *introduced* by the transform. + + Complexity limits for Step 4: + - At most ``_MAX_GRAPH_FILES`` (2000) files are scanned. + - Graph construction is abandoned after ``_GRAPH_TIME_BUDGET_S`` (5s). + - The graph is cached for the lifetime of this verifier instance. + - Only intra-project (first-party) imports become edges; third-party + and stdlib imports are ignored. + When any limit is hit, or no ``project_root`` was supplied, cycle + detection is silently skipped — never blocking. + """ name = "import_integrity" + def __init__(self, project_root: Optional[Path] = None) -> None: + self.project_root = project_root + # Base import graph (module -> set of imported project modules), + # cached per resolved project root for the verifier's lifetime. + self._graph_cache: Dict[str, Optional[Dict[str, Set[str]]]] = {} + def verify(self, original: str, transformed: str, file_path: Path) -> CheckResult: start = time.monotonic() details: Dict[str, Any] = {} @@ -36,8 +78,6 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul ) # Step 3: new imports resolvable - # TODO: Step 4 (cycle detection) deferred — requires project-wide import graph - # which is expensive for MVP. Will add in Phase 3 if needed. added = trans_imports - orig_imports if added: orig_modules = self._extract_module_names(original) @@ -53,6 +93,16 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul details, ) + # Step 4: cycle detection — report cycles introduced by the transform. + cycle = self._detect_new_cycle(original, transformed, file_path, details) + if cycle is not None: + details["import_cycle"] = cycle + return self._fail( + "Transform introduces an import cycle: " + " -> ".join(cycle), + start, + details, + ) + elapsed = int((time.monotonic() - start) * 1000) return CheckResult( check_name=self.name, @@ -74,6 +124,199 @@ def _fail(self, reason: str, start: float, details: Dict[str, Any]) -> CheckResu details=details, ) + # ----- Step 4: import-cycle detection --------------------------------- + + def _detect_new_cycle( + self, + original: str, + transformed: str, + file_path: Path, + details: Dict[str, Any], + ) -> Optional[List[str]]: + """Return a cycle path the transform introduces, or None. + + A cycle is reported only when it runs through the changed module and + did *not* exist before the transform — pre-existing cycles are left + for a dedicated lint pass, not this verifier. + """ + if self.project_root is None: + details["cycle_check"] = "skipped: no project_root" + return None + + root = self.project_root.resolve() + changed_mod = self._module_name(file_path, root) + if changed_mod is None: + details["cycle_check"] = "skipped: file outside project_root" + return None + + base_graph = self._build_base_graph(root) + if base_graph is None: + details["cycle_check"] = "skipped: graph budget exceeded" + return None + + is_pkg = Path(file_path).name == "__init__.py" + orig_edges = self._module_edges(original, changed_mod, is_pkg, root) + trans_edges = self._module_edges(transformed, changed_mod, is_pkg, root) + if orig_edges == trans_edges: + details["cycle_check"] = "ran: no project import edges changed" + return None + + graph_trans = dict(base_graph) + graph_trans[changed_mod] = trans_edges + cyc_trans = self._find_cycle_through(graph_trans, changed_mod) + if cyc_trans is None: + details["cycle_check"] = "ran: no cycle" + return None + + graph_orig = dict(base_graph) + graph_orig[changed_mod] = orig_edges + if self._find_cycle_through(graph_orig, changed_mod) is not None: + # Cycle already existed; the transform did not introduce it. + details["cycle_check"] = "ran: pre-existing cycle (not introduced)" + return None + + details["cycle_check"] = "ran: new cycle detected" + return cyc_trans + + def _build_base_graph(self, root: Path) -> Optional[Dict[str, Set[str]]]: + """Build (and cache) the project's module import graph. + + Returns None if a guardrail (file count / time budget) is hit. + """ + key = str(root) + if key in self._graph_cache: + return self._graph_cache[key] + + graph: Dict[str, Set[str]] = {} + deadline = time.monotonic() + _GRAPH_TIME_BUDGET_S + scanned = 0 + for path in root.rglob("*.py"): + if any(part in _SKIP_DIRS for part in path.relative_to(root).parts): + continue + scanned += 1 + if scanned > _MAX_GRAPH_FILES or time.monotonic() > deadline: + self._graph_cache[key] = None + return None + module_name = self._module_name(path, root) + if module_name is None: + continue + try: + code = path.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + continue + is_pkg = path.name == "__init__.py" + graph[module_name] = self._module_edges(code, module_name, is_pkg, root) + + self._graph_cache[key] = graph + return graph + + @staticmethod + def _module_name(file_path: Path, root: Path) -> Optional[str]: + """Map a file path to its dotted module name relative to ``root``.""" + try: + rel = Path(file_path).resolve().relative_to(root) + except ValueError: + return None + parts = list(rel.with_suffix("").parts) + if parts and parts[-1] == "__init__": + parts = parts[:-1] + return ".".join(parts) if parts else None + + @classmethod + def _module_edges( + cls, code: str, module_name: str, is_pkg: bool, root: Path + ) -> Set[str]: + """Intra-project modules imported by ``code`` (graph out-edges).""" + edges: Set[str] = set() + try: + tree = ast.parse(code) + except SyntaxError: + return edges + + # The package a relative import resolves against. + if is_pkg: + package = module_name + elif "." in module_name: + package = module_name.rsplit(".", 1)[0] + else: + package = "" + + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + target = cls._resolve(alias.name, root) + if target and target != module_name: + edges.add(target) + elif isinstance(node, ast.ImportFrom): + base = cls._relative_base(node.level, package) + if node.level and node.module: + base = f"{base}.{node.module}" if base else node.module + elif node.level == 0: + base = node.module or "" + if base: + resolved = cls._resolve(base, root) + if resolved and resolved != module_name: + edges.add(resolved) + # `from pkg import sub` may import a submodule, not a name. + for alias in node.names: + candidate = f"{base}.{alias.name}" if base else alias.name + resolved = cls._resolve(candidate, root) + if resolved and resolved != module_name: + edges.add(resolved) + return edges + + @staticmethod + def _relative_base(level: int, package: str) -> str: + """Resolve the package a relative import (``level`` dots) points at.""" + if level <= 0: + return package + parts = package.split(".") if package else [] + # level 1 == current package, level 2 == its parent, ... + keep = len(parts) - (level - 1) + if keep < 0: + return "" + return ".".join(parts[:keep]) + + @staticmethod + def _resolve(dotted: str, root: Path) -> Optional[str]: + """Return ``dotted`` if it names a file/package under ``root``.""" + if not dotted: + return None + parts = dotted.split(".") + as_module = root.joinpath(*parts).with_suffix(".py") + as_package = root.joinpath(*parts, "__init__.py") + if as_module.is_file() or as_package.is_file(): + return dotted + return None + + @staticmethod + def _find_cycle_through( + graph: Dict[str, Set[str]], start: str + ) -> Optional[List[str]]: + """Return a cycle ``start -> ... -> start``, or None. + + A cycle exists iff ``start`` is reachable from one of its own + successors. Uses bounded BFS per successor — O(V+E) each. + """ + for first in sorted(graph.get(start, set())): + parent: Dict[str, Optional[str]] = {first: None} + queue: List[str] = [first] + while queue: + node = queue.pop(0) + if node == start: + chain: List[str] = [] + cur: Optional[str] = node + while cur is not None: + chain.append(cur) + cur = parent[cur] + chain.reverse() + return [start] + chain + for nxt in sorted(graph.get(node, set())): + if nxt not in parent: + parent[nxt] = node + queue.append(nxt) + return None + @staticmethod def _extract_import_names(code: str) -> Set[str]: """Extract all locally-bound names from import statements.""" diff --git a/refactron/verification/checks/test_gate.py b/refactron/verification/checks/test_gate.py index 3f8327f..1744b9d 100644 --- a/refactron/verification/checks/test_gate.py +++ b/refactron/verification/checks/test_gate.py @@ -4,6 +4,7 @@ import os import shutil import subprocess +import sys import time from pathlib import Path from typing import Any, Dict, List, Optional @@ -18,8 +19,24 @@ class TestSuiteGate(BaseCheck): __test__ = False # Prevent pytest from collecting this as a test class name = "test_gate" - def __init__(self, project_root: Optional[Path] = None): + def __init__( + self, + project_root: Optional[Path] = None, + timeout_sec: int = 45, + pytest_extra_args: Optional[List[str]] = None, + ): + """Create the test-suite gate. + + Args: + project_root: Project root pytest runs from. + timeout_sec: Subprocess timeout for the pytest run (default 45s). + pytest_extra_args: Extra arguments appended to the pytest command. + """ self.project_root = project_root + self.timeout_sec = timeout_sec + self.pytest_extra_args = list(pytest_extra_args or []) + self._test_file_cache: Optional[Dict[str, List[Path]]] = None + self._all_test_files: Optional[List[Path]] = None def verify(self, original: str, transformed: str, file_path: Path) -> CheckResult: start = time.monotonic() @@ -29,12 +46,16 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul test_files = self._find_relevant_tests(file_path) if not test_files: elapsed = int((time.monotonic() - start) * 1000) - details["note"] = "No tests cover this module" + # No matching tests is not a failure, but it is also not the + # strong assurance a passing test run gives — the change is + # simply unverified here, so confidence is reduced rather than + # left high (which would mask a potential false negative). + details["note"] = "No tests found importing this module — change not covered by the gate" return CheckResult( check_name=self.name, passed=True, blocking_reason="", - confidence=0.9, + confidence=0.6, duration_ms=elapsed, details=details, ) @@ -54,16 +75,31 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul # Delete .pyc cache self._clear_pycache(file_path) - # Run pytest - cmd = ["python3", "-m", "pytest", "-x", "-q"] + # Run pytest from the project root with the host interpreter so + # the repo's pyproject.toml / pytest.ini / conftest.py are picked + # up and the same venv as the host process is used. + run_cwd = (self.project_root or file_path.parent).resolve() + + # Make the project root importable for edge cases where the layout + # relies on PYTHONPATH rather than an installed package. + env = {**os.environ, "PYTHONDONTWRITEBYTECODE": "1"} + existing_pythonpath = env.get("PYTHONPATH", "") + env["PYTHONPATH"] = ( + os.pathsep.join([str(run_cwd), existing_pythonpath]) + if existing_pythonpath + else str(run_cwd) + ) + + cmd = [sys.executable, "-m", "pytest", "-x", "-q"] + cmd += self.pytest_extra_args cmd += [str(f) for f in test_files] result = subprocess.run( cmd, - timeout=45, + timeout=self.timeout_sec, capture_output=True, text=True, - env={**os.environ, "PYTHONDONTWRITEBYTECODE": "1"}, - cwd=str(file_path.parent), + env=env, + cwd=str(run_cwd), ) elapsed = int((time.monotonic() - start) * 1000) @@ -95,7 +131,7 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul return CheckResult( check_name=self.name, passed=False, - blocking_reason="Test suite gate timed out (45s limit)", + blocking_reason=f"Test suite gate timed out ({self.timeout_sec}s limit)", confidence=0.0, duration_ms=elapsed, details=details, @@ -107,44 +143,149 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul def _find_relevant_tests(self, file_path: Path) -> List[Path]: """Find test files that import the module at file_path.""" - module_name = file_path.stem search_root = self.project_root or file_path.parent + targets = self._module_targets(file_path) + + if self._test_file_cache is None: + self._test_file_cache = {} + self._all_test_files = [] + + test_dirs = [d for d in [search_root / "tests", search_root / "test"] if d.is_dir()] + search_dirs = test_dirs if test_dirs else [search_root] + excluded_dirs = {".git", ".rag", "__pycache__", "venv", ".venv", "env", "node_modules"} + + for root_dir in search_dirs: + for py_file in root_dir.rglob("*.py"): + if any(excluded in py_file.parts for excluded in excluded_dirs): + continue + name = py_file.name + if name.startswith("test_") or name.endswith("_test.py"): + self._all_test_files.append(py_file) + + cache_key = str(file_path) + if cache_key in self._test_file_cache: + return self._test_file_cache[cache_key] + target_file = file_path.resolve() test_files: List[Path] = [] - for py_file in search_root.rglob("*.py"): - name = py_file.name - if not (name.startswith("test_") or name.endswith("_test.py")): - continue + for py_file in self._all_test_files: # type: ignore if py_file == file_path: continue try: source = py_file.read_text(encoding="utf-8") - if self._imports_module(source, module_name): + if self._imports_module(source, targets, target_file, py_file): test_files.append(py_file) except Exception: continue + + self._test_file_cache[cache_key] = test_files return test_files + def _module_targets(self, file_path: Path) -> set: + """Qualified module names a test could import to exercise file_path. + + Returns both the project-root-relative dotted path and the + package-root-relative path (walking up the ``__init__.py`` chain), so + ``mypkg/submodule/foo.py`` is matched by ``from mypkg.submodule import + foo`` regardless of where the project is rooted. The bare stem is kept + as a loose fallback for flat (non-package) layouts. + """ + path = Path(file_path) + abs_path = path.resolve() + targets: set = set() + if path.name != "__init__.py": + targets.add(path.stem) + + # Project-root-relative qualified name. + root = (self.project_root or path.parent) + try: + rel = abs_path.relative_to(root.resolve()) + dotted = self._dotted(rel) + if dotted: + targets.add(dotted) + except ValueError: + pass + + # Package-root-relative qualified name (walk up the __init__.py chain). + parts: List[str] = [] if path.name == "__init__.py" else [path.stem] + pkg_dir = abs_path.parent + while (pkg_dir / "__init__.py").is_file(): + parts.append(pkg_dir.name) + pkg_dir = pkg_dir.parent + if parts: + targets.add(".".join(reversed(parts))) + + return targets + @staticmethod - def _imports_module(source: str, module_name: str) -> bool: - """Check if source code imports the given module name.""" + def _dotted(rel_path: Path) -> str: + """Convert a relative file path to a dotted module name.""" + parts = list(rel_path.with_suffix("").parts) + if parts and parts[-1] == "__init__": + parts = parts[:-1] + return ".".join(parts) + + @staticmethod + def _imports_module( + source: str, targets: set, target_file: Path, test_path: Path + ) -> bool: + """Whether ``source`` imports the module under verification. + + Absolute imports are matched by qualified name against ``targets``; + package-relative imports (``from . import x``) are resolved on the + filesystem and compared directly to ``target_file``. + """ try: tree = ast.parse(source) except SyntaxError: return False + imported: set = set() for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: - if alias.name == module_name or alias.name.startswith(module_name + "."): - return True + imported.add(alias.name) elif isinstance(node, ast.ImportFrom): - if node.module and ( - node.module == module_name or node.module.startswith(module_name + ".") - ): + if node.level and node.level > 0: + if TestSuiteGate._relative_import_hits(node, test_path, target_file): + return True + continue + base = node.module or "" + if base: + imported.add(base) + for alias in node.names: + imported.add(f"{base}.{alias.name}" if base else alias.name) + + for name in imported: + for target in targets: + if name == target or name.startswith(target + "."): return True return False + @staticmethod + def _relative_import_hits( + node: ast.ImportFrom, test_path: Path, target_file: Path + ) -> bool: + """Resolve a relative import to file paths and test against target.""" + base_dir = test_path.resolve().parent + for _ in range(node.level - 1): + base_dir = base_dir.parent + if node.module: + base_dir = base_dir.joinpath(*node.module.split(".")) + + candidates = [base_dir.with_suffix(".py"), base_dir / "__init__.py"] + for alias in node.names: + candidates.append((base_dir / alias.name).with_suffix(".py")) + candidates.append(base_dir / alias.name / "__init__.py") + + for cand in candidates: + try: + if cand.resolve() == target_file: + return True + except OSError: + continue + return False + @staticmethod def _clear_pycache(file_path: Path) -> None: """Remove .pyc files for this module to force re-import.""" diff --git a/refactron/verification/engine.py b/refactron/verification/engine.py index 3e3771a..627c397 100644 --- a/refactron/verification/engine.py +++ b/refactron/verification/engine.py @@ -4,10 +4,13 @@ import time from abc import ABC, abstractmethod from pathlib import Path -from typing import List, Optional +from typing import TYPE_CHECKING, List, Optional from refactron.verification.result import CheckResult, VerificationResult +if TYPE_CHECKING: + from refactron.core.config import VerificationConfig + class BaseCheck(ABC): """Abstract base class for all verification checks.""" @@ -21,30 +24,86 @@ def verify(self, original: str, transformed: str, file_path: Path) -> CheckResul class VerificationEngine: - """Orchestrates verification checks in a short-circuit pipeline.""" + """Orchestrates verification checks, short-circuiting by default.""" def __init__( self, project_root: Optional[Path] = None, checks: Optional[List[BaseCheck]] = None, + config: Optional["VerificationConfig"] = None, ): + """Build a verification pipeline. + + Args: + project_root: Project root, used by import and test-gate checks. + checks: Explicit check list — bypasses ``config`` entirely. + config: VerificationConfig controlling which checks run, in what + order, and with what test-gate timeout / pytest args. When + None, defaults reproduce the historical behaviour. + """ + from refactron.core.config import VerificationConfig + self.project_root = project_root + self.config = config or VerificationConfig() self.checks: List[BaseCheck] if checks is not None: self.checks = checks else: - from refactron.verification.checks.imports import ImportIntegrityVerifier - from refactron.verification.checks.syntax import SyntaxVerifier - from refactron.verification.checks.test_gate import TestSuiteGate - - self.checks = [ - SyntaxVerifier(), - ImportIntegrityVerifier(), - TestSuiteGate(project_root=project_root), - ] - - def verify(self, original: str, transformed: str, file_path: Path) -> VerificationResult: - """Run all checks in order, short-circuiting on first failure.""" + self.checks = self._build_checks(self.config, project_root) + + @staticmethod + def _build_checks( + config: "VerificationConfig", project_root: Optional[Path] + ) -> List[BaseCheck]: + """Instantiate the configured checks, in the configured order.""" + from refactron.verification.checks.imports import ImportIntegrityVerifier + from refactron.verification.checks.syntax import SyntaxVerifier + from refactron.verification.checks.test_gate import TestSuiteGate + + factories = { + "syntax": lambda: SyntaxVerifier(), + "import_integrity": lambda: ImportIntegrityVerifier( + project_root=project_root + ), + "test_gate": lambda: TestSuiteGate( + project_root=project_root, + timeout_sec=config.test_gate_timeout_sec, + pytest_extra_args=config.pytest_extra_args, + ), + } + checks: List[BaseCheck] = [] + for name in config.enabled_checks: + factory = factories.get(name) + if factory is None: + raise ValueError( + f"Unknown verification check: {name!r}. " + f"Valid checks: {sorted(factories)}" + ) + checks.append(factory()) + return checks + + def verify( + self, + original: str, + transformed: str, + file_path: Path, + short_circuit: Optional[bool] = None, + ) -> VerificationResult: + """Run the verification checks in order and aggregate the results. + + Args: + original: Source code before the transform. + transformed: Source code after the transform. + file_path: Path of the file within the project. + short_circuit: When True, stop after the first failing check and + record the rest in ``skipped_checks`` — fastest, but hides + stacked problems. When False, run every check so the caller + sees all failure categories in a single run; nothing is + skipped. ``blocking_reason`` is always the first failure. + When None (default), the engine's configured value is used. + """ + if short_circuit is None: + short_circuit = self.config.short_circuit start = time.monotonic() check_results: List[CheckResult] = [] checks_run: List[str] = [] @@ -77,12 +136,14 @@ def verify(self, original: str, transformed: str, file_path: Path) -> Verificati checks_failed.append(check.name) if blocking_reason is None: blocking_reason = cr.blocking_reason - # Short-circuit: skip remaining checks - for remaining in self.checks[i + 1 :]: - skipped_checks.append( - (remaining.name, f"Short-circuited after {check.name} failed") - ) - break + if short_circuit: + # Skip remaining checks and stop the pipeline. + for remaining in self.checks[i + 1 :]: + skipped_checks.append( + (remaining.name, f"Short-circuited after {check.name} failed") + ) + break + # short_circuit=False: keep going so every failure surfaces. elapsed_ms = int((time.monotonic() - start) * 1000) confidence = self._compute_confidence(check_results) diff --git a/refactron/verification/report.py b/refactron/verification/report.py index 5ae0807..e245597 100644 --- a/refactron/verification/report.py +++ b/refactron/verification/report.py @@ -1,4 +1,6 @@ -"""Rich CLI output formatting for VerificationResult.""" +"""CLI output formatting for VerificationResult (Rich and JSON).""" + +import json from rich.console import Console @@ -29,3 +31,13 @@ def format_verification_result(result: VerificationResult, console: Console) -> for name, reason in result.skipped_checks: console.print(f" [dim]- {name}: {reason}[/dim]") console.print(f"\n [bold red]Blocked.[/bold red] {result.blocking_reason}") + + +def format_verification_result_json(result: VerificationResult, indent: int = 2) -> str: + """Render a VerificationResult as a stable, machine-readable JSON string. + + Intended for CI gates, bots, and parent tools that need structured data + rather than scraping terminal text. The schema is versioned — see + ``VerificationResult.to_json_dict``. + """ + return json.dumps(result.to_json_dict(), indent=indent, sort_keys=True) diff --git a/refactron/verification/result.py b/refactron/verification/result.py index c37b7ea..fe5aef5 100644 --- a/refactron/verification/result.py +++ b/refactron/verification/result.py @@ -6,6 +6,10 @@ from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple +# Bumped whenever the JSON shape produced by ``to_json_dict`` changes in a +# way consumers must adapt to. Additive fields do not require a bump. +JSON_SCHEMA_VERSION = 1 + @dataclass(frozen=True) class CheckResult: @@ -18,6 +22,17 @@ class CheckResult: duration_ms: int details: Dict[str, Any] + def to_json_dict(self) -> Dict[str, Any]: + """Return a JSON-serializable dict of this check's result.""" + return { + "check_name": self.check_name, + "passed": self.passed, + "blocking_reason": self.blocking_reason, + "confidence": self.confidence, + "duration_ms": self.duration_ms, + "details": self.details, + } + @dataclass(frozen=True) class VerificationResult: @@ -39,3 +54,28 @@ class VerificationResult: confidence_score: float verification_ms: int check_results: List[CheckResult] + + def to_json_dict(self) -> Dict[str, Any]: + """Return a stable, JSON-serializable dict of the full result. + + The shape is versioned via ``schema_version`` so machine consumers + (CI gates, bots, parent tools) can evolve safely. Adding new keys is + backwards-compatible; removing or renaming keys bumps the version. + """ + return { + "schema_version": JSON_SCHEMA_VERSION, + "status": "safe" if self.safe_to_apply else "blocked", + "safe_to_apply": self.safe_to_apply, + "passed": self.passed, + "blocking_reason": self.blocking_reason, + "confidence_score": self.confidence_score, + "verification_ms": self.verification_ms, + "checks_run": list(self.checks_run), + "checks_passed": list(self.checks_passed), + "checks_failed": list(self.checks_failed), + "skipped_checks": [ + {"check_name": name, "reason": reason} + for name, reason in self.skipped_checks + ], + "checks": [cr.to_json_dict() for cr in self.check_results], + } diff --git a/tests/test_import_verifier.py b/tests/test_import_verifier.py index be173f3..ca90950 100644 --- a/tests/test_import_verifier.py +++ b/tests/test_import_verifier.py @@ -71,3 +71,68 @@ def test_duration_populated(self, verifier): code = "x = 1\n" cr = verifier.verify(code, code, Path("/tmp/t.py")) assert cr.duration_ms >= 0 + + +def _make_pkg(root: Path) -> Path: + """Create a minimal `pkg` package: a.py (no imports) + b.py -> a.""" + pkg = root / "pkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("", encoding="utf-8") + (pkg / "a.py").write_text("VALUE = 1\n", encoding="utf-8") + (pkg / "b.py").write_text("from . import a\n\nprint(a.VALUE)\n", encoding="utf-8") + return pkg + + +class TestImportCycleDetection: + """Step 4 — project-wide import-cycle detection.""" + + def test_transform_introducing_cycle_blocks(self, tmp_path): + pkg = _make_pkg(tmp_path) + verifier = ImportIntegrityVerifier(project_root=tmp_path) + original = (pkg / "a.py").read_text(encoding="utf-8") + # a.py now imports b, while b.py already imports a -> cycle. + transformed = "from . import b\n\nVALUE = b\n" + cr = verifier.verify(original, transformed, pkg / "a.py") + assert cr.passed is False + assert "cycle" in cr.blocking_reason.lower() + assert "pkg.a" in cr.blocking_reason and "pkg.b" in cr.blocking_reason + assert cr.details["import_cycle"][0] == "pkg.a" + assert cr.details["import_cycle"][-1] == "pkg.a" + + def test_transform_without_cycle_passes(self, tmp_path): + pkg = _make_pkg(tmp_path) + (pkg / "c.py").write_text("C = 1\n", encoding="utf-8") + verifier = ImportIntegrityVerifier(project_root=tmp_path) + original = (pkg / "a.py").read_text(encoding="utf-8") + transformed = "from . import c\n\nVALUE = c.C\n" + cr = verifier.verify(original, transformed, pkg / "a.py") + assert cr.passed is True + assert cr.details["cycle_check"] == "ran: no cycle" + + def test_preexisting_cycle_not_blocked(self, tmp_path): + pkg = tmp_path / "pkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("", encoding="utf-8") + # a <-> b are already cyclic on disk. + (pkg / "a.py").write_text("from . import b\n", encoding="utf-8") + (pkg / "b.py").write_text("from . import a\n", encoding="utf-8") + (pkg / "c.py").write_text("C = 1\n", encoding="utf-8") + verifier = ImportIntegrityVerifier(project_root=tmp_path) + original = (pkg / "a.py").read_text(encoding="utf-8") + transformed = "from . import b\nfrom . import c\n" + cr = verifier.verify(original, transformed, pkg / "a.py") + assert cr.passed is True + assert cr.details["cycle_check"].startswith("ran: pre-existing") + + def test_cycle_check_skipped_without_project_root(self): + verifier = ImportIntegrityVerifier() + cr = verifier.verify("VALUE = 1\n", "from . import b\n", Path("/tmp/x.py")) + assert cr.passed is True + assert cr.details["cycle_check"] == "skipped: no project_root" + + def test_file_outside_project_root_skips_cycle_check(self, tmp_path): + _make_pkg(tmp_path) + verifier = ImportIntegrityVerifier(project_root=tmp_path) + cr = verifier.verify("VALUE = 1\n", "from . import b\n", Path("/elsewhere/x.py")) + assert cr.passed is True + assert cr.details["cycle_check"] == "skipped: file outside project_root" diff --git a/tests/test_test_gate.py b/tests/test_test_gate.py index defd8d3..438b6f1 100644 --- a/tests/test_test_gate.py +++ b/tests/test_test_gate.py @@ -1,9 +1,12 @@ """Unit tests for TestSuiteGate (Check 3).""" +import os +import sys from pathlib import Path import pytest +from refactron.verification.checks import test_gate as test_gate_module from refactron.verification.checks.test_gate import TestSuiteGate FIXTURES_DIR = Path(__file__).parent / "fixtures" @@ -71,3 +74,126 @@ def test_confidence_is_0_9(self, gate): original = file_path.read_text(encoding="utf-8") cr = gate.verify(original, original, file_path) assert cr.confidence == 0.9 + + +class TestPackageImportMatching: + """Relevance matching must handle qualified package imports, not just stems.""" + + def _layout(self, tmp_path: Path, test_import: str): + """Create mypkg/submodule/foo.py and tests/test_foo.py importing it.""" + pkg = tmp_path / "mypkg" / "submodule" + pkg.mkdir(parents=True) + (tmp_path / "mypkg" / "__init__.py").write_text("", encoding="utf-8") + (pkg / "__init__.py").write_text("", encoding="utf-8") + foo = pkg / "foo.py" + foo.write_text("def f():\n return 1\n", encoding="utf-8") + tests = tmp_path / "tests" + tests.mkdir() + test_file = tests / "test_foo.py" + test_file.write_text( + f"{test_import}\n\n\ndef test_it():\n assert True\n", encoding="utf-8" + ) + return foo, test_file + + def test_qualified_from_import_matches(self, tmp_path): + """from mypkg.submodule import foo -> matches mypkg/submodule/foo.py.""" + foo, test_file = self._layout(tmp_path, "from mypkg.submodule import foo") + gate = TestSuiteGate(project_root=tmp_path) + assert test_file in gate._find_relevant_tests(foo) + + def test_dotted_import_matches(self, tmp_path): + """import mypkg.submodule.foo -> matches the file under verification.""" + foo, test_file = self._layout(tmp_path, "import mypkg.submodule.foo") + gate = TestSuiteGate(project_root=tmp_path) + assert test_file in gate._find_relevant_tests(foo) + + def test_unrelated_import_does_not_match(self, tmp_path): + foo, _ = self._layout(tmp_path, "import os") + gate = TestSuiteGate(project_root=tmp_path) + assert gate._find_relevant_tests(foo) == [] + + def test_relative_import_matches(self, tmp_path): + """from . import foo in a test inside the package is attributed.""" + pkg = tmp_path / "mypkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("", encoding="utf-8") + foo = pkg / "foo.py" + foo.write_text("def f():\n return 1\n", encoding="utf-8") + test_file = pkg / "test_foo.py" + test_file.write_text( + "from . import foo\n\n\ndef test_it():\n assert foo.f() == 1\n", + encoding="utf-8", + ) + gate = TestSuiteGate(project_root=tmp_path) + assert test_file in gate._find_relevant_tests(foo) + + def test_no_tests_yields_reduced_confidence(self, tmp_path): + """An uncovered change is passed but with reduced (not high) confidence.""" + pkg = tmp_path / "mypkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("", encoding="utf-8") + foo = pkg / "foo.py" + foo.write_text("X = 1\n", encoding="utf-8") + gate = TestSuiteGate(project_root=tmp_path) + cr = gate.verify("X = 1\n", "X = 1\n", foo) + assert cr.passed is True + assert cr.confidence == 0.6 + assert "not covered" in cr.details["note"] + + +class TestPytestInvocation: + """The pytest subprocess must use the host interpreter and project root.""" + + def _capture_run(self, monkeypatch): + """Patch subprocess.run to capture its arguments without running pytest.""" + captured = {} + + def fake_run(cmd, **kwargs): + captured["cmd"] = cmd + captured["cwd"] = kwargs.get("cwd") + captured["env"] = kwargs.get("env") + + class _Completed: + returncode = 0 + stdout = "" + stderr = "" + + return _Completed() + + monkeypatch.setattr(test_gate_module.subprocess, "run", fake_run) + return captured + + def test_uses_host_interpreter(self, monkeypatch): + """pytest must run via sys.executable, not a hard-coded python3.""" + captured = self._capture_run(monkeypatch) + gate = TestSuiteGate(project_root=FIXTURES_DIR) + file_path = FIXTURES_DIR / "fixture_test_break.py" + original = file_path.read_text(encoding="utf-8") + + gate.verify(original, original, file_path) + + assert captured["cmd"][0] == sys.executable + assert captured["cmd"][1:4] == ["-m", "pytest", "-x"] + + def test_cwd_is_resolved_project_root(self, monkeypatch): + """pytest must run from the resolved project root, not file_path.parent.""" + captured = self._capture_run(monkeypatch) + gate = TestSuiteGate(project_root=FIXTURES_DIR) + file_path = FIXTURES_DIR / "fixture_test_break.py" + original = file_path.read_text(encoding="utf-8") + + gate.verify(original, original, file_path) + + assert captured["cwd"] == str(FIXTURES_DIR.resolve()) + + def test_project_root_added_to_pythonpath(self, monkeypatch): + """The project root must be present on PYTHONPATH for the subprocess.""" + captured = self._capture_run(monkeypatch) + gate = TestSuiteGate(project_root=FIXTURES_DIR) + file_path = FIXTURES_DIR / "fixture_test_break.py" + original = file_path.read_text(encoding="utf-8") + + gate.verify(original, original, file_path) + + pythonpath = captured["env"]["PYTHONPATH"].split(os.pathsep) + assert str(FIXTURES_DIR.resolve()) in pythonpath diff --git a/tests/test_verification_config.py b/tests/test_verification_config.py new file mode 100644 index 0000000..443f7c5 --- /dev/null +++ b/tests/test_verification_config.py @@ -0,0 +1,117 @@ +"""Tests for the configurable verification pipeline.""" + +from pathlib import Path + +import pytest + +from refactron.core.config import RefactronConfig, VerificationConfig +from refactron.verification.checks.test_gate import TestSuiteGate +from refactron.verification.engine import VerificationEngine + + +class TestVerificationConfigDefaults: + def test_defaults_match_historical_behaviour(self): + cfg = VerificationConfig() + assert cfg.enabled_checks == ["syntax", "import_integrity", "test_gate"] + assert cfg.short_circuit is True + assert cfg.test_gate_timeout_sec == 45 + assert cfg.pytest_extra_args == [] + + def test_refactron_config_has_verification_section(self): + cfg = RefactronConfig.default() + assert isinstance(cfg.verification, VerificationConfig) + assert cfg.verification.enabled_checks == [ + "syntax", + "import_integrity", + "test_gate", + ] + + +class TestEngineHonoursConfig: + def test_enabled_checks_subset(self): + cfg = VerificationConfig(enabled_checks=["syntax", "import_integrity"]) + engine = VerificationEngine(config=cfg) + assert [c.name for c in engine.checks] == ["syntax", "import_integrity"] + + def test_enabled_checks_ordering_respected(self): + cfg = VerificationConfig(enabled_checks=["test_gate", "syntax"]) + engine = VerificationEngine(config=cfg) + assert [c.name for c in engine.checks] == ["test_gate", "syntax"] + + def test_unknown_check_raises(self): + cfg = VerificationConfig(enabled_checks=["syntax", "bogus_check"]) + with pytest.raises(ValueError, match="Unknown verification check"): + VerificationEngine(config=cfg) + + def test_test_gate_timeout_threaded_from_config(self): + cfg = VerificationConfig(test_gate_timeout_sec=120, pytest_extra_args=["-p", "no:randomly"]) + engine = VerificationEngine(config=cfg) + gate = next(c for c in engine.checks if c.name == "test_gate") + assert gate.timeout_sec == 120 + assert gate.pytest_extra_args == ["-p", "no:randomly"] + + def test_short_circuit_default_taken_from_config(self): + cfg = VerificationConfig(short_circuit=False, enabled_checks=["syntax"]) + engine = VerificationEngine(config=cfg) + # verify() with short_circuit=None must fall back to the config value. + result = engine.verify("x = (", "y = (", Path("/tmp/t.py")) + assert result.safe_to_apply is False # syntax fails on both + + def test_explicit_checks_bypass_config(self): + cfg = VerificationConfig(enabled_checks=["syntax"]) + engine = VerificationEngine(checks=[], config=cfg) + assert engine.checks == [] + + +class TestTestSuiteGateConfig: + def test_defaults_preserved(self): + gate = TestSuiteGate() + assert gate.timeout_sec == 45 + assert gate.pytest_extra_args == [] + + def test_custom_values_stored(self): + gate = TestSuiteGate(timeout_sec=10, pytest_extra_args=["-k", "fast"]) + assert gate.timeout_sec == 10 + assert gate.pytest_extra_args == ["-k", "fast"] + + +class TestConfigFileRoundTrip: + def test_verification_section_loaded_from_yaml(self, tmp_path): + config_file = tmp_path / "refactron.yaml" + config_file.write_text( + "verification:\n" + " enabled_checks: [syntax, import_integrity]\n" + " short_circuit: false\n" + " test_gate_timeout_sec: 90\n" + " pytest_extra_args: ['-p', 'no:cacheprovider']\n", + encoding="utf-8", + ) + cfg = RefactronConfig.from_file(config_file) + assert isinstance(cfg.verification, VerificationConfig) + assert cfg.verification.enabled_checks == ["syntax", "import_integrity"] + assert cfg.verification.short_circuit is False + assert cfg.verification.test_gate_timeout_sec == 90 + assert cfg.verification.pytest_extra_args == ["-p", "no:cacheprovider"] + + def test_missing_section_uses_defaults(self, tmp_path): + """Existing configs with no `verification:` section need no migration.""" + config_file = tmp_path / "refactron.yaml" + config_file.write_text("max_parameters: 7\n", encoding="utf-8") + cfg = RefactronConfig.from_file(config_file) + assert cfg.verification.enabled_checks == [ + "syntax", + "import_integrity", + "test_gate", + ] + assert cfg.verification.test_gate_timeout_sec == 45 + + def test_to_file_then_from_file_preserves_verification(self, tmp_path): + config_file = tmp_path / "out.yaml" + cfg = RefactronConfig.default() + cfg.verification.test_gate_timeout_sec = 75 + cfg.verification.enabled_checks = ["syntax"] + cfg.to_file(config_file) + + reloaded = RefactronConfig.from_file(config_file) + assert reloaded.verification.test_gate_timeout_sec == 75 + assert reloaded.verification.enabled_checks == ["syntax"] diff --git a/tests/test_verification_engine.py b/tests/test_verification_engine.py index 4caba8f..643e2c4 100644 --- a/tests/test_verification_engine.py +++ b/tests/test_verification_engine.py @@ -71,6 +71,28 @@ def test_first_fail_short_circuits(self): assert tracker.called is False # short-circuited assert ("tracker", "Short-circuited after always_fail failed") in result.skipped_checks + def test_all_checks_runs_every_check_after_failure(self): + tracker = _TrackingCheck() + engine = VerificationEngine(checks=[_FailingCheck(), tracker]) + result = engine.verify( + "a = 1", "a = 1", Path("/tmp/test.py"), short_circuit=False + ) + assert result.safe_to_apply is False + assert tracker.called is True # ran despite the earlier failure + assert result.checks_run == ["always_fail", "tracker"] + assert result.checks_failed == ["always_fail"] + assert result.checks_passed == ["tracker"] + assert result.skipped_checks == [] # nothing skipped when not short-circuiting + + def test_all_checks_aggregates_multiple_failures(self): + engine = VerificationEngine(checks=[_FailingCheck(), _PassingCheck(), _FailingCheck()]) + result = engine.verify( + "a = 1", "a = 1", Path("/tmp/test.py"), short_circuit=False + ) + assert result.checks_failed == ["always_fail", "always_fail"] + assert result.checks_passed == ["always_pass"] + assert result.blocking_reason == "Intentional failure" # first failure + def test_confidence_is_geometric_mean(self): engine = VerificationEngine(checks=[_PassingCheck(), _TrackingCheck()]) result = engine.verify("a = 1", "a = 1", Path("/tmp/test.py")) diff --git a/tests/test_verification_result.py b/tests/test_verification_result.py index 5c7d44b..68cad62 100644 --- a/tests/test_verification_result.py +++ b/tests/test_verification_result.py @@ -1,8 +1,15 @@ """Contract tests for VerificationResult and CheckResult.""" +import json + import pytest -from refactron.verification.result import CheckResult, VerificationResult +from refactron.verification.report import format_verification_result_json +from refactron.verification.result import ( + JSON_SCHEMA_VERSION, + CheckResult, + VerificationResult, +) class TestCheckResult: @@ -95,3 +102,75 @@ def test_confidence_zero_when_no_checks_passed(self): check_results=[], ) assert vr.confidence_score == 0.0 + + +def _sample_result(safe: bool) -> VerificationResult: + cr = CheckResult( + check_name="syntax", + passed=safe, + blocking_reason="" if safe else "SyntaxError on line 5", + confidence=1.0 if safe else 0.0, + duration_ms=12, + details={"line": 5}, + ) + return VerificationResult( + safe_to_apply=safe, + passed=True, + checks_run=["syntax"], + checks_passed=["syntax"] if safe else [], + checks_failed=[] if safe else ["syntax"], + skipped_checks=[] if safe else [("test_suite", "Short-circuited after syntax failed")], + blocking_reason=None if safe else "SyntaxError on line 5", + confidence_score=1.0 if safe else 0.0, + verification_ms=42, + check_results=[cr], + ) + + +class TestCheckResultJson: + def test_to_json_dict_has_all_fields(self): + cr = CheckResult( + check_name="syntax", + passed=False, + blocking_reason="boom", + confidence=0.0, + duration_ms=7, + details={"k": "v"}, + ) + d = cr.to_json_dict() + assert d == { + "check_name": "syntax", + "passed": False, + "blocking_reason": "boom", + "confidence": 0.0, + "duration_ms": 7, + "details": {"k": "v"}, + } + + +class TestVerificationResultJson: + def test_to_json_dict_safe(self): + d = _sample_result(safe=True).to_json_dict() + assert d["schema_version"] == JSON_SCHEMA_VERSION + assert d["status"] == "safe" + assert d["safe_to_apply"] is True + assert d["blocking_reason"] is None + assert d["checks_run"] == ["syntax"] + assert d["skipped_checks"] == [] + assert d["checks"][0]["check_name"] == "syntax" + + def test_to_json_dict_blocked(self): + d = _sample_result(safe=False).to_json_dict() + assert d["status"] == "blocked" + assert d["safe_to_apply"] is False + assert d["blocking_reason"] == "SyntaxError on line 5" + assert d["checks_failed"] == ["syntax"] + assert d["skipped_checks"] == [ + {"check_name": "test_suite", "reason": "Short-circuited after syntax failed"} + ] + + def test_format_json_is_parseable(self): + text = format_verification_result_json(_sample_result(safe=True)) + parsed = json.loads(text) + assert parsed["schema_version"] == JSON_SCHEMA_VERSION + assert parsed["status"] == "safe"