Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mcp/mcp_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ async def handle_mcp_request(request: Request) -> Dict[str, Any]:
request_id,
result=call_api("GET", "/memory/get", params=params),
)
elif method == "flowdex.infer.auto_fix":
return mcp_respond(
request_id,
result=call_api("POST", "/infer/auto-fix", json_body=params),
)
elif method == "flowdex.health":
return mcp_respond(
request_id,
Expand Down
58 changes: 58 additions & 0 deletions pydantic_settings/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Lightweight fallback implementation of :mod:`pydantic_settings`.

This module provides a minimal subset of the interface used by FlowDex so
that the application can run in environments where the optional dependency
isn't available. It loads environment variables (and optionally a ``.env``
file) before initialising the underlying Pydantic model, ensuring values are
cast according to the declared field types.
"""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any, Dict

from dotenv import dotenv_values
from pydantic import BaseModel, ConfigDict


class SettingsConfigDict(dict):
    """Plain ``dict`` subclass that carries settings configuration options.

    Options are supplied exclusively as keyword arguments; each keyword
    becomes a key of the resulting mapping.
    """

    def __init__(self, **options: Any) -> None:
        # Explicit two-argument ``super`` form; forwards every option to
        # ``dict`` so the instance behaves like ``dict(**options)``.
        super(SettingsConfigDict, self).__init__(**options)


class BaseSettings(BaseModel):
    """Minimal ``BaseSettings`` replacement built on :class:`BaseModel`.

    On construction, a value for every declared field is looked up in
    (highest priority first):

    1. explicit keyword arguments passed to the constructor,
    2. process environment variables,
    3. the dotenv file named by ``model_config["env_file"]`` (if it is a
       regular file).

    Variable names are matched against the upper-cased field name first and
    then case-insensitively, mirroring the default (``case_sensitive=False``)
    behaviour of the real ``pydantic-settings`` package. The collected raw
    strings are handed to :class:`BaseModel` for validation/casting.
    """

    model_config = ConfigDict(extra="ignore")

    def __init__(self, **data: Any) -> None:  # type: ignore[override]
        """Collect environment / dotenv values and initialise the model.

        Args:
            **data: Explicit field values; these override anything found in
                the environment or the dotenv file.
        """
        config = dict(getattr(type(self), "model_config", {}) or {})
        env_file = config.get("env_file")
        encoding = config.get("env_file_encoding")

        file_values: Dict[str, Any] = {}
        if env_file:
            env_path = Path(env_file)
            # ``is_file`` rather than ``exists``: a directory that happens to
            # carry the env-file name must be skipped, not opened.
            if env_path.is_file():
                file_values = {
                    key: value
                    for key, value in dotenv_values(
                        env_path, encoding=encoding
                    ).items()
                    if value is not None
                }

        # Pair each source with a case-folded view of itself so a lookup can
        # try the exact upper-case key first and fall back to a
        # case-insensitive match. Order encodes priority: env beats dotenv.
        sources = [
            (source, {key.casefold(): value for key, value in source.items()})
            for source in (os.environ, file_values)
        ]

        values: Dict[str, Any] = {}
        for field_name in type(self).model_fields:
            env_key = field_name.upper()
            folded_key = field_name.casefold()
            for exact, folded in sources:
                if env_key in exact:
                    values[field_name] = exact[env_key]
                    break
                if folded_key in folded:
                    values[field_name] = folded[folded_key]
                    break

        # Explicit keyword arguments always win over discovered values.
        values.update(data)
        super().__init__(**values)


__all__ = ["BaseSettings", "SettingsConfigDict"]
141 changes: 100 additions & 41 deletions server/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import hashlib
import json
import math
import os
import re
import time
from collections import Counter
Expand All @@ -13,17 +12,47 @@
from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel, Field
from redis.exceptions import RedisError
from pydantic_settings import BaseSettings, SettingsConfigDict

try:
import tiktoken

_TOKEN_ENCODER = tiktoken.get_encoding("cl100k_base")
_HAS_TIKTOKEN = True
try:
_TOKEN_ENCODER = tiktoken.get_encoding("cl100k_base")
_HAS_TIKTOKEN = True
except Exception: # pragma: no cover - optional dependency retrieval failure
_TOKEN_ENCODER = None
_HAS_TIKTOKEN = False
except ImportError: # pragma: no cover - optional dependency
_TOKEN_ENCODER = None
_HAS_TIKTOKEN = False

CACHE_DIR = Path(os.environ.get("FLOWDEX_CACHE_DIR", ".flowdex_cache"))
class Settings(BaseSettings):
    """Typed application configuration with built-in defaults.

    Values are expected to be supplied through ``BaseSettings`` (process
    environment and the ``.env`` file configured below); the defaults here
    apply when no value is provided.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Server port and default inference parameters.
    flowdex_port: int = 8787
    flowdex_model: str = "anthropic/claude-3-5-sonnet"
    # Directory assigned to the module-level CACHE_DIR.
    flowdex_cache_dir: Path = Path(".flowdex_cache")
    flowdex_max_tokens: int = 6000

    # Defaults for the per-section fields of the ``Budget`` model.
    flowdex_budget_system: int = 1000
    flowdex_budget_context: int = 2500
    flowdex_budget_user: int = 1500
    flowdex_budget_tools: int = 1000

    # Assigned to the module-level API_KEY; unset by default.
    flowdex_api_key: Optional[str] = None
    # Assigned to the module-level ANTHROPIC_TIMEOUT.
    flowdex_anthropic_timeout: int = 60

    # Redis connection: when a URL is set it is used instead of
    # host/port/db.
    flowdex_redis_url: Optional[str] = None
    flowdex_redis_host: str = "localhost"
    flowdex_redis_port: int = 6379
    flowdex_redis_db: int = 0

    # Anthropic API access; inference raises an HTTP 500 when the key is
    # missing.
    anthropic_api_key: Optional[str] = None
    flowdex_anthropic_url: str = "https://api.anthropic.com/v1/messages"
    flowdex_anthropic_version: str = "2023-06-01"


settings = Settings()

CACHE_DIR = settings.flowdex_cache_dir
CACHE_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(title="FlowDex API", version="0.1.0")
Expand All @@ -33,8 +62,8 @@
MEMORY_INDEX_KEY = "flowdex:memory:index"
TOOLS_KEY = "flowdex:tools"
RUNS_KEY = "flowdex:runs"
API_KEY = os.environ.get("FLOWDEX_API_KEY")
ANTHROPIC_TIMEOUT = int(os.environ.get("FLOWDEX_ANTHROPIC_TIMEOUT", 60))
API_KEY = settings.flowdex_api_key
ANTHROPIC_TIMEOUT = settings.flowdex_anthropic_timeout

AUTO_FIX_SYSTEM_PROMPT = """You are FlowDex AutoFix, an autonomous incident responder for automation workflows.\n" \
"You triage failing orchestrations by summarizing errors, identifying likely root causes, " \
Expand All @@ -57,15 +86,15 @@ def get_redis() -> redis.Redis:
if _REDIS_CLIENT is not None:
return _REDIS_CLIENT

redis_url = os.environ.get("FLOWDEX_REDIS_URL")
redis_url = settings.flowdex_redis_url
try:
if redis_url:
client = redis.Redis.from_url(redis_url, decode_responses=True)
else:
client = redis.Redis(
host=os.environ.get("FLOWDEX_REDIS_HOST", "localhost"),
port=int(os.environ.get("FLOWDEX_REDIS_PORT", 6379)),
db=int(os.environ.get("FLOWDEX_REDIS_DB", 0)),
host=settings.flowdex_redis_host,
port=settings.flowdex_redis_port,
db=settings.flowdex_redis_db,
decode_responses=True,
)
client.ping()
Expand Down Expand Up @@ -95,10 +124,10 @@ def health_check():


class Budget(BaseModel):
system: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_SYSTEM", 1000)))
context: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_CONTEXT", 2500)))
user: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_USER", 1500)))
tools: int = Field(default=int(os.environ.get("FLOWDEX_BUDGET_TOOLS", 1000)))
system: int = Field(default=settings.flowdex_budget_system)
context: int = Field(default=settings.flowdex_budget_context)
user: int = Field(default=settings.flowdex_budget_user)
tools: int = Field(default=settings.flowdex_budget_tools)


class InferRequest(BaseModel):
Expand All @@ -108,8 +137,8 @@ class InferRequest(BaseModel):
context_ids: List[str] = Field(default_factory=list)
tool_candidates: List[str] = Field(default_factory=list)
retrieval_query: Optional[str] = None
model: str = os.environ.get("FLOWDEX_MODEL", "anthropic/claude-3-5-sonnet")
max_tokens: int = int(os.environ.get("FLOWDEX_MAX_TOKENS", 6000))
model: str = settings.flowdex_model
max_tokens: int = settings.flowdex_max_tokens
budget: Budget = Field(default_factory=Budget)


Expand Down Expand Up @@ -245,7 +274,7 @@ def _safe_json_dumps(value: Any) -> str:
return repr(value)


def build_tool_context(tool_specs: List[Dict[str, Any]], max_length: Optional[int] = None) -> str:
def build_tool_context(tool_specs: List[Dict[str, Any]]) -> str:
if not tool_specs:
return ""
serialized = []
Expand All @@ -265,9 +294,7 @@ def build_tool_context(tool_specs: List[Dict[str, Any]], max_length: Optional[in
)
)
blob = "\n\n".join(serialized)
if max_length is None or len(blob) <= max_length:
return blob
return blob[-max_length:]
return blob


TOKEN_PATTERN = re.compile(r"\b\w+\b", re.UNICODE)
Expand Down Expand Up @@ -335,11 +362,11 @@ def semantic_recall(query: str, exclude_ids: List[str], limit: int = 3) -> List[


def call_llm(prompt: Dict[str, Any], request: InferRequest, tool_specs: List[Dict[str, Any]], retrieved: List[Dict[str, Any]]) -> Dict[str, Any]:
api_key = os.environ.get("ANTHROPIC_API_KEY")
api_key = settings.anthropic_api_key
if not api_key:
raise HTTPException(500, detail="ANTHROPIC_API_KEY environment variable is required for inference")

base_url = os.environ.get("FLOWDEX_ANTHROPIC_URL", "https://api.anthropic.com/v1/messages")
base_url = settings.flowdex_anthropic_url

segments: List[str] = []
if request.task:
Expand All @@ -359,7 +386,7 @@ def call_llm(prompt: Dict[str, Any], request: InferRequest, tool_specs: List[Dic
if prompt.get("user"):
segments.append(f"<user_input>\n{prompt['user']}\n</user_input>")

tool_context_blob = build_tool_context(tool_specs, 100_000)
tool_context_blob = build_tool_context(tool_specs)
tool_context, tool_tokens = truncate_to_token_budget(
tool_context_blob, request.budget.tools
)
Expand Down Expand Up @@ -391,7 +418,7 @@ def call_llm(prompt: Dict[str, Any], request: InferRequest, tool_specs: List[Dic

headers = {
"x-api-key": api_key,
"anthropic-version": os.environ.get("FLOWDEX_ANTHROPIC_VERSION", "2023-06-01"),
"anthropic-version": settings.flowdex_anthropic_version,
"content-type": "application/json",
}

Expand Down Expand Up @@ -507,24 +534,28 @@ def _extract_auto_fix_json(text: str) -> Dict[str, Any]:
if not text:
return {}

candidates = []
match = _CODE_FENCE_RE.search(text)
if match:
candidates.append(match.group(1).strip())
candidates.append(text)
candidates: List[str] = []

fence_match = _CODE_FENCE_RE.search(text)
if fence_match:
candidates.append(fence_match.group(1).strip())

brace_match = re.search(r"\{.*\}", text, re.DOTALL)
if brace_match:
candidates.append(brace_match.group(0))
candidates.append(brace_match.group(0).strip())

candidates.append(text)

for candidate in candidates:
candidate = candidate.strip()
if not candidate:
continue
try:
return json.loads(candidate)
return json.loads(candidate, strict=False)
except json.JSONDecodeError:
continue

print(f"Warning: Could not parse JSON from auto-fix completion: {text[:100]}...")
return {}


Expand Down Expand Up @@ -701,34 +732,62 @@ def run_auto_fix(auto_req: AutoFixRequest) -> Dict[str, Any]:

attempts: List[Dict[str, Any]] = []
prior_analyses: List[Dict[str, Any]] = []
final_status = "unrecoverable"

for attempt in range(1, auto_req.max_attempts + 1):
analysis_req = _build_auto_fix_analysis_request(base_req, auto_req, attempt, prior_analyses)
print(f"--- Auto-Fix Attempt {attempt}/{auto_req.max_attempts} ---")
analysis_req = _build_auto_fix_analysis_request(
base_req, auto_req, attempt, prior_analyses
)

print(f"Running analysis infer (task: {analysis_req.task})...")
analysis_result = run_inference(analysis_req)
parsed_analysis = _parse_auto_fix_completion(analysis_result["output"].get("completion", ""))
parsed_analysis = _parse_auto_fix_completion(
analysis_result["output"].get("completion", "")
)
summary_preview = parsed_analysis.get("summary", "")
print(
"Analysis complete. Status: "
f"{parsed_analysis.get('status', 'unknown')}, Summary: {summary_preview[:100]}..."
)

attempt_record: Dict[str, Any] = {
"analysis_run": analysis_result,
"attempt": attempt,
"analysis_run_id": analysis_result.get("run_id"),
"analysis": parsed_analysis,
"fix_run_id": None,
"fix_output": None,
}
attempts.append(attempt_record)
prior_analyses.append(parsed_analysis)

if parsed_analysis["status"] == "unrecoverable":
final_status = parsed_analysis.get("status", "unparsed")

if final_status == "unrecoverable":
print("Analysis determined issue is unrecoverable. Stopping.")
break

if not parsed_analysis.get("fix_instructions") and parsed_analysis["status"] == "unparsed":
has_actions = bool(parsed_analysis.get("fix_instructions") or parsed_analysis.get("actions"))
if not has_actions and final_status in {"unparsed", "apply_fix"}:
print("Analysis provided no actionable fix instructions. Stopping.")
final_status = "unrecoverable"
break

fix_req = _build_auto_fix_fix_request(base_req, auto_req, parsed_analysis)
fix_result = run_inference(fix_req)
attempt_record["fix_run"] = fix_result
if final_status == "apply_fix":
fix_req = _build_auto_fix_fix_request(base_req, auto_req, parsed_analysis)
print(f"Running fix infer (task: {fix_req.task})...")
fix_result = run_inference(fix_req)
Comment on lines +776 to +779
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Run fixes even when status parsing fails

The new auto‑fix loop only executes the fix step when the analysis result explicitly sets status to "apply_fix". Previously, a fix run was triggered whenever the analysis supplied fix_instructions, even if the status string could not be parsed. LLM outputs often include valid instructions while returning an unexpected status, so this regression will cause the workflow to stop after the analysis phase and never apply the suggested repair. Consider running the fix whenever actionable instructions are present, regardless of whether the status is recognized.

Useful? React with 👍 / 👎.

attempt_record["fix_run_id"] = fix_result.get("run_id")
attempt_record["fix_output"] = fix_result.get("output")
print(f"Fix attempt complete. Run ID: {fix_result.get('run_id')}")

if parsed_analysis["status"] == "resolved":
if final_status == "resolved":
print("Analysis determined issue is resolved. Stopping.")
break

return {
"base_request": base_req.dict(),
"final_status": final_status,
"attempts": attempts,
}

Expand Down
1 change: 1 addition & 0 deletions server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ python-dotenv
redis
requests
tiktoken
pydantic-settings
4 changes: 2 additions & 2 deletions server/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ def test_tokenize(text: str, expected: list[str]) -> None:
assert _tokenize(text) == expected


def test_build_tool_context_truncates_and_serializes() -> None:
def test_build_tool_context_serializes() -> None:
complex_schema = {"callable": math.sqrt}
tool_specs = [
{"name": "alpha", "description": "first", "cost": 10, "schema": {"x": "int"}},
{"name": "beta", "description": "second", "cost": "low", "schema": complex_schema},
]

context = build_tool_context(tool_specs, max_length=500)
context = build_tool_context(tool_specs)
assert "Tool: alpha" in context
assert "Schema: {\"x\": \"int\"}" in context
# Non-serializable value should fallback to repr
Expand Down
Loading