Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .refactron.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable_metrics: true
log_level: DEBUG
7 changes: 7 additions & 0 deletions refactron/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ def main(ctx: click.Context) -> None:
except ImportError:
pass

try:
from refactron.cli.run import run

main.add_command(run)
except ImportError:
pass

try:
from refactron.cli.cicd import feedback, generate_cicd, init

Expand Down
103 changes: 103 additions & 0 deletions refactron/cli/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
Refactron CLI - Run Module.
Provides the 'run' command to execute Refactron as a connected pipeline.
"""

import os
from typing import Optional
from pathlib import Path
import click

from refactron.cli.ui import console, _auth_banner
from refactron.cli.utils import _validate_path
from refactron.core.pipeline import RefactronPipeline


@click.command()
@click.argument("target", type=click.Path(exists=True), required=False)
@click.option(
"--incremental/--no-incremental",
default=False,
help="Enable or disable incremental analysis for the pipeline run (off by default)",
)
@click.option(
"--verbose",
is_flag=True,
help="Show detailed timing information for each pipeline phase",
)
@click.option(
"--fail-fast",
is_flag=True,
help="Stop processing if an error occurs during fix application",
)
def run(target: Optional[str], incremental: bool, verbose: bool, fail_fast: bool) -> None:
"""
Run Refactron as a connected pipeline session.
"""
console.print()
_auth_banner("Pipeline Run")
console.print()

target_path = _validate_path(target) if target else Path.cwd()
debug_mode = os.getenv("REFACTRON_DEBUG") == "1"

with console.status("[primary]Executing pipeline run...[/primary]"):
try:
pipeline = RefactronPipeline()
# 1. Analyze
result = pipeline.analyze(target_path, use_incremental=incremental)

# 2. Queue
queued = pipeline.queue_issues(result.all_issues)

# 3. Apply (simulated for now in 'run' command unless --apply added,
# but we'll show the summary infrastructure)
# For this task, we assume 'run' might be extended to apply fixes.
# If we don't apply, session counts remain 0.
Comment on lines +28 to +56
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Wire --fail-fast to fix application or hide it for now.

The command accepts --fail-fast, but it only runs analyze + queue, so users cannot trigger the fail-fast behavior described by the PR. Either add an explicit apply path that passes fail_fast=fail_fast, or remove the option until run supports applying fixes.

One safe direction
 `@click.option`(
     "--fail-fast",
     is_flag=True,
     help="Stop processing if an error occurs during fix application",
 )
-def run(target: Optional[str], incremental: bool, verbose: bool, fail_fast: bool) -> None:
+@click.option(
+    "--apply",
+    "apply_fixes",
+    is_flag=True,
+    help="Apply queued fixes after analysis and backup preparation",
+)
+def run(
+    target: Optional[str],
+    incremental: bool,
+    verbose: bool,
+    fail_fast: bool,
+    apply_fixes: bool,
+) -> None:
@@
             # 2. Queue
             queued = pipeline.queue_issues(result.all_issues)
 
-            # 3. Apply (simulated for now in 'run' command unless --apply added,
-            # but we'll show the summary infrastructure)
-            # For this task, we assume 'run' might be extended to apply fixes.
-            # If we don't apply, session counts remain 0.
+            # 3. Apply only when explicitly requested.
+            if apply_fixes:
+                pipeline.apply(queued, fail_fast=fail_fast)

Based on learnings, all refactoring must go through safety-first pipeline: preview → backup → apply → optional rollback.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/cli/run.py` around lines 28 - 56, The CLI currently accepts
--fail-fast but never uses it; update the run command to either remove the
option or wire it into the apply step — implement an explicit apply phase after
queueing that calls RefactronPipeline.apply (or the appropriate apply method on
the pipeline instance) and pass fail_fast=fail_fast into that call so the
pipeline honours the flag; locate the run function and the pipeline
creation/queueing code (RefactronPipeline(), .analyze(), .queue_issues()) and
add the apply invocation and any necessary preview/backup invocation sequence so
the safety-first flow (preview → backup → apply → optional rollback) is
preserved.


console.print(f"[green]Pipeline session completed successfully.[/green]")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove the unnecessary f-string prefix.

This line has no interpolation and currently fails pre-commit with F541.

Proposed fix
-            console.print(f"[green]Pipeline session completed successfully.[/green]")
+            console.print("[green]Pipeline session completed successfully.[/green]")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
console.print(f"[green]Pipeline session completed successfully.[/green]")
console.print("[green]Pipeline session completed successfully.[/green]")
🧰 Tools
🪛 Ruff (0.15.10)

[error] 58-58: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/cli/run.py` at line 58, The console.print call uses an unnecessary
f-string literal with no interpolation, causing an F541 pre-commit failure;
replace the f-string in the console.print invocation (the line that calls
console.print("[green]Pipeline session completed successfully.[/green]")) with a
plain string literal (remove the leading f) so the call uses a regular quoted
string passed to console.print.

console.print(f"Session ID: [dim]{pipeline.session.id}[/dim]")
console.print(f"Total files analyzed: {result.total_files}")
console.print(f"Total issues found: {result.total_issues}")
console.print(f"Issues queued for fix: {len(queued)}")

if pipeline.session.files_attempted > 0:
console.print(f"Files attempted: {pipeline.session.files_attempted}")
console.print(f"Files succeeded: [green]{pipeline.session.files_succeeded}[/green]")
console.print(f"Files failed: [red]{pipeline.session.files_failed}[/red]")
if pipeline.session.backup_session_id:
console.print(
f"Backup Session ID: [cyan]{pipeline.session.backup_session_id}[/cyan]"
)

if pipeline.session.blocked_fixes:
console.print("\n[warning]Blocked Fixes:[/warning]")
for block in pipeline.session.blocked_fixes:
file = block.get("file", "Unknown")
reason = block.get("reason") or block.get("error", "Unknown error")
console.print(f" [red]✗[/red] {file}: {reason}")

if verbose or debug_mode:
console.print("\n[highlight]Pipeline Phase Timings:[/highlight]")
console.print(
f" 󰄰 [secondary]Analysis:[/secondary] {pipeline.session.analyze_ms:.2f}ms"
)
console.print(
f" 󰄰 [secondary]Queuing:[/secondary] {pipeline.session.queue_ms:.2f}ms"
)
if pipeline.session.apply_ms > 0:
console.print(
f" 󰄰 [secondary]Applying:[/secondary] {pipeline.session.apply_ms:.2f}ms"
)
if pipeline.session.verify_ms > 0:
console.print(
f" 󰄰 [secondary]Verify:[/secondary] {pipeline.session.verify_ms:.2f}ms"
)

except Exception as e:
console.print(f"[red]Pipeline run failed: {e}[/red]")
if debug_mode:
import traceback

console.print(traceback.format_exc())
raise SystemExit(1)
212 changes: 212 additions & 0 deletions refactron/core/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Pipeline orchestration for automated and session-based flows."""

import logging
from collections import defaultdict
from pathlib import Path
import time
from typing import Optional, Union, List, Dict, Any
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Finish the core type annotations and type the fixer cache.

This resolves the unused typing import, _fixer_cache mypy failure, and the Optional[str] return issue from reading an untyped dict.

Proposed fix
-from typing import Optional, Union, List, Dict, Any
+from typing import Dict, List, Optional, Union
@@
-    def __init__(
+    def __init__(
         self,
         project_root: Optional[Union[str, Path]] = None,
         config_path: Optional[Union[str, Path]] = None,
-    ):
+    ) -> None:
@@
-        self._fixer_cache = {}
+        self._fixer_cache: Dict[str, Optional[str]] = {}

As per coding guidelines, refactron/**/*.py: Type annotations are required in refactron/ with mypy disallow_untyped_defs = true enabled.

Also applies to: 24-28, 44-44

🧰 Tools
🪛 GitHub Actions: Pre-commit

[error] 7-7: flake8 F401 'typing.Any' imported but unused.


[error] 7-7: flake8 F401 'typing.Dict' imported but unused.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/core/pipeline.py` at line 7, Add proper type annotations to finish
core typing in refactron/core/pipeline.py: annotate the module-level
_fixer_cache (e.g., Dict[str, Any] or a more specific Dict[str, FixerType]) and
update any untyped function signatures to use explicit return and parameter
types (replace bare dict usages with typed Dict[str, Any] and ensure functions
that read optional string values return Optional[str] by handling missing keys
or casting). Update the import line to only import the typing names you actually
use (e.g., Optional, Dict, List, Any, Union) and adjust the functions/methods
that access the fixer cache and the dict-reading logic (refer to _fixer_cache,
any functions that return Optional[str], and the functions using the fixer
lookup) so mypy’s disallow_untyped_defs passes. Ensure all updated symbols have
concrete types consistent across the module.


from refactron.core.config import RefactronConfig
from refactron.core.refactron import Refactron
from refactron.core.analysis_result import AnalysisResult
from refactron.core.models import CodeIssue
from refactron.autofix.engine import AutoFixEngine
from refactron.core.pipeline_session import PipelineSession
from refactron.core.backup import BackupRollbackSystem

# Configure logging
logger = logging.getLogger(__name__)


class RefactronPipeline:
"""Automated pipeline execution for session-based and CI/CD flows."""

def __init__(
self,
project_root: Optional[Union[str, Path]] = None,
config_path: Optional[Union[str, Path]] = None,
):
"""
Initialize the pipeline.

Args:
project_root: Optional root directory of the project.
config_path: Optional explicit configuration path.
"""
self.project_root = Path(project_root) if project_root else None
self.config_path = Path(config_path) if config_path else None

self._config = self._load_config()
# Enforce pipeline default behavior (full scan) unless overridden later
self._config.enable_incremental_analysis = False
self.refactron = Refactron(self._config)
self.autofix_engine = AutoFixEngine()
self._fixer_cache = {}
self.session = PipelineSession()

def _load_config(self) -> RefactronConfig:
if self.config_path and self.config_path.exists():
return RefactronConfig.from_file(self.config_path)

root = self.project_root or Path.cwd()
yaml_path = root / ".refactron.yaml"
if yaml_path.exists():
return RefactronConfig.from_file(yaml_path)
return RefactronConfig.default()

def analyze(
self, target: Union[str, Path], use_incremental: Optional[bool] = None
) -> AnalysisResult:
"""
Run analysis as part of a pipeline flow.

Pipeline-only overrides:
- `enable_incremental_analysis` is forced to False by default to guarantee
a fully fresh, reproducible analysis in CI/CD or pipeline environments.

Args:
target: Path to file or directory to analyze.
use_incremental: Optional override to enable incremental analysis.

Returns:
AnalysisResult containing issues and metrics.
"""
if use_incremental is not None:
self.refactron.config.enable_incremental_analysis = use_incremental

start_time = time.time()
result = self.refactron.analyze(target)
self.session.analyze_ms = (time.time() - start_time) * 1000
return result

def queue_issues(self, issues: List[CodeIssue]) -> list:
"""Queue issues mapped to their responsible fixers."""
start_time = time.time()
queued = []
for issue in issues:
fixer_name = self._find_fixer_name(issue)
if fixer_name:
queued.append({"issue": issue, "fixer_name": fixer_name})
self.session.queue_ms = (time.time() - start_time) * 1000
return queued

def _find_fixer_name(self, issue: CodeIssue) -> Optional[str]:
"""Find the appropriate fixer name for a given issue."""
# Use rule_id or category as cache key
issue_type = issue.rule_id or str(getattr(issue, "category", type(issue).__name__))
if issue_type in self._fixer_cache:
return self._fixer_cache[issue_type]

candidate_name = None

# 1. Ask AutoFixEngine directly without preview (O(1) dictionary lookup)
if hasattr(self, "autofix_engine") and self.autofix_engine.can_fix(issue):
candidate_name = issue.rule_id
else:
# 2. Fallback to preview-based resolution for ambiguous cases
if hasattr(self, "autofix_engine"):
for name, fixer in self.autofix_engine.fixers.items():
try:
preview_result = fixer.preview(issue, "x = 1\n")
if preview_result and getattr(preview_result, "success", False):
candidate_name = name
break
except Exception:
continue

self._fixer_cache[issue_type] = candidate_name
return candidate_name

def apply(
self, queued_issues: List[dict], preview: bool = False, fail_fast: bool = False
) -> list:
"""
Apply fixes for queued issues.

Orchestration Policies:
- Best-effort (default): Continues applying fixes to other files even if one file fails.
- Fail-fast (fail_fast=True): Stops the entire application process on the first file failure.

Comment on lines +126 to +129
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Wrap the long docstring line.

Line 128 exceeds the configured 100-character limit and fails pre-commit.

Proposed fix
-        - Fail-fast (fail_fast=True): Stops the entire application process on the first file failure.
+        - Fail-fast (fail_fast=True): Stops the entire application process on the first
+          file failure.

As per coding guidelines, **/*.py: Use line length of 100 characters, enforced by black, isort, and flake8.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Orchestration Policies:
- Best-effort (default): Continues applying fixes to other files even if one file fails.
- Fail-fast (fail_fast=True): Stops the entire application process on the first file failure.
Orchestration Policies:
- Best-effort (default): Continues applying fixes to other files even if one file fails.
- Fail-fast (fail_fast=True): Stops the entire application process on the first
file failure.
🧰 Tools
🪛 GitHub Actions: Pre-commit

[error] 128-128: flake8 E501 line too long (101 > 100 characters).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/core/pipeline.py` around lines 126 - 129, The long docstring line
in the "Orchestration Policies" block (the sentence starting "Fail-fast
(fail_fast=True): Stops the entire application process on the first file
failure.") exceeds 100 characters; wrap that sentence into one or more lines
under 100 characters inside the same docstring, preserving the existing
indentation and triple-quote style so the docstring formatting and semantics
remain unchanged.

Args:
queued_issues: List of dicts containing 'issue' and 'fixer_name'.
preview: If True, only simulate fixes (no disk writes).
fail_fast: If True, stop on first application error.

Returns:
List of results for each application attempt.
"""
start_time = time.time()
results = []

# 1. Group issues by file to ensure atomic/cumulative updates
file_map = defaultdict(list)
for item in queued_issues:
file_map[item["issue"].file_path].append(item)

self.session.files_attempted = len(file_map)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Count only files that are actually attempted.

files_attempted is set to every grouped file before fail-fast processing, so files skipped after the first failure are reported as attempted. Increment it inside the per-file loop instead.

Proposed fix
-        self.session.files_attempted = len(file_map)
-
         # 2. Setup Backup
@@
         # 3. Apply fixes file by file
         for file_path, items in file_map.items():
+            self.session.files_attempted += 1
             success = True

Also applies to: 160-163, 197-202

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/core/pipeline.py` at line 146, The current code sets
self.session.files_attempted = len(file_map) before fail-fast/grouped processing
which counts files that are never processed; instead remove that bulk assignment
and increment self.session.files_attempted by 1 inside the per-file processing
loop at the point where a file is actually about to be attempted (e.g., inside
the loop that iterates grouped files in pipeline.py, where the code calls the
per-file processing logic), and apply the same change to the other occurrences
noted (the blocks around the ranges corresponding to lines 160-163 and 197-202)
so each attempted file increments the counter only when it is truly attempted.


# 2. Setup Backup
backup_system = BackupRollbackSystem(self.project_root)
session_id = None
if not preview and self._config.backup_enabled and file_map:
try:
session_id, _ = backup_system.prepare_for_refactoring(
list(file_map.keys()), description=f"Pipeline session {self.session.id}"
)
self.session.backup_session_id = session_id
except Exception as e:
logger.warning(f"Failed to create backup session: {e}")

Comment on lines +151 to +159
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Do not apply fixes after backup creation or integrity failures.

prepare_for_refactoring() returns failed backup paths, but they are ignored, and a backup exception only logs a warning before the code continues to write files. That can leave users without rollback coverage.

Proposed direction
-                session_id, _ = backup_system.prepare_for_refactoring(
+                session_id, failed_files = backup_system.prepare_for_refactoring(
                     list(file_map.keys()), description=f"Pipeline session {self.session.id}"
                 )
                 self.session.backup_session_id = session_id
+                valid_paths, corrupt_paths = (
+                    backup_system.backup_manager.validate_backup_integrity(session_id)
+                )
+                unsafe_paths = set(failed_files) | set(corrupt_paths)
+                if unsafe_paths:
+                    raise RuntimeError(
+                        f"Backup validation failed for: "
+                        f"{', '.join(str(path) for path in unsafe_paths)}"
+                    )
             except Exception as e:
-                logger.warning(f"Failed to create backup session: {e}")
+                logger.error("Failed to prepare backup session: %s", e)
+                raise

Based on learnings, all refactoring must go through safety-first pipeline: preview → backup → apply → optional rollback. As per coding guidelines, refactron/core/*.py: Use BackupManager.validate_backup_integrity(session_id) for backup integrity validation, returning (valid_paths, corrupt_paths).

🧰 Tools
🪛 Ruff (0.15.10)

[warning] 157-157: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/core/pipeline.py` around lines 151 - 159, The code currently
ignores failed backup paths and only logs a warning on backup errors; update the
block that runs when not preview and self._config.backup_enabled and file_map to
capture the results from backup_system.prepare_for_refactoring (including any
failed paths), assign session_id to self.session.backup_session_id only on
success, then call BackupManager.validate_backup_integrity(session_id) to get
(valid_paths, corrupt_paths) and if there are any failed/ corrupt paths or an
exception from prepare_for_refactoring, abort the refactoring (do not proceed to
apply/write files) by raising an exception or returning an error from the
pipeline; reference the prepare_for_refactoring call, the
self.session.backup_session_id assignment, and
BackupManager.validate_backup_integrity(session_id) in your changes.

# 3. Apply fixes file by file
for file_path, items in file_map.items():
success = True
try:
content = file_path.read_text(encoding="utf-8")
current_code = content

# Apply each fixer sequentially for this file
for item in items:
issue = item["issue"]
fix_result = self.autofix_engine.fix(issue, current_code, preview=preview)
results.append(fix_result)

if fix_result.success:
current_code = fix_result.fixed_code
Comment on lines +168 to +174
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Use the selected fixer and the actual FixResult.fixed field.

Fallback resolution stores fixer_name, but apply() ignores it and calls AutoFixEngine.fix() with the original issue, so fallback-mapped issues still fail. This block also reads fixed_code, which does not exist on FixResult.

Proposed direction
                 # Apply each fixer sequentially for this file
                 for item in items:
                     issue = item["issue"]
-                    fix_result = self.autofix_engine.fix(issue, current_code, preview=preview)
+                    fixer_name = item["fixer_name"]
+                    original_rule_id = issue.rule_id
+                    if fixer_name != issue.rule_id:
+                        issue.rule_id = fixer_name
+                    try:
+                        fix_result = self.autofix_engine.fix(
+                            issue, current_code, preview=preview
+                        )
+                    finally:
+                        issue.rule_id = original_rule_id
                     results.append(fix_result)
 
                     if fix_result.success:
-                        current_code = fix_result.fixed_code
+                        if fix_result.fixed is None:
+                            success = False
+                            self.session.blocked_fixes.append(
+                                {
+                                    "file": str(file_path),
+                                    "issue": issue.message,
+                                    "reason": "Fixer succeeded without fixed code",
+                                }
+                            )
+                            continue
+                        current_code = fix_result.fixed
🧰 Tools
🪛 GitHub Actions: Pre-commit

[error] 174-174: mypy [attr-defined]: "FixResult" has no attribute "fixed_code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/core/pipeline.py` around lines 168 - 174, The loop currently calls
self.autofix_engine.fix() with item["issue"] and reads a non-existent fixed_code
field; update it to honor any fallback-mapped fixer by using the selected
fixer/fallback from item (e.g., item.get("fixer_name") or the mapped issue
object) when invoking AutoFixEngine.fix/apply so the correct fixer is used, and
replace reads of FixResult.fixed_code with the actual FixResult.fixed field and
assign current_code = fix_result.fixed when fix_result.success is true.

else:
success = False
self.session.blocked_fixes.append(
{
"file": str(file_path),
"issue": issue.message,
"reason": fix_result.reason,
}
)

# Write back if successful and not in preview
if not preview and success:
file_path.write_text(current_code, encoding="utf-8")
self.session.files_succeeded += 1
elif not success:
self.session.files_failed += 1
if fail_fast:
break
else:
# Preview mode success
self.session.files_succeeded += 1

except Exception as e:
self.session.files_failed += 1
self.session.blocked_fixes.append({"file": str(file_path), "error": str(e)})
results.append(None)
if fail_fast:
break

self.session.apply_ms = (time.time() - start_time) * 1000
return results

def verify(self, target: Union[str, Path]) -> AnalysisResult:
"""Verify the state of the project after fixes."""
start_time = time.time()
result = self.analyze(target)
self.session.verify_ms = (time.time() - start_time) * 1000
return result
36 changes: 36 additions & 0 deletions refactron/core/pipeline_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Pipeline session and timing tracking."""

import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional


@dataclass
class PipelineSession:
"""
Session container for a pipeline run, including timing metrics for each phase.
"""

id: str = field(default_factory=lambda: str(uuid.uuid4()))
analyze_ms: float = 0.0
queue_ms: float = 0.0
apply_ms: float = 0.0
verify_ms: float = 0.0

# Application metrics
files_attempted: int = 0
files_succeeded: int = 0
files_failed: int = 0
blocked_fixes: List[Dict[str, Any]] = field(default_factory=list)
backup_session_id: Optional[str] = None

metadata: Dict[str, Any] = field(default_factory=dict)

def to_dict(self) -> Dict[str, Any]:
"""Convert the session to a JSON-serializable dictionary."""
return asdict(self)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "PipelineSession":
"""Create a session from a dictionary."""
return cls(**data)
Loading
Loading