From 8950ec4fff3ea517d30e67ca4e2d4a15fc5201fa Mon Sep 17 00:00:00 2001 From: aandersen2323 <132087791+aandersen2323@users.noreply.github.com> Date: Thu, 23 Oct 2025 22:56:45 -0500 Subject: [PATCH] Expose auto-fix via MCP and improve settings --- mcp/mcp_http_server.py | 5 ++ pydantic_settings/__init__.py | 58 ++++++++++++++ server/app.py | 141 ++++++++++++++++++++++++---------- server/requirements.txt | 1 + server/tests/test_utils.py | 4 +- 5 files changed, 166 insertions(+), 43 deletions(-) create mode 100644 pydantic_settings/__init__.py diff --git a/mcp/mcp_http_server.py b/mcp/mcp_http_server.py index ec9842f..e1bf192 100644 --- a/mcp/mcp_http_server.py +++ b/mcp/mcp_http_server.py @@ -117,6 +117,11 @@ async def handle_mcp_request(request: Request) -> Dict[str, Any]: request_id, result=call_api("GET", "/memory/get", params=params), ) + elif method == "flowdex.infer.auto_fix": + return mcp_respond( + request_id, + result=call_api("POST", "/infer/auto-fix", json_body=params), + ) elif method == "flowdex.health": return mcp_respond( request_id, diff --git a/pydantic_settings/__init__.py b/pydantic_settings/__init__.py new file mode 100644 index 0000000..4aa6093 --- /dev/null +++ b/pydantic_settings/__init__.py @@ -0,0 +1,58 @@ +"""Lightweight fallback implementation of :mod:`pydantic_settings`. + +This module provides a minimal subset of the interface used by FlowDex so +that the application can run in environments where the optional dependency +isn't available. It loads environment variables (and optionally a ``.env`` +file) before initialising the underlying Pydantic model, ensuring values are +cast according to the declared field types. +""" +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any, Dict + +from dotenv import dotenv_values +from pydantic import BaseModel, ConfigDict + + +class SettingsConfigDict(dict): + """Simple mapping used to describe configuration options.""" + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + +class BaseSettings(BaseModel): + """Minimal ``BaseSettings`` replacement built on :class:`BaseModel`.""" + + model_config = ConfigDict(extra="ignore") + + def __init__(self, **data: Any) -> None: # type: ignore[override] + config = dict(getattr(type(self), "model_config", {}) or {}) + env_file = config.get("env_file") + encoding = config.get("env_file_encoding") + + file_values: Dict[str, Any] = {} + if env_file: + env_path = Path(env_file) + if env_path.exists(): + file_values = { + key: value + for key, value in dotenv_values(env_path, encoding=encoding).items() + if value is not None + } + + values: Dict[str, Any] = {} + for field_name in type(self).model_fields: + env_key = field_name.upper() + if env_key in os.environ: + values[field_name] = os.environ[env_key] + elif env_key in file_values: + values[field_name] = file_values[env_key] + + values.update(data) + super().__init__(**values) + + +__all__ = ["BaseSettings", "SettingsConfigDict"] diff --git a/server/app.py b/server/app.py index 9dd2f69..3e63485 100644 --- a/server/app.py +++ b/server/app.py @@ -1,7 +1,6 @@ import hashlib import json import math -import os import re import time from collections import Counter @@ -13,17 +12,47 @@ from fastapi import Depends, FastAPI, Header, HTTPException from pydantic import BaseModel, Field from redis.exceptions import RedisError +from pydantic_settings import BaseSettings, SettingsConfigDict try: import tiktoken - - _TOKEN_ENCODER = tiktoken.get_encoding("cl100k_base") - _HAS_TIKTOKEN = True + try: + _TOKEN_ENCODER = tiktoken.get_encoding("cl100k_base") + _HAS_TIKTOKEN = True + except Exception: # pragma: no cover - optional dependency retrieval failure + _TOKEN_ENCODER = None + _HAS_TIKTOKEN = False except ImportError: # pragma: no cover - optional dependency _TOKEN_ENCODER = None _HAS_TIKTOKEN = False -CACHE_DIR = Path(os.environ.get("FLOWDEX_CACHE_DIR", ".flowdex_cache")) +class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", env_file_encoding="utf-8", extra="ignore" + ) + + flowdex_port: int = 8787 + flowdex_model: str = "anthropic/claude-3-5-sonnet" + flowdex_cache_dir: Path = Path(".flowdex_cache") + flowdex_max_tokens: int = 6000 + flowdex_budget_system: int = 1000 + flowdex_budget_context: int = 2500 + flowdex_budget_user: int = 1500 + flowdex_budget_tools: int = 1000 + flowdex_api_key: Optional[str] = None + flowdex_anthropic_timeout: int = 60 + flowdex_redis_url: Optional[str] = None + flowdex_redis_host: str = "localhost" + flowdex_redis_port: int = 6379 + flowdex_redis_db: int = 0 + anthropic_api_key: Optional[str] = None + flowdex_anthropic_url: str = "https://api.anthropic.com/v1/messages" + flowdex_anthropic_version: str = "2023-06-01" + + +settings = Settings() + +CACHE_DIR = settings.flowdex_cache_dir CACHE_DIR.mkdir(parents=True, exist_ok=True) app = FastAPI(title="FlowDex API", version="0.1.0") @@ -33,8 +62,8 @@ MEMORY_INDEX_KEY = "flowdex:memory:index" TOOLS_KEY = "flowdex:tools" RUNS_KEY = "flowdex:runs" -API_KEY = os.environ.get("FLOWDEX_API_KEY") -ANTHROPIC_TIMEOUT = int(os.environ.get("FLOWDEX_ANTHROPIC_TIMEOUT", 60)) +API_KEY = settings.flowdex_api_key +ANTHROPIC_TIMEOUT = settings.flowdex_anthropic_timeout AUTO_FIX_SYSTEM_PROMPT = """You are FlowDex AutoFix, an autonomous incident responder for automation workflows.\n" \ "You triage failing orchestrations by summarizing errors, identifying likely root causes, " \ @@ -57,15 +86,15 @@ def get_redis() -> redis.Redis: if _REDIS_CLIENT is not None: return _REDIS_CLIENT - redis_url = os.environ.get("FLOWDEX_REDIS_URL") + redis_url = settings.flowdex_redis_url try: if redis_url: client = redis.Redis.from_url(redis_url, decode_responses=True) else: client = redis.Redis( - host=os.environ.get("FLOWDEX_REDIS_HOST", "localhost"), - port=int(os.environ.get("FLOWDEX_REDIS_PORT", 6379)), - db=int(os.environ.get("FLOWDEX_REDIS_DB", 0)), + host=settings.flowdex_redis_host, + port=settings.flowdex_redis_port, + db=settings.flowdex_redis_db, decode_responses=True, ) client.ping() @@ -95,10 +124,10 @@ def health_check(): class Budget(BaseModel): - system: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_SYSTEM", 1000))) - context: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_CONTEXT", 2500))) - user: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_USER", 1500))) - tools: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_TOOLS", 1000))) + system: int = Field(default=settings.flowdex_budget_system) + context: int = Field(default=settings.flowdex_budget_context) + user: int = Field(default=settings.flowdex_budget_user) + tools: int = Field(default=settings.flowdex_budget_tools) class InferRequest(BaseModel): @@ -108,8 +137,8 @@ class InferRequest(BaseModel): context_ids: List[str] = Field(default_factory=list) tool_candidates: List[str] = Field(default_factory=list) retrieval_query: Optional[str] = None - model: str = os.environ.get("FLOWDEX_MODEL", "anthropic/claude-3-5-sonnet") - max_tokens: int = int(os.environ.get("FLOWDEX_MAX_TOKENS", 6000)) + model: str = settings.flowdex_model + max_tokens: int = settings.flowdex_max_tokens budget: Budget = Field(default_factory=Budget) @@ -245,7 +274,7 @@ def _safe_json_dumps(value: Any) -> str: return repr(value) -def build_tool_context(tool_specs: List[Dict[str, Any]], max_length: Optional[int] = None) -> str: +def build_tool_context(tool_specs: List[Dict[str, Any]]) -> str: if not tool_specs: return "" serialized = [] @@ -265,9 +294,7 @@ def build_tool_context(tool_specs: List[Dict[str, Any]], max_length: Optional[in ) ) blob = "\n\n".join(serialized) - if max_length is None or len(blob) <= max_length: - return blob - return blob[-max_length:] + return blob TOKEN_PATTERN = re.compile(r"\b\w+\b", re.UNICODE) @@ -335,11 +362,11 @@ def semantic_recall(query: str, exclude_ids: List[str], limit: int = 3) -> List[ def call_llm(prompt: Dict[str, Any], request: InferRequest, tool_specs: List[Dict[str, Any]], retrieved: List[Dict[str, Any]]) -> Dict[str, Any]: - api_key = os.environ.get("ANTHROPIC_API_KEY") + api_key = settings.anthropic_api_key if not api_key: raise HTTPException(500, detail="ANTHROPIC_API_KEY environment variable is required for inference") - base_url = os.environ.get("FLOWDEX_ANTHROPIC_URL", "https://api.anthropic.com/v1/messages") + base_url = settings.flowdex_anthropic_url segments: List[str] = [] if request.task: @@ -359,7 +386,7 @@ def call_llm(prompt: Dict[str, Any], request: InferRequest, tool_specs: List[Dic if prompt.get("user"): segments.append(f"\n{prompt['user']}\n") - tool_context_blob = build_tool_context(tool_specs, 100_000) + tool_context_blob = build_tool_context(tool_specs) tool_context, tool_tokens = truncate_to_token_budget( tool_context_blob, request.budget.tools ) @@ -391,7 +418,7 @@ def call_llm(prompt: Dict[str, Any], request: InferRequest, tool_specs: List[Dic headers = { "x-api-key": api_key, - "anthropic-version": os.environ.get("FLOWDEX_ANTHROPIC_VERSION", "2023-06-01"), + "anthropic-version": settings.flowdex_anthropic_version, "content-type": "application/json", } @@ -507,24 +534,28 @@ def _extract_auto_fix_json(text: str) -> Dict[str, Any]: if not text: return {} - candidates = [] - match = _CODE_FENCE_RE.search(text) - if match: - candidates.append(match.group(1).strip()) - candidates.append(text) + candidates: List[str] = [] + + fence_match = _CODE_FENCE_RE.search(text) + if fence_match: + candidates.append(fence_match.group(1).strip()) brace_match = re.search(r"\{.*\}", text, re.DOTALL) if brace_match: - candidates.append(brace_match.group(0)) + candidates.append(brace_match.group(0).strip()) + + candidates.append(text) for candidate in candidates: candidate = candidate.strip() if not candidate: continue try: - return json.loads(candidate) + return json.loads(candidate, strict=False) except json.JSONDecodeError: continue + + print(f"Warning: Could not parse JSON from auto-fix completion: {text[:100]}...") return {} @@ -701,34 +732,62 @@ def run_auto_fix(auto_req: AutoFixRequest) -> Dict[str, Any]: attempts: List[Dict[str, Any]] = [] prior_analyses: List[Dict[str, Any]] = [] + final_status = "unrecoverable" for attempt in range(1, auto_req.max_attempts + 1): - analysis_req = _build_auto_fix_analysis_request(base_req, auto_req, attempt, prior_analyses) + print(f"--- Auto-Fix Attempt {attempt}/{auto_req.max_attempts} ---") + analysis_req = _build_auto_fix_analysis_request( + base_req, auto_req, attempt, prior_analyses + ) + + print(f"Running analysis infer (task: {analysis_req.task})...") analysis_result = run_inference(analysis_req) - parsed_analysis = _parse_auto_fix_completion(analysis_result["output"].get("completion", "")) + parsed_analysis = _parse_auto_fix_completion( + analysis_result["output"].get("completion", "") + ) + summary_preview = parsed_analysis.get("summary", "") + print( + "Analysis complete. Status: " + f"{parsed_analysis.get('status', 'unknown')}, Summary: {summary_preview[:100]}..." + ) attempt_record: Dict[str, Any] = { - "analysis_run": analysis_result, + "attempt": attempt, + "analysis_run_id": analysis_result.get("run_id"), "analysis": parsed_analysis, + "fix_run_id": None, + "fix_output": None, } attempts.append(attempt_record) prior_analyses.append(parsed_analysis) - if parsed_analysis["status"] == "unrecoverable": + final_status = parsed_analysis.get("status", "unparsed") + + if final_status == "unrecoverable": + print("Analysis determined issue is unrecoverable. Stopping.") break - if not parsed_analysis.get("fix_instructions") and parsed_analysis["status"] == "unparsed": + has_actions = bool(parsed_analysis.get("fix_instructions") or parsed_analysis.get("actions")) + if not has_actions and final_status in {"unparsed", "apply_fix"}: + print("Analysis provided no actionable fix instructions. Stopping.") + final_status = "unrecoverable" break - fix_req = _build_auto_fix_fix_request(base_req, auto_req, parsed_analysis) - fix_result = run_inference(fix_req) - attempt_record["fix_run"] = fix_result + if final_status == "apply_fix": + fix_req = _build_auto_fix_fix_request(base_req, auto_req, parsed_analysis) + print(f"Running fix infer (task: {fix_req.task})...") + fix_result = run_inference(fix_req) + attempt_record["fix_run_id"] = fix_result.get("run_id") + attempt_record["fix_output"] = fix_result.get("output") + print(f"Fix attempt complete. Run ID: {fix_result.get('run_id')}") - if parsed_analysis["status"] == "resolved": + if final_status == "resolved": + print("Analysis determined issue is resolved. Stopping.") break return { "base_request": base_req.dict(), + "final_status": final_status, "attempts": attempts, } diff --git a/server/requirements.txt b/server/requirements.txt index 546bd9d..2135cbd 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -5,3 +5,4 @@ python-dotenv redis requests tiktoken +pydantic-settings diff --git a/server/tests/test_utils.py b/server/tests/test_utils.py index 209a9cb..0de126d 100644 --- a/server/tests/test_utils.py +++ b/server/tests/test_utils.py @@ -55,14 +55,14 @@ def test_tokenize(text: str, expected: list[str]) -> None: assert _tokenize(text) == expected -def test_build_tool_context_truncates_and_serializes() -> None: +def test_build_tool_context_serializes() -> None: complex_schema = {"callable": math.sqrt} tool_specs = [ {"name": "alpha", "description": "first", "cost": 10, "schema": {"x": "int"}}, {"name": "beta", "description": "second", "cost": "low", "schema": complex_schema}, ] - context = build_tool_context(tool_specs, max_length=500) + context = build_tool_context(tool_specs) assert "Tool: alpha" in context assert "Schema: {\"x\": \"int\"}" in context # Non-serializable value should fallback to repr