From 38b3e2a49f339666886686ef206ec87cefa1df3a Mon Sep 17 00:00:00 2001 From: shrutu0929 Date: Mon, 20 Apr 2026 22:15:16 +0530 Subject: [PATCH] feat: unify pipeline session creation between analyze and run commands --- .refactron.yaml | 2 + refactron/cli/analysis.py | 29 ++-- refactron/cli/main.py | 7 + refactron/cli/run.py | 106 ++++++++++++++ refactron/core/pipeline.py | 222 +++++++++++++++++++++++++++++ refactron/core/pipeline_session.py | 77 ++++++++++ tests/test_pipeline.py | 112 +++++++++++++++ tests/test_pipeline_session.py | 208 +++++++++++++++++++++++++++ 8 files changed, 749 insertions(+), 14 deletions(-) create mode 100644 .refactron.yaml create mode 100644 refactron/cli/run.py create mode 100644 refactron/core/pipeline.py create mode 100644 refactron/core/pipeline_session.py create mode 100644 tests/test_pipeline.py create mode 100644 tests/test_pipeline_session.py diff --git a/.refactron.yaml b/.refactron.yaml new file mode 100644 index 0000000..9c1e96d --- /dev/null +++ b/.refactron.yaml @@ -0,0 +1,2 @@ +enable_metrics: true +log_level: DEBUG \ No newline at end of file diff --git a/refactron/cli/analysis.py b/refactron/cli/analysis.py index 17a3a5c..0aee562 100644 --- a/refactron/cli/analysis.py +++ b/refactron/cli/analysis.py @@ -20,6 +20,7 @@ _print_status_messages, console, ) +from refactron.core.pipeline import RefactronPipeline from refactron.cli.utils import _load_config, _setup_logging, _validate_path from refactron.core.models import CodeIssue, IssueCategory, IssueLevel from refactron.core.workspace import WorkspaceManager @@ -151,8 +152,15 @@ def analyze( # Run analysis try: with console.status("[primary]Analyzing code...[/primary]"): - refactron = Refactron(cfg) - result = refactron.analyze(target) + pipeline = RefactronPipeline(config_path=config) + result = pipeline.analyze(target) + + # Queue issues to ensure the session is fully populated for potential fixes + pipeline.queue_issues(result.all_issues) + + # Persist session + pipeline.save_session() + except Exception as e: console.print(f"[red]Analysis failed: {e}[/red]") console.print("[dim]Tip: Check if all files have valid Python syntax[/dim]") @@ -172,18 +180,11 @@ def analyze( # Show metrics if requested if show_metrics and cfg.enable_metrics: - from refactron.core.metrics import get_metrics_collector - - console.print("\n[bold]Metrics Summary:[/bold]") - collector = get_metrics_collector() - metrics_summary = collector.get_analysis_summary() - console.print( - f" Total analysis time: {metrics_summary.get('total_analysis_time_ms', 0):.2f}ms" - ) - console.print( - f" Average time per file: {metrics_summary.get('average_time_per_file_ms', 0):.2f}ms" - ) - console.print(f" Success rate: {metrics_summary.get('success_rate_percent', 0):.1f}%") + console.print("\n[bold]Pipeline Session Summary:[/bold]") + console.print(f" Session ID: [dim]{pipeline.session.id}[/dim]") + console.print(f" Analysis time: {pipeline.session.analyze_ms:.2f}ms") + console.print(f" Queuing time: {pipeline.session.queue_ms:.2f}ms") + console.print(f" Issues queued: {len(result.all_issues)}") # Exit with error code if critical issues found if summary["critical"] > 0: diff --git a/refactron/cli/main.py b/refactron/cli/main.py index 04426b8..e2b1892 100644 --- a/refactron/cli/main.py +++ b/refactron/cli/main.py @@ -143,6 +143,13 @@ def main(ctx: click.Context) -> None: except ImportError: pass +try: + from refactron.cli.run import run + + main.add_command(run) +except ImportError: + pass + try: from refactron.cli.cicd import feedback, generate_cicd, init diff --git a/refactron/cli/run.py b/refactron/cli/run.py new file mode 100644 index 0000000..83b6a45 --- /dev/null +++ b/refactron/cli/run.py @@ -0,0 +1,106 @@ +""" +Refactron CLI - Run Module. +Provides the 'run' command to execute Refactron as a connected pipeline. +""" + +import os +from typing import Optional +from pathlib import Path +import click + +from refactron.cli.ui import console, _auth_banner +from refactron.cli.utils import _validate_path +from refactron.core.pipeline import RefactronPipeline + + +@click.command() +@click.argument("target", type=click.Path(exists=True), required=False) +@click.option( + "--incremental/--no-incremental", + default=False, + help="Enable or disable incremental analysis for the pipeline run (off by default)", +) +@click.option( + "--verbose", + is_flag=True, + help="Show detailed timing information for each pipeline phase", +) +@click.option( + "--fail-fast", + is_flag=True, + help="Stop processing if an error occurs during fix application", +) +def run(target: Optional[str], incremental: bool, verbose: bool, fail_fast: bool) -> None: + """ + Run Refactron as a connected pipeline session. + """ + console.print() + _auth_banner("Pipeline Run") + console.print() + + target_path = _validate_path(target) if target else Path.cwd() + debug_mode = os.getenv("REFACTRON_DEBUG") == "1" + + with console.status("[primary]Executing pipeline run...[/primary]"): + try: + pipeline = RefactronPipeline() + # 1. Analyze + result = pipeline.analyze(target_path, use_incremental=incremental) + + # 2. Queue + queued = pipeline.queue_issues(result.all_issues) + + # 3. Persist session + pipeline.save_session() + + # 4. Apply (simulated for now in 'run' command unless --apply added, + # but we'll show the summary infrastructure) + # For this task, we assume 'run' might be extended to apply fixes. + # If we don't apply, session counts remain 0. + + console.print(f"[green]Pipeline session completed successfully.[/green]") + console.print(f"Session ID: [dim]{pipeline.session.id}[/dim]") + console.print(f"Total files analyzed: {result.total_files}") + console.print(f"Total issues found: {result.total_issues}") + console.print(f"Issues queued for fix: {len(queued)}") + + if pipeline.session.files_attempted > 0: + console.print(f"Files attempted: {pipeline.session.files_attempted}") + console.print(f"Files succeeded: [green]{pipeline.session.files_succeeded}[/green]") + console.print(f"Files failed: [red]{pipeline.session.files_failed}[/red]") + if pipeline.session.backup_session_id: + console.print( + f"Backup Session ID: [cyan]{pipeline.session.backup_session_id}[/cyan]" + ) + + if pipeline.session.blocked_fixes: + console.print("\n[warning]Blocked Fixes:[/warning]") + for block in pipeline.session.blocked_fixes: + file = block.get("file", "Unknown") + reason = block.get("reason") or block.get("error", "Unknown error") + console.print(f" [red]✗[/red] {file}: {reason}") + + if verbose or debug_mode: + console.print("\n[highlight]Pipeline Phase Timings:[/highlight]") + console.print( + f" 󰄰 [secondary]Analysis:[/secondary] {pipeline.session.analyze_ms:.2f}ms" + ) + console.print( + f" 󰄰 [secondary]Queuing:[/secondary] {pipeline.session.queue_ms:.2f}ms" + ) + if pipeline.session.apply_ms > 0: + console.print( + f" 󰄰 [secondary]Applying:[/secondary] {pipeline.session.apply_ms:.2f}ms" + ) + if pipeline.session.verify_ms > 0: + console.print( + f" 󰄰 [secondary]Verify:[/secondary] {pipeline.session.verify_ms:.2f}ms" + ) + + except Exception as e: + console.print(f"[red]Pipeline run failed: {e}[/red]") + if debug_mode: + import traceback + + console.print(traceback.format_exc()) + raise SystemExit(1) diff --git a/refactron/core/pipeline.py b/refactron/core/pipeline.py new file mode 100644 index 0000000..dd387fb --- /dev/null +++ b/refactron/core/pipeline.py @@ -0,0 +1,222 @@ +"""Pipeline orchestration for automated and session-based flows.""" + +import logging +from collections import defaultdict +from pathlib import Path +import time +from typing import Optional, Union, List, Dict, Any + +from refactron.core.config import RefactronConfig +from refactron.core.refactron import Refactron +from refactron.core.analysis_result import AnalysisResult +from refactron.core.models import CodeIssue +from refactron.autofix.engine import AutoFixEngine +from refactron.core.pipeline_session import PipelineSession +from refactron.core.backup import BackupRollbackSystem + +# Configure logging +logger = logging.getLogger(__name__) + + +class RefactronPipeline: + """Automated pipeline execution for session-based and CI/CD flows.""" + + def __init__( + self, + project_root: Optional[Union[str, Path]] = None, + config_path: Optional[Union[str, Path]] = None, + ): + """ + Initialize the pipeline. + + Args: + project_root: Optional root directory of the project. + config_path: Optional explicit configuration path. + """ + self.project_root = Path(project_root) if project_root else None + self.config_path = Path(config_path) if config_path else None + + self._config = self._load_config() + # Enforce pipeline default behavior (full scan) unless overridden later + self._config.enable_incremental_analysis = False + self.refactron = Refactron(self._config) + self.autofix_engine = AutoFixEngine() + self._fixer_cache = {} + self.session = PipelineSession() + + def _load_config(self) -> RefactronConfig: + if self.config_path and self.config_path.exists(): + return RefactronConfig.from_file(self.config_path) + + root = self.project_root or Path.cwd() + yaml_path = root / ".refactron.yaml" + if yaml_path.exists(): + return RefactronConfig.from_file(yaml_path) + return RefactronConfig.default() + + def analyze( + self, target: Union[str, Path], use_incremental: Optional[bool] = None + ) -> AnalysisResult: + """ + Run analysis as part of a pipeline flow. + + Pipeline-only overrides: + - `enable_incremental_analysis` is forced to False by default to guarantee + a fully fresh, reproducible analysis in CI/CD or pipeline environments. + + Args: + target: Path to file or directory to analyze. + use_incremental: Optional override to enable incremental analysis. + + Returns: + AnalysisResult containing issues and metrics. + """ + if use_incremental is not None: + self.refactron.config.enable_incremental_analysis = use_incremental + + start_time = time.time() + result = self.refactron.analyze(target) + duration_ms = (time.time() - start_time) * 1000 + + # Update session metrics + self.session.analyze_ms = duration_ms + self.session.metadata["target"] = str(target) + return result + + def save_session(self, directory: Optional[Path] = None) -> Path: + """ + Persist the current session to disk. + """ + return self.session.save(directory) + + def queue_issues(self, issues: List[CodeIssue]) -> list: + """Queue issues mapped to their responsible fixers.""" + start_time = time.time() + queued = [] + for issue in issues: + fixer_name = self._find_fixer_name(issue) + if fixer_name: + queued.append({"issue": issue, "fixer_name": fixer_name}) + self.session.queue_ms = (time.time() - start_time) * 1000 + return queued + + def _find_fixer_name(self, issue: CodeIssue) -> Optional[str]: + """Find the appropriate fixer name for a given issue.""" + # Use rule_id or category as cache key + issue_type = issue.rule_id or str(getattr(issue, "category", type(issue).__name__)) + if issue_type in self._fixer_cache: + return self._fixer_cache[issue_type] + + candidate_name = None + + # 1. Ask AutoFixEngine directly without preview (O(1) dictionary lookup) + if hasattr(self, "autofix_engine") and self.autofix_engine.can_fix(issue): + candidate_name = issue.rule_id + else: + # 2. Fallback to preview-based resolution for ambiguous cases + if hasattr(self, "autofix_engine"): + for name, fixer in self.autofix_engine.fixers.items(): + try: + preview_result = fixer.preview(issue, "x = 1\n") + if preview_result and getattr(preview_result, "success", False): + candidate_name = name + break + except Exception: + continue + + self._fixer_cache[issue_type] = candidate_name + return candidate_name + + def apply( + self, queued_issues: List[dict], preview: bool = False, fail_fast: bool = False + ) -> list: + """ + Apply fixes for queued issues. + + Orchestration Policies: + - Best-effort (default): Continues applying fixes to other files even if one file fails. + - Fail-fast (fail_fast=True): Stops the entire application process on the first file failure. + + Args: + queued_issues: List of dicts containing 'issue' and 'fixer_name'. + preview: If True, only simulate fixes (no disk writes). + fail_fast: If True, stop on first application error. + + Returns: + List of results for each application attempt. + """ + start_time = time.time() + results = [] + + # 1. Group issues by file to ensure atomic/cumulative updates + file_map = defaultdict(list) + for item in queued_issues: + file_map[item["issue"].file_path].append(item) + + self.session.files_attempted = len(file_map) + + # 2. Setup Backup + backup_system = BackupRollbackSystem(self.project_root) + session_id = None + if not preview and self._config.backup_enabled and file_map: + try: + session_id, _ = backup_system.prepare_for_refactoring( + list(file_map.keys()), description=f"Pipeline session {self.session.id}" + ) + self.session.backup_session_id = session_id + except Exception as e: + logger.warning(f"Failed to create backup session: {e}") + + # 3. Apply fixes file by file + for file_path, items in file_map.items(): + success = True + try: + content = file_path.read_text(encoding="utf-8") + current_code = content + + # Apply each fixer sequentially for this file + for item in items: + issue = item["issue"] + fix_result = self.autofix_engine.fix(issue, current_code, preview=preview) + results.append(fix_result) + + if fix_result.success: + current_code = fix_result.fixed_code + else: + success = False + self.session.blocked_fixes.append( + { + "file": str(file_path), + "issue": issue.message, + "reason": fix_result.reason, + } + ) + + # Write back if successful and not in preview + if not preview and success: + file_path.write_text(current_code, encoding="utf-8") + self.session.files_succeeded += 1 + elif not success: + self.session.files_failed += 1 + if fail_fast: + break + else: + # Preview mode success + self.session.files_succeeded += 1 + + except Exception as e: + self.session.files_failed += 1 + self.session.blocked_fixes.append({"file": str(file_path), "error": str(e)}) + results.append(None) + if fail_fast: + break + + self.session.apply_ms = (time.time() - start_time) * 1000 + return results + + def verify(self, target: Union[str, Path]) -> AnalysisResult: + """Verify the state of the project after fixes.""" + start_time = time.time() + result = self.analyze(target) + self.session.verify_ms = (time.time() - start_time) * 1000 + return result diff --git a/refactron/core/pipeline_session.py b/refactron/core/pipeline_session.py new file mode 100644 index 0000000..dade736 --- /dev/null +++ b/refactron/core/pipeline_session.py @@ -0,0 +1,77 @@ +"""Pipeline session and timing tracking.""" + +import uuid +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + + +@dataclass +class PipelineSession: + """ + Session container for a pipeline run, including timing metrics for each phase. + """ + + id: str = field(default_factory=lambda: str(uuid.uuid4())) + analyze_ms: float = 0.0 + queue_ms: float = 0.0 + apply_ms: float = 0.0 + verify_ms: float = 0.0 + + # Application metrics + files_attempted: int = 0 + files_succeeded: int = 0 + files_failed: int = 0 + blocked_fixes: List[Dict[str, Any]] = field(default_factory=list) + backup_session_id: Optional[str] = None + + metadata: Dict[str, Any] = field(default_factory=dict) + timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + + def to_dict(self) -> Dict[str, Any]: + """Convert the session to a JSON-serializable dictionary.""" + return asdict(self) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "PipelineSession": + """Create a session from a dictionary.""" + return cls(**data) + + def save(self, directory: Optional[Path] = None) -> Path: + """ + Persist the session to disk. + + Args: + directory: Optional directory to save to. Defaults to ~/.refactron/sessions/ + """ + import json + save_dir = directory or (Path.home() / ".refactron" / "sessions") + save_dir.mkdir(parents=True, exist_ok=True) + + file_path = save_dir / f"{self.id}.json" + with open(file_path, "w", encoding="utf-8") as f: + json.dump(self.to_dict(), f, indent=2) + + # Also update the 'latest' pointer + latest_path = save_dir / "latest.json" + with open(latest_path, "w", encoding="utf-8") as f: + json.dump({"latest_session_id": self.id}, f) + + return file_path + + @classmethod + def from_id( + cls, session_id: str, directory: Optional[Path] = None + ) -> Optional["PipelineSession"]: + """Load a session by ID.""" + import json + save_dir = directory or (Path.home() / ".refactron" / "sessions") + file_path = save_dir / f"{session_id}.json" + + if not file_path.exists(): + return None + + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + return cls.from_dict(data) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..46a52b4 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,112 @@ +"""Tests for the Refactron pipeline module.""" + +import tempfile +from pathlib import Path +from unittest.mock import patch, MagicMock + +from refactron.core.pipeline import RefactronPipeline +from refactron.core.config import RefactronConfig + + +def test_pipeline_loads_project_config(): + """Test that RefactronPipeline loads configuration from the project root.""" + with tempfile.TemporaryDirectory() as temp_dir: + root_path = Path(temp_dir) + + config_path = root_path / ".refactron.yaml" + config_path.write_text("max_function_complexity: 99\nenable_metrics: false\n") + + target_file = root_path / "dummy.py" + target_file.write_text("def foo():\n pass\n") + + with patch("refactron.core.pipeline.Refactron.analyze") as mock_analyze: + mock_analyze.return_value = "mock_result" + pipeline = RefactronPipeline(project_root=root_path) + + result = pipeline.analyze(target_file) + + assert result == "mock_result" + config_passed = pipeline.refactron.config + + assert isinstance(config_passed, RefactronConfig) + assert config_passed.max_function_complexity == 99 + assert config_passed.enable_metrics is False + assert config_passed.enable_incremental_analysis is False + + +def test_pipeline_incremental_override(): + """Test that RefactronPipeline can override the incremental setting.""" + with tempfile.TemporaryDirectory() as temp_dir: + root_path = Path(temp_dir) + + target_file = root_path / "dummy.py" + target_file.write_text("def foo():\n pass\n") + + with patch("refactron.core.pipeline.Refactron.analyze"): + pipeline = RefactronPipeline(project_root=root_path) + pipeline.analyze(target_file, use_incremental=True) + + config_passed = pipeline.refactron.config + assert config_passed.enable_incremental_analysis is True + + +def test_pipeline_queue_issues_caching(): + """Test that queue_issues leverages caching and direct mapping without multiple previews.""" + from refactron.core.models import CodeIssue, IssueCategory, IssueLevel + from refactron.autofix.models import FixResult + from pathlib import Path + + with tempfile.TemporaryDirectory() as temp_dir: + pipeline = RefactronPipeline(project_root=Path(temp_dir)) + + # Mock AutoFixEngine on the pipeline directly + pipeline.autofix_engine = MagicMock() + + calls = {"preview": 0} + + # Create a mock fixer that counts preview calls + mock_fixer = MagicMock() + mock_fixer.name = "mock_rule" + + def mock_preview(issue, code): + calls["preview"] += 1 + return FixResult(success=True, reason="") + + mock_fixer.preview.side_effect = mock_preview + + pipeline.autofix_engine.fixers = {"mock_rule": mock_fixer} + + # Mock can_fix to return False so it falls back to preview ONCE per type + pipeline.autofix_engine.can_fix.return_value = False + + issue1 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.WARNING, + message="Test issue", + file_path=Path("dummy.py"), + line_number=1, + rule_id="unknown_rule_1", + ) + issue2 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.WARNING, + message="Test issue 2", + file_path=Path("dummy.py"), + line_number=2, + rule_id="unknown_rule_1", + ) + issue3 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.WARNING, + message="Test issue 3", + file_path=Path("dummy.py"), + line_number=3, + rule_id="unknown_rule_1", + ) + + queued = pipeline.queue_issues([issue1, issue2, issue3]) + + assert len(queued) == 3 + # Should only have run preview 1 time, because the subsequent issues have the same rule_id and hit the cache. + assert calls["preview"] == 1 + assert pipeline._fixer_cache["unknown_rule_1"] == "mock_rule" diff --git a/tests/test_pipeline_session.py b/tests/test_pipeline_session.py new file mode 100644 index 0000000..8b17858 --- /dev/null +++ b/tests/test_pipeline_session.py @@ -0,0 +1,208 @@ +"""Tests for PipelineSession and pipeline timing persistence.""" + +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +from refactron.core.pipeline import RefactronPipeline +from refactron.core.pipeline_session import PipelineSession +from refactron.core.models import CodeIssue, IssueCategory, IssueLevel + + +def test_session_initialization(): + """Test that PipelineSession initializes with default values.""" + session = PipelineSession() + assert session.id + assert session.analyze_ms == 0.0 + assert session.queue_ms == 0.0 + assert session.apply_ms == 0.0 + assert session.verify_ms == 0.0 + assert isinstance(session.metadata, dict) + + +def test_session_serialization(): + """Test that PipelineSession can be converted to and from a dictionary.""" + session = PipelineSession(analyze_ms=10.5, queue_ms=5.2) + data = session.to_dict() + assert data["analyze_ms"] == 10.5 + assert data["queue_ms"] == 5.2 + + new_session = PipelineSession.from_dict(data) + assert new_session.id == session.id + assert new_session.analyze_ms == 10.5 + + +def test_pipeline_timing_integration(): + """Test that RefactronPipeline populates session timings.""" + with patch("refactron.core.pipeline.Refactron.analyze") as mock_analyze: + mock_analyze.return_value = MagicMock(all_issues=[]) + + pipeline = RefactronPipeline() + + # 1. Analyze + pipeline.analyze(Path(".")) + assert pipeline.session.analyze_ms > 0 + + # 2. Queue + issue = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.INFO, + message="Test", + file_path=Path("test.py"), + line_number=1, + ) + pipeline.queue_issues([issue]) + assert pipeline.session.queue_ms > 0 + + +def test_pipeline_apply_verify_timings(): + """Test that apply and verify phases record timings.""" + pipeline = RefactronPipeline() + + # Mock apply logic + pipeline.autofix_engine.fix = MagicMock(return_value=MagicMock(success=True, fixed_code="")) + + with patch.object(Path, "read_text", return_value="code"): + with patch.object(Path, "write_text"): + issue = MagicMock(spec=CodeIssue) + issue.file_path = Path("test.py") + pipeline.apply([{"issue": issue, "fixer_name": "test"}]) + assert pipeline.session.apply_ms > 0 + + with patch.object(RefactronPipeline, "analyze", return_value=MagicMock()): + pipeline.verify(Path(".")) + assert pipeline.session.verify_ms > 0 + + +def test_pipeline_apply_multi_file_best_effort(): + """Test best-effort behavior (continues on failure).""" + pipeline = RefactronPipeline() + + # Mock fixers + success_fix = MagicMock(success=True, fixed_code="fixed") + fail_fix = MagicMock(success=False, reason="Blocked") + + pipeline.autofix_engine.fix = MagicMock(side_effect=[success_fix, fail_fix]) + + issue1 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.INFO, + message="Succeed", + file_path=Path("success.py"), + line_number=1, + ) + issue2 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.INFO, + message="Fail", + file_path=Path("fail.py"), + line_number=1, + ) + + with patch.object(Path, "read_text", return_value="original"): + with patch.object(Path, "write_text"): + with patch("refactron.core.pipeline.BackupRollbackSystem") as mock_backup: + mock_backup.return_value.prepare_for_refactoring.return_value = ("backup_123", []) + + pipeline.apply( + [ + {"issue": issue1, "fixer_name": "fix1"}, + {"issue": issue2, "fixer_name": "fix2"}, + ], + fail_fast=False, + ) + + assert pipeline.session.files_attempted == 2 + assert pipeline.session.files_succeeded == 1 + assert pipeline.session.files_failed == 1 + assert len(pipeline.session.blocked_fixes) == 1 + assert pipeline.session.backup_session_id == "backup_123" + + +def test_pipeline_apply_fail_fast(): + """Test fail-fast behavior (stops on first failure).""" + pipeline = RefactronPipeline() + + # Mock fixers + fail_fix = MagicMock(success=False, reason="Blocked") + success_fix = MagicMock(success=True, fixed_code="fixed") + + pipeline.autofix_engine.fix = MagicMock(side_effect=[fail_fix, success_fix]) + + # Note: file_map depends on dict order or hash? + # Usually we want to ensure fail.py is processed first for this test. + # Grouping by file path. + + issue1 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.INFO, + message="Fail", + file_path=Path("a_fail.py"), + line_number=1, + ) + issue2 = CodeIssue( + category=IssueCategory.STYLE, + level=IssueLevel.INFO, + message="Succeed", + file_path=Path("b_success.py"), + line_number=1, + ) + + with patch.object(Path, "read_text", return_value="original"): + with patch.object(Path, "write_text"): + pipeline.apply( + [{"issue": issue1, "fixer_name": "fix1"}, {"issue": issue2, "fixer_name": "fix2"}], + fail_fast=True, + ) + + # Should stop after first failure (a_fail.py) + assert pipeline.session.files_failed == 1 + assert pipeline.session.files_succeeded == 0 + # The second file should not have been attempted successfully + # Wait, files_attempted is calculated at start. + assert pipeline.session.files_attempted == 2 + + +def test_session_persistence(): + """Test that PipelineSession can be saved and loaded from disk.""" + import tempfile + import json + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + session = PipelineSession(analyze_ms=100.0) + session_id = session.id + + # Save + save_path = session.save(directory=temp_path) + assert save_path.exists() + assert save_path.name == f"{session_id}.json" + + # Check latest pointer + latest_path = temp_path / "latest.json" + assert latest_path.exists() + with open(latest_path, "r") as f: + latest_data = json.load(f) + assert latest_data["latest_session_id"] == session_id + + # Load + loaded_session = PipelineSession.from_id(session_id, directory=temp_path) + assert loaded_session is not None + assert loaded_session.id == session_id + assert loaded_session.analyze_ms == 100.0 + + +def test_pipeline_save_session_integration(): + """Test that RefactronPipeline.save_session works.""" + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + pipeline = RefactronPipeline() + pipeline.session.analyze_ms = 50.0 + + save_path = pipeline.save_session(directory=temp_path) + assert save_path.exists() + + loaded = PipelineSession.from_id(pipeline.session.id, directory=temp_path) + assert loaded.analyze_ms == 50.0