Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .refactron.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable_metrics: true
log_level: DEBUG
28 changes: 16 additions & 12 deletions refactron/cli/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_print_status_messages,
console,
)
from refactron.core.pipeline import RefactronPipeline
from refactron.cli.utils import _load_config, _setup_logging, _validate_path
from refactron.core.models import CodeIssue, IssueCategory, IssueLevel
from refactron.core.workspace import WorkspaceManager
Expand Down Expand Up @@ -188,6 +189,16 @@ def analyze(

# Run analysis
try:
with console.status("[primary]Analyzing code...[/primary]"):
pipeline = RefactronPipeline(config_path=config)
result = pipeline.analyze(target)

# Queue issues to ensure the session is fully populated for potential fixes
pipeline.queue_issues(result.all_issues)

# Persist session
pipeline.save_session()

if output_format != "json":
with console.status("[primary]Analyzing code...[/primary]"):
refactron = Refactron(cfg)
Expand Down Expand Up @@ -245,18 +256,11 @@ def analyze(

# Show metrics if requested
if show_metrics and cfg.enable_metrics:
from refactron.core.metrics import get_metrics_collector

console.print("\n[bold]Metrics Summary:[/bold]")
collector = get_metrics_collector()
metrics_summary = collector.get_analysis_summary()
console.print(
f" Total analysis time: {metrics_summary.get('total_analysis_time_ms', 0):.2f}ms"
)
console.print(
f" Average time per file: {metrics_summary.get('average_time_per_file_ms', 0):.2f}ms"
)
console.print(f" Success rate: {metrics_summary.get('success_rate_percent', 0):.1f}%")
console.print("\n[bold]Pipeline Session Summary:[/bold]")
console.print(f" Session ID: [dim]{pipeline.session.id}[/dim]")
console.print(f" Analysis time: {pipeline.session.analyze_ms:.2f}ms")
console.print(f" Queuing time: {pipeline.session.queue_ms:.2f}ms")
console.print(f" Issues queued: {len(result.all_issues)}")

# Exit with error code: --fail-on sets threshold, default is CRITICAL
_LEVEL_RANK = {"INFO": 0, "WARNING": 1, "ERROR": 2, "CRITICAL": 3}
Expand Down
7 changes: 7 additions & 0 deletions refactron/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ def main(ctx: click.Context) -> None:
except ImportError:
pass

try:
from refactron.cli.run import run

main.add_command(run)
except ImportError:
pass

try:
from refactron.cli.cicd import feedback, generate_cicd, init

Expand Down
106 changes: 106 additions & 0 deletions refactron/cli/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Refactron CLI - Run Module.
Provides the 'run' command to execute Refactron as a connected pipeline.
"""

import os
from typing import Optional
from pathlib import Path
import click

from refactron.cli.ui import console, _auth_banner
from refactron.cli.utils import _validate_path
from refactron.core.pipeline import RefactronPipeline


@click.command()
@click.argument("target", type=click.Path(exists=True), required=False)
@click.option(
"--incremental/--no-incremental",
default=False,
help="Enable or disable incremental analysis for the pipeline run (off by default)",
)
@click.option(
"--verbose",
is_flag=True,
help="Show detailed timing information for each pipeline phase",
)
@click.option(
"--fail-fast",
is_flag=True,
help="Stop processing if an error occurs during fix application",
)
def run(target: Optional[str], incremental: bool, verbose: bool, fail_fast: bool) -> None:
"""
Run Refactron as a connected pipeline session.
"""
console.print()
_auth_banner("Pipeline Run")
console.print()

target_path = _validate_path(target) if target else Path.cwd()
debug_mode = os.getenv("REFACTRON_DEBUG") == "1"

with console.status("[primary]Executing pipeline run...[/primary]"):
try:
pipeline = RefactronPipeline()
# 1. Analyze
result = pipeline.analyze(target_path, use_incremental=incremental)

# 2. Queue
queued = pipeline.queue_issues(result.all_issues)

# 3. Persist session
pipeline.save_session()

# 4. Apply (simulated for now in 'run' command unless --apply added,
# but we'll show the summary infrastructure)
# For this task, we assume 'run' might be extended to apply fixes.
# If we don't apply, session counts remain 0.

console.print(f"[green]Pipeline session completed successfully.[/green]")
console.print(f"Session ID: [dim]{pipeline.session.id}[/dim]")
console.print(f"Total files analyzed: {result.total_files}")
console.print(f"Total issues found: {result.total_issues}")
console.print(f"Issues queued for fix: {len(queued)}")

if pipeline.session.files_attempted > 0:
console.print(f"Files attempted: {pipeline.session.files_attempted}")
console.print(f"Files succeeded: [green]{pipeline.session.files_succeeded}[/green]")
console.print(f"Files failed: [red]{pipeline.session.files_failed}[/red]")
if pipeline.session.backup_session_id:
console.print(
f"Backup Session ID: [cyan]{pipeline.session.backup_session_id}[/cyan]"
)

if pipeline.session.blocked_fixes:
console.print("\n[warning]Blocked Fixes:[/warning]")
for block in pipeline.session.blocked_fixes:
file = block.get("file", "Unknown")
reason = block.get("reason") or block.get("error", "Unknown error")
console.print(f" [red]✗[/red] {file}: {reason}")

if verbose or debug_mode:
console.print("\n[highlight]Pipeline Phase Timings:[/highlight]")
console.print(
f" 󰄰 [secondary]Analysis:[/secondary] {pipeline.session.analyze_ms:.2f}ms"
)
console.print(
f" 󰄰 [secondary]Queuing:[/secondary] {pipeline.session.queue_ms:.2f}ms"
)
if pipeline.session.apply_ms > 0:
console.print(
f" 󰄰 [secondary]Applying:[/secondary] {pipeline.session.apply_ms:.2f}ms"
)
if pipeline.session.verify_ms > 0:
console.print(
f" 󰄰 [secondary]Verify:[/secondary] {pipeline.session.verify_ms:.2f}ms"
)

except Exception as e:
console.print(f"[red]Pipeline run failed: {e}[/red]")
if debug_mode:
import traceback

console.print(traceback.format_exc())
raise SystemExit(1)
222 changes: 222 additions & 0 deletions refactron/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"""Pipeline orchestration for automated and session-based flows."""

import logging
from collections import defaultdict
from pathlib import Path
import time
from typing import Optional, Union, List, Dict, Any

from refactron.core.config import RefactronConfig
from refactron.core.refactron import Refactron
from refactron.core.analysis_result import AnalysisResult
from refactron.core.models import CodeIssue
from refactron.autofix.engine import AutoFixEngine
from refactron.core.pipeline_session import PipelineSession
from refactron.core.backup import BackupRollbackSystem

# Configure logging
logger = logging.getLogger(__name__)


class RefactronPipeline:
"""Automated pipeline execution for session-based and CI/CD flows."""

def __init__(
self,
project_root: Optional[Union[str, Path]] = None,
config_path: Optional[Union[str, Path]] = None,
):
"""
Initialize the pipeline.

Args:
project_root: Optional root directory of the project.
config_path: Optional explicit configuration path.
"""
self.project_root = Path(project_root) if project_root else None
self.config_path = Path(config_path) if config_path else None

self._config = self._load_config()
# Enforce pipeline default behavior (full scan) unless overridden later
self._config.enable_incremental_analysis = False
self.refactron = Refactron(self._config)
self.autofix_engine = AutoFixEngine()
self._fixer_cache = {}
self.session = PipelineSession()

def _load_config(self) -> RefactronConfig:
if self.config_path and self.config_path.exists():
return RefactronConfig.from_file(self.config_path)

root = self.project_root or Path.cwd()
yaml_path = root / ".refactron.yaml"
if yaml_path.exists():
return RefactronConfig.from_file(yaml_path)
return RefactronConfig.default()

def analyze(
self, target: Union[str, Path], use_incremental: Optional[bool] = None
) -> AnalysisResult:
"""
Run analysis as part of a pipeline flow.

Pipeline-only overrides:
- `enable_incremental_analysis` is forced to False by default to guarantee
a fully fresh, reproducible analysis in CI/CD or pipeline environments.

Args:
target: Path to file or directory to analyze.
use_incremental: Optional override to enable incremental analysis.

Returns:
AnalysisResult containing issues and metrics.
"""
if use_incremental is not None:
self.refactron.config.enable_incremental_analysis = use_incremental

start_time = time.time()
result = self.refactron.analyze(target)
duration_ms = (time.time() - start_time) * 1000

# Update session metrics
self.session.analyze_ms = duration_ms
self.session.metadata["target"] = str(target)
return result

def save_session(self, directory: Optional[Path] = None) -> Path:
"""
Persist the current session to disk.
"""
return self.session.save(directory)

def queue_issues(self, issues: List[CodeIssue]) -> list:
"""Queue issues mapped to their responsible fixers."""
start_time = time.time()
queued = []
for issue in issues:
fixer_name = self._find_fixer_name(issue)
if fixer_name:
queued.append({"issue": issue, "fixer_name": fixer_name})
self.session.queue_ms = (time.time() - start_time) * 1000
return queued

def _find_fixer_name(self, issue: CodeIssue) -> Optional[str]:
"""Find the appropriate fixer name for a given issue."""
# Use rule_id or category as cache key
issue_type = issue.rule_id or str(getattr(issue, "category", type(issue).__name__))
if issue_type in self._fixer_cache:
return self._fixer_cache[issue_type]

candidate_name = None

# 1. Ask AutoFixEngine directly without preview (O(1) dictionary lookup)
if hasattr(self, "autofix_engine") and self.autofix_engine.can_fix(issue):
candidate_name = issue.rule_id
else:
# 2. Fallback to preview-based resolution for ambiguous cases
if hasattr(self, "autofix_engine"):
for name, fixer in self.autofix_engine.fixers.items():
try:
preview_result = fixer.preview(issue, "x = 1\n")
if preview_result and getattr(preview_result, "success", False):
candidate_name = name
break
except Exception:
continue

self._fixer_cache[issue_type] = candidate_name
return candidate_name

def apply(
self, queued_issues: List[dict], preview: bool = False, fail_fast: bool = False
) -> list:
"""
Apply fixes for queued issues.

Orchestration Policies:
- Best-effort (default): Continues applying fixes to other files even if one file fails.
- Fail-fast (fail_fast=True): Stops the entire application process on the first file failure.

Args:
queued_issues: List of dicts containing 'issue' and 'fixer_name'.
preview: If True, only simulate fixes (no disk writes).
fail_fast: If True, stop on first application error.

Returns:
List of results for each application attempt.
"""
start_time = time.time()
results = []

# 1. Group issues by file to ensure atomic/cumulative updates
file_map = defaultdict(list)
for item in queued_issues:
file_map[item["issue"].file_path].append(item)

self.session.files_attempted = len(file_map)

# 2. Setup Backup
backup_system = BackupRollbackSystem(self.project_root)
session_id = None
if not preview and self._config.backup_enabled and file_map:
try:
session_id, _ = backup_system.prepare_for_refactoring(
list(file_map.keys()), description=f"Pipeline session {self.session.id}"
)
self.session.backup_session_id = session_id
except Exception as e:
logger.warning(f"Failed to create backup session: {e}")

# 3. Apply fixes file by file
for file_path, items in file_map.items():
success = True
try:
content = file_path.read_text(encoding="utf-8")
current_code = content

# Apply each fixer sequentially for this file
for item in items:
issue = item["issue"]
fix_result = self.autofix_engine.fix(issue, current_code, preview=preview)
results.append(fix_result)

if fix_result.success:
current_code = fix_result.fixed_code
else:
success = False
self.session.blocked_fixes.append(
{
"file": str(file_path),
"issue": issue.message,
"reason": fix_result.reason,
}
)

# Write back if successful and not in preview
if not preview and success:
file_path.write_text(current_code, encoding="utf-8")
self.session.files_succeeded += 1
elif not success:
self.session.files_failed += 1
if fail_fast:
break
else:
# Preview mode success
self.session.files_succeeded += 1

except Exception as e:
self.session.files_failed += 1
self.session.blocked_fixes.append({"file": str(file_path), "error": str(e)})
results.append(None)
if fail_fast:
break

self.session.apply_ms = (time.time() - start_time) * 1000
return results

def verify(self, target: Union[str, Path]) -> AnalysisResult:
"""Verify the state of the project after fixes."""
start_time = time.time()
result = self.analyze(target)
self.session.verify_ms = (time.time() - start_time) * 1000
return result
Loading
Loading