Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .refactron.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable_metrics: true
log_level: DEBUG
7 changes: 7 additions & 0 deletions refactron/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ def main(ctx: click.Context) -> None:
except ImportError:
pass

try:
from refactron.cli.run import run

main.add_command(run)
except ImportError:
pass

try:
from refactron.cli.cicd import feedback, generate_cicd, init

Expand Down
69 changes: 69 additions & 0 deletions refactron/cli/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Refactron CLI - Run Module.
Provides the 'run' command to execute Refactron as a connected pipeline.
"""
import os
from typing import Optional
from pathlib import Path
import click

from refactron.cli.ui import console, _auth_banner
from refactron.cli.utils import _validate_path
from refactron.core.pipeline import RefactronPipeline

@click.command()
@click.argument("target", type=click.Path(exists=True), required=False)
@click.option(
"--incremental/--no-incremental",
default=False,
help="Enable or disable incremental analysis for the pipeline run (off by default)",
)
@click.option(
"--verbose",
is_flag=True,
help="Show detailed timing information for each pipeline phase",
)
def run(target: Optional[str], incremental: bool, verbose: bool) -> None:
"""
Run Refactron as a connected pipeline session.
"""
console.print()
_auth_banner("Pipeline Run")
console.print()

target_path = _validate_path(target) if target else Path.cwd()
debug_mode = os.getenv("REFACTRON_DEBUG") == "1"

with console.status("[primary]Executing pipeline run...[/primary]"):
try:
pipeline = RefactronPipeline()
# 1. Analyze
result = pipeline.analyze(target_path, use_incremental=incremental)

# 2. Queue (Always run queue to populate queue_ms, even if we don't fix)
queued = pipeline.queue_issues(result.all_issues)

# Note: We don't call apply/verify yet in the basic 'run' command
# unless we add more flags, but we've enabled the infrastructure.

console.print(f"[green]Pipeline session completed successfully.[/green]")
console.print(f"Session ID: [dim]{pipeline.session.id}[/dim]")
console.print(f"Total files analyzed: {result.total_files}")
console.print(f"Total issues found: {result.total_issues}")
console.print(f"Issues queued for fix: {len(queued)}")

if verbose or debug_mode:
console.print("\n[highlight]Pipeline Phase Timings:[/highlight]")
console.print(f" 󰄰 [secondary]Analysis:[/secondary] {pipeline.session.analyze_ms:.2f}ms")
console.print(f" 󰄰 [secondary]Queuing:[/secondary] {pipeline.session.queue_ms:.2f}ms")
if pipeline.session.apply_ms > 0:
console.print(f" 󰄰 [secondary]Applying:[/secondary] {pipeline.session.apply_ms:.2f}ms")
if pipeline.session.verify_ms > 0:
console.print(f" 󰄰 [secondary]Verify:[/secondary] {pipeline.session.verify_ms:.2f}ms")

except Exception as e:
console.print(f"[red]Pipeline run failed: {e}[/red]")
if debug_mode:
import traceback
console.print(traceback.format_exc())
raise SystemExit(1)
135 changes: 135 additions & 0 deletions refactron/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Pipeline orchestration for automated and session-based flows."""

from pathlib import Path
import time
from typing import Optional, Union, List

from refactron.core.config import RefactronConfig
from refactron.core.refactron import Refactron
from refactron.core.analysis_result import AnalysisResult
from refactron.core.models import CodeIssue
from refactron.autofix.engine import AutoFixEngine
from refactron.core.pipeline_session import PipelineSession

class RefactronPipeline:
"""Automated pipeline execution for session-based and CI/CD flows."""

def __init__(self, project_root: Optional[Union[str, Path]] = None, config_path: Optional[Union[str, Path]] = None):
"""
Initialize the pipeline.

Args:
project_root: Optional root directory of the project.
config_path: Optional explicit configuration path.
"""
self.project_root = Path(project_root) if project_root else None
self.config_path = Path(config_path) if config_path else None

self._config = self._load_config()
# Enforce pipeline default behavior (full scan) unless overridden later
self._config.enable_incremental_analysis = False
self.refactron = Refactron(self._config)
self.autofix_engine = AutoFixEngine()
self._fixer_cache = {}
self.session = PipelineSession()

def _load_config(self) -> RefactronConfig:
if self.config_path and self.config_path.exists():
return RefactronConfig.from_file(self.config_path)

root = self.project_root or Path.cwd()
yaml_path = root / ".refactron.yaml"
if yaml_path.exists():
return RefactronConfig.from_file(yaml_path)
return RefactronConfig.default()

def analyze(self, target: Union[str, Path], use_incremental: Optional[bool] = None) -> AnalysisResult:
"""
Run analysis as part of a pipeline flow.

Pipeline-only overrides:
- `enable_incremental_analysis` is forced to False by default to guarantee
a fully fresh, reproducible analysis in CI/CD or pipeline environments.

Args:
target: Path to file or directory to analyze.
use_incremental: Optional override to enable incremental analysis.

Returns:
AnalysisResult containing issues and metrics.
"""
if use_incremental is not None:
self.refactron.config.enable_incremental_analysis = use_incremental

start_time = time.time()
result = self.refactron.analyze(target)
self.session.analyze_ms = (time.time() - start_time) * 1000
return result

def queue_issues(self, issues: List[CodeIssue]) -> list:
"""Queue issues mapped to their responsible fixers."""
start_time = time.time()
queued = []
for issue in issues:
fixer_name = self._find_fixer_name(issue)
if fixer_name:
queued.append({"issue": issue, "fixer_name": fixer_name})
self.session.queue_ms = (time.time() - start_time) * 1000
return queued

def _find_fixer_name(self, issue: CodeIssue) -> Optional[str]:
"""Find the appropriate fixer name for a given issue."""
# Use rule_id or category as cache key
issue_type = issue.rule_id or str(getattr(issue, 'category', type(issue).__name__))
if issue_type in self._fixer_cache:
return self._fixer_cache[issue_type]

candidate_name = None

# 1. Ask AutoFixEngine directly without preview (O(1) dictionary lookup)
if hasattr(self, 'autofix_engine') and self.autofix_engine.can_fix(issue):
candidate_name = issue.rule_id
else:
# 2. Fallback to preview-based resolution for ambiguous cases
if hasattr(self, 'autofix_engine'):
for name, fixer in self.autofix_engine.fixers.items():
try:
preview_result = fixer.preview(issue, "x = 1\n")
if preview_result and getattr(preview_result, "success", False):
candidate_name = name
break
except Exception:
continue

self._fixer_cache[issue_type] = candidate_name
return candidate_name

def apply(self, queued_issues: List[dict], preview: bool = False) -> list:
"""Apply fixes for queued issues."""
start_time = time.time()
results = []

# We need the original code to apply fixes.
# For simplicity, we'll read it from the issues' file paths.
# In a real batch, we should group by file.
for item in queued_issues:
issue = item["issue"]
try:
code = issue.file_path.read_text(encoding="utf-8")
result = self.autofix_engine.fix(issue, code, preview=preview)
results.append(result)

if not preview and result.success:
issue.file_path.write_text(result.fixed_code, encoding="utf-8")
except Exception as e:
results.append(None) # Or handle error

self.session.apply_ms = (time.time() - start_time) * 1000
return results

def verify(self, target: Union[str, Path]) -> AnalysisResult:
"""Verify the state of the project after fixes."""
start_time = time.time()
result = self.analyze(target)
self.session.verify_ms = (time.time() - start_time) * 1000
return result
28 changes: 28 additions & 0 deletions refactron/core/pipeline_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Pipeline session and timing tracking."""

import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Optional


@dataclass
class PipelineSession:
"""
Session container for a pipeline run, including timing metrics for each phase.
"""

id: str = field(default_factory=lambda: str(uuid.uuid4()))
analyze_ms: float = 0.0
queue_ms: float = 0.0
apply_ms: float = 0.0
verify_ms: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)

def to_dict(self) -> Dict[str, Any]:
"""Convert the session to a JSON-serializable dictionary."""
return asdict(self)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "PipelineSession":
"""Create a session from a dictionary."""
return cls(**data)
107 changes: 107 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Tests for the Refactron pipeline module."""

import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock

from refactron.core.pipeline import RefactronPipeline
from refactron.core.config import RefactronConfig

def test_pipeline_loads_project_config():
"""Test that RefactronPipeline loads configuration from the project root."""
with tempfile.TemporaryDirectory() as temp_dir:
root_path = Path(temp_dir)

config_path = root_path / ".refactron.yaml"
config_path.write_text("max_function_complexity: 99\nenable_metrics: false\n")

target_file = root_path / "dummy.py"
target_file.write_text("def foo():\n pass\n")

with patch("refactron.core.pipeline.Refactron.analyze") as mock_analyze:
mock_analyze.return_value = "mock_result"
pipeline = RefactronPipeline(project_root=root_path)

result = pipeline.analyze(target_file)

assert result == "mock_result"
config_passed = pipeline.refactron.config

assert isinstance(config_passed, RefactronConfig)
assert config_passed.max_function_complexity == 99
assert config_passed.enable_metrics is False
assert config_passed.enable_incremental_analysis is False

def test_pipeline_incremental_override():
"""Test that RefactronPipeline can override the incremental setting."""
with tempfile.TemporaryDirectory() as temp_dir:
root_path = Path(temp_dir)

target_file = root_path / "dummy.py"
target_file.write_text("def foo():\n pass\n")

with patch("refactron.core.pipeline.Refactron.analyze"):
pipeline = RefactronPipeline(project_root=root_path)
pipeline.analyze(target_file, use_incremental=True)

config_passed = pipeline.refactron.config
assert config_passed.enable_incremental_analysis is True

def test_pipeline_queue_issues_caching():
"""Test that queue_issues leverages caching and direct mapping without multiple previews."""
from refactron.core.models import CodeIssue, IssueCategory, IssueLevel
from refactron.autofix.models import FixResult
from pathlib import Path

with tempfile.TemporaryDirectory() as temp_dir:
pipeline = RefactronPipeline(project_root=Path(temp_dir))

# Mock AutoFixEngine on the pipeline directly
pipeline.autofix_engine = MagicMock()

calls = {"preview": 0}

# Create a mock fixer that counts preview calls
mock_fixer = MagicMock()
mock_fixer.name = "mock_rule"
def mock_preview(issue, code):
calls["preview"] += 1
return FixResult(success=True, reason="")
mock_fixer.preview.side_effect = mock_preview

pipeline.autofix_engine.fixers = {"mock_rule": mock_fixer}

# Mock can_fix to return False so it falls back to preview ONCE per type
pipeline.autofix_engine.can_fix.return_value = False

issue1 = CodeIssue(
category=IssueCategory.STYLE,
level=IssueLevel.WARNING,
message="Test issue",
file_path=Path("dummy.py"),
line_number=1,
rule_id="unknown_rule_1"
)
issue2 = CodeIssue(
category=IssueCategory.STYLE,
level=IssueLevel.WARNING,
message="Test issue 2",
file_path=Path("dummy.py"),
line_number=2,
rule_id="unknown_rule_1"
)
issue3 = CodeIssue(
category=IssueCategory.STYLE,
level=IssueLevel.WARNING,
message="Test issue 3",
file_path=Path("dummy.py"),
line_number=3,
rule_id="unknown_rule_1"
)

queued = pipeline.queue_issues([issue1, issue2, issue3])

assert len(queued) == 3
# Should only have run preview 1 time, because the subsequent issues have the same rule_id and hit the cache.
assert calls["preview"] == 1
assert pipeline._fixer_cache["unknown_rule_1"] == "mock_rule"
Loading
Loading