diff --git a/pyproject.toml b/pyproject.toml index d0f4506..76c4c20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,8 +3,8 @@ requires = ["hatchling"] build-backend = "hatchling.build" [project] -name = "agentflow" -version = "0.1.0" +name = "gittielabs-agentflow" +version = "0.2.1" description = "Context engineering framework for multi-agent systems" readme = "README.md" requires-python = ">=3.11" @@ -44,7 +44,9 @@ openai = ["openai>=1.0.0"] google = ["google-genai>=1.0.0"] s3 = ["boto3>=1.34.0"] vector = ["qdrant-client>=1.9.0"] -all = ["agentflow[anthropic,openai,google,s3,vector]"] +orchestration = ["anthropic>=0.42.0"] +telemetry = ["langfuse>=2.0"] +all = ["gittielabs-agentflow[anthropic,openai,google,s3,vector,orchestration,telemetry]"] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", diff --git a/src/agentflow/__init__.py b/src/agentflow/__init__.py index ab73fcb..2aa358a 100644 --- a/src/agentflow/__init__.py +++ b/src/agentflow/__init__.py @@ -26,11 +26,23 @@ ) from agentflow.agent import AgentExecutor, ContextAssembler, PromptTemplate from agentflow.config import AgentConfig, ConfigLoader, RouterConfig, WorkflowConfig -from agentflow.events import EventBus +from agentflow.events import ( + EventBus, + LLM_CALL_STARTED, + LLM_CALL_COMPLETED, + NODE_STARTED, + NODE_COMPLETED, + TOOL_CALLED, + TOOL_RESULT, + WORKFLOW_STARTED, + WORKFLOW_COMPLETED, + ERROR, +) from agentflow.storage import FileSystemStorage, InMemoryStorage, S3Storage from agentflow.tools import HTTPToolDispatcher, LocalToolDispatcher, ToolRegistry from agentflow.providers import AnthropicProvider, GoogleGenAIProvider, MockLLMProvider, OpenAICompatProvider -from agentflow.session import ArtifactStore, Scratchpad, Session, SessionManager +from agentflow.session import ArtifactStore, HistoryPersistence, MultiUserHistory, Scratchpad, Session, SessionManager +from agentflow.orchestration import ComplexityClassifier, DAGExecutor, Plan, PlanStep from agentflow.memory import FileMemory, 
MemoryManager, VectorMemory from agentflow.router import RouterEngine, RoutingResult, RuleEvaluator from agentflow.workflow import NodeRunner, WorkflowDAG, WorkflowExecutor @@ -61,6 +73,15 @@ "WorkflowConfig", # Events "EventBus", + "LLM_CALL_STARTED", + "LLM_CALL_COMPLETED", + "NODE_STARTED", + "NODE_COMPLETED", + "TOOL_CALLED", + "TOOL_RESULT", + "WORKFLOW_STARTED", + "WORKFLOW_COMPLETED", + "ERROR", # Storage "FileSystemStorage", "InMemoryStorage", @@ -76,9 +97,18 @@ "OpenAICompatProvider", # Session "ArtifactStore", + "HistoryPersistence", + "MultiUserHistory", "Scratchpad", "Session", "SessionManager", + # Orchestration + "ComplexityClassifier", + "DAGExecutor", + "Plan", + "PlanStep", + # Telemetry + "LangfuseEventHandler", # Memory "FileMemory", "MemoryManager", @@ -93,4 +123,12 @@ "WorkflowExecutor", ] -__version__ = "0.1.0" +def __getattr__(name: str): + """Lazy import for optional telemetry extras to avoid hard import failures.""" + if name == "LangfuseEventHandler": + from agentflow.telemetry import LangfuseEventHandler + return LangfuseEventHandler + raise AttributeError(f"module 'agentflow' has no attribute {name!r}") + + +__version__ = "0.2.1" diff --git a/src/agentflow/agent/runtime.py b/src/agentflow/agent/runtime.py index 8f8984f..4a1dae3 100644 --- a/src/agentflow/agent/runtime.py +++ b/src/agentflow/agent/runtime.py @@ -8,11 +8,12 @@ from __future__ import annotations import logging +import time from typing import Any from agentflow.agent.context import ContextAssembler from agentflow.config.schemas import AgentConfig -from agentflow.events import EventBus, TOOL_CALLED, TOOL_RESULT, ERROR +from agentflow.events import EventBus, TOOL_CALLED, TOOL_RESULT, ERROR, LLM_CALL_STARTED, LLM_CALL_COMPLETED from agentflow.protocols import LLMProvider, ToolDispatcher from agentflow.types import AgentResponse, Message, NodeOutput, Role, ToolResult @@ -98,6 +99,16 @@ async def run( # Tool-use loop for round_num in range(self._config.max_tool_rounds): + t0 = 
time.monotonic() + if self._events: + await self._events.emit(LLM_CALL_STARTED, { + "agent": self._config.name, + "model": self._config.model, + "node": node_id, + "session_id": session_id, + "round": round_num, + }) + response = await self._llm.chat( messages=messages, system=system, @@ -106,6 +117,21 @@ async def run( temperature=self._config.temperature, ) + elapsed_ms = int((time.monotonic() - t0) * 1000) + if self._events: + await self._events.emit(LLM_CALL_COMPLETED, { + "agent": self._config.name, + "model": self._config.model, + "node": node_id, + "session_id": session_id, + "round": round_num, + "stop_reason": response.stop_reason, + "input_tokens": response.usage.get("input_tokens", 0) if response.usage else 0, + "output_tokens": response.usage.get("output_tokens", 0) if response.usage else 0, + "elapsed_ms": elapsed_ms, + "tool_calls": len(response.tool_calls), + }) + # Emit thinking text as a trace event (Gemini thinking models only). # This is separated from response text in the provider's _from_api_response. 
thinking_text = (response.raw and hasattr(response, "metadata") diff --git a/src/agentflow/events.py b/src/agentflow/events.py index 2bb302a..f0a5836 100644 --- a/src/agentflow/events.py +++ b/src/agentflow/events.py @@ -25,6 +25,8 @@ WORKFLOW_COMPLETED = "workflow_completed" MEMORY_STORED = "memory_stored" ERROR = "error" +LLM_CALL_STARTED = "llm_call_started" +LLM_CALL_COMPLETED = "llm_call_completed" class EventBus: diff --git a/src/agentflow/orchestration/__init__.py b/src/agentflow/orchestration/__init__.py new file mode 100644 index 0000000..9449472 --- /dev/null +++ b/src/agentflow/orchestration/__init__.py @@ -0,0 +1,12 @@ +"""Orchestration primitives: multi-step plan types, DAG executor, and complexity classifier.""" +from agentflow.orchestration.types import Plan, PlanStep +from agentflow.orchestration.executor import DAGExecutor, WorkflowRunner +from agentflow.orchestration.classifier import ComplexityClassifier + +__all__ = [ + "Plan", + "PlanStep", + "DAGExecutor", + "WorkflowRunner", + "ComplexityClassifier", +] diff --git a/src/agentflow/orchestration/classifier.py b/src/agentflow/orchestration/classifier.py new file mode 100644 index 0000000..78cdd97 --- /dev/null +++ b/src/agentflow/orchestration/classifier.py @@ -0,0 +1,92 @@ +""" +Complexity classifier for multi-step request detection. + +Determines whether a user message requires a multi-step orchestration plan +(COMPLEX) or can be handled by a single workflow (SIMPLE). + +Uses a two-stage approach: +1. Fast bypass: messages under _FAST_BYPASS_LIMIT words with no multi-step + language markers return SIMPLE immediately (no LLM call). +2. LLM fallback: a direct Haiku call classifies ambiguous messages. + +Defaults to SIMPLE on any failure to ensure graceful degradation. 
logger = logging.getLogger("agentflow.orchestration")

# Messages with fewer words than this skip the LLM call, unless they contain
# one of the multi-step markers below.
_FAST_BYPASS_LIMIT = 20
# Lower-case phrases treated as evidence of a chained, multi-step request.
_MULTI_STEP_MARKERS = (
    "and then",
    "after that",
    "followed by",
    "then write",
    "then create",
    "then send",
    "then research",
)


class ComplexityClassifier:
    """Classify a message as ``"SIMPLE"`` or ``"COMPLEX"``.

    Two stages are used:

    1. Fast bypass: a message with fewer than ``_FAST_BYPASS_LIMIT`` words and
       none of the ``_MULTI_STEP_MARKERS`` phrases is ``"SIMPLE"`` without any
       LLM call.
    2. LLM fallback: every other message is sent to the configured Anthropic
       model, which is asked to answer with a single word.

    Any failure (missing ``anthropic`` package, API error, unusable input)
    degrades to ``"SIMPLE"``.

    Args:
        api_key: Anthropic API key used for the classification call.
        model: Model identifier used for classification.

    Example:
        classifier = ComplexityClassifier(api_key=os.getenv("ANTHROPIC_API_KEY"))
        result = await classifier.classify("Research AI agents and then write a LinkedIn post")
        # result == "COMPLEX"
    """

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-haiku-20241022",
    ) -> None:
        self._api_key = api_key
        self._model = model

    @staticmethod
    def _parse_label(text: str) -> str:
        """Return the first SIMPLE/COMPLEX word found in *text*.

        Whole words are compared, so ``"complexity"`` is not read as COMPLEX
        and ``"SIMPLE, not COMPLEX"`` is read as SIMPLE. Anything without a
        recognisable label is ``"SIMPLE"``.
        """
        # Turn every non-letter into a space so punctuation ("COMPLEX.") does
        # not stick to the label.
        words = "".join(ch if ch.isalpha() else " " for ch in text.upper()).split()
        for word in words:
            if word in ("SIMPLE", "COMPLEX"):
                return word
        return "SIMPLE"

    async def classify(self, message: str) -> str:
        """Return ``'SIMPLE'`` or ``'COMPLEX'``; ``'SIMPLE'`` on any failure.

        Args:
            message: The user message to classify.

        Returns:
            ``'COMPLEX'`` if the model judges that the message needs several
            distinct workflow steps, ``'SIMPLE'`` otherwise.
        """
        # Unusable input must not raise: the contract is "SIMPLE on any failure".
        if not isinstance(message, str) or not message.strip():
            return "SIMPLE"

        # Fast path: short messages with no multi-step markers skip the LLM call.
        lowered = message.lower()
        has_marker = any(marker in lowered for marker in _MULTI_STEP_MARKERS)
        if not has_marker and len(message.split()) < _FAST_BYPASS_LIMIT:
            return "SIMPLE"

        try:
            import anthropic as _anthropic
        except ImportError:
            logger.warning("anthropic package not installed; ComplexityClassifier defaulting to SIMPLE")
            return "SIMPLE"

        try:
            # The context manager closes the client's HTTP resources after the
            # call instead of leaving one open client behind per classification.
            async with _anthropic.AsyncAnthropic(api_key=self._api_key) as client:
                resp = await client.messages.create(
                    model=self._model,
                    max_tokens=10,
                    temperature=0.0,
                    system=(
                        "Classify the user request as SIMPLE (single task, one workflow) or "
                        "COMPLEX (multiple distinct tasks each requiring their own workflow). "
                        "Reply with exactly one word: SIMPLE or COMPLEX."
                    ),
                    messages=[{"role": "user", "content": message}],
                )
            first_block = resp.content[0] if resp.content else None
            # A block without a ``text`` attribute is treated as an empty reply.
            return self._parse_label(getattr(first_block, "text", "") or "")
        except Exception:
            logger.warning("ComplexityClassifier failed, defaulting to SIMPLE", exc_info=True)
            return "SIMPLE"
logger = logging.getLogger("agentflow.orchestration")

# Callable signature: runner(workflow_name, message) -> result_text
WorkflowRunner = Callable[[str, str], Awaitable[str]]


class DAGExecutor:
    """Execute a Plan in dependency-ordered rounds.

    In every round, the steps whose ``{{key.result}}`` references all point at
    already-finished steps run concurrently via ``asyncio.gather()``; the rest
    wait for a later round.

    If no step is ready but some remain (a dependency cycle, or a reference to
    a key no step produces), the remaining steps are run one at a time in plan
    order so the executor cannot deadlock. Placeholders that are still
    unresolved at that point are passed to the runner verbatim.

    A failed step stores a ``[Step <id> failed: ...]`` marker under its
    ``output_key``; steps that reference it still run and receive that marker.

    Args:
        max_substitution_chars: Maximum number of characters of a prior step's
            output that is substituted into a later step's message.

    Example:
        plan = Plan(steps=[
            PlanStep(id="s1", workflow="research", message="AI agents", output_key="research"),
            PlanStep(id="s2", workflow="write", message="Write based on {{research.result}}", output_key="article"),
        ])
        executor = DAGExecutor()
        result = await executor.execute(plan, runner=my_runner)
    """

    # Finds the keys a message depends on.
    _DEP_PATTERN = re.compile(r"\{\{(\w+)\.result\}\}")
    # Matches a placeholder for substitution. It accepts any key text, so every
    # stored output_key can be substituted, not only ``\w+`` ones.
    _SUBST_PATTERN = re.compile(r"\{\{([^{}]+?)\.result\}\}")
    # Class-level default keeps instances created without __init__ working.
    _max_substitution_chars = 3000

    def __init__(self, max_substitution_chars: int = 3000) -> None:
        self._max_substitution_chars = max_substitution_chars

    async def execute(
        self,
        plan: Plan,
        runner: WorkflowRunner,
        variables: dict | None = None,
    ) -> str:
        """Execute the plan and return a combined result string.

        Args:
            plan: The orchestration plan with steps to execute.
            runner: Async callable that invokes a named workflow with a message.
            variables: Accepted for interface compatibility; not used.

        Returns:
            The results of all successful steps, in plan order, joined by a
            ``---`` separator line. Failed steps are logged and omitted. When
            nothing succeeded the string ``"No results produced."`` is returned.
        """
        outputs: dict[str, str] = {}
        succeeded: dict[int, str] = {}  # original index -> result text
        completed: set[str] = set()
        limit = self._max_substitution_chars

        def _render(template: str) -> str:
            """Substitute known outputs in one pass; unknown keys stay as-is."""

            def _swap(match: re.Match) -> str:
                value = outputs.get(match.group(1))
                return match.group(0) if value is None else value[:limit]

            # A single regex pass never rescans substituted text, so a
            # placeholder that appears inside a step's output is not expanded.
            return self._SUBST_PATTERN.sub(_swap, template)

        async def _run(step: PlanStep, idx: int) -> tuple[int, str, bool]:
            """Run one step and return (index, text, succeeded)."""
            msg = _render(step["message"])
            try:
                result = await runner(step["workflow"], msg)
                logger.info(
                    "Step %s (%s) completed (%d chars)",
                    step["id"], step["workflow"], len(result),
                )
                return idx, result, True
            except Exception as exc:
                logger.error("Step %s (%s) failed: %s", step["id"], step["workflow"], exc)
                return idx, f"[Step {step['id']} failed: {exc}]", False

        def _record(step: PlanStep, idx: int, text: str, ok: bool) -> None:
            """Publish a finished step's output; remember it if it succeeded."""
            outputs[step["output_key"]] = text
            completed.add(step["output_key"])
            if ok:
                # Success is tracked explicitly, so a genuine result that
                # happens to start with "[Step " is not mistaken for a failure.
                succeeded[idx] = text

        remaining = list(enumerate(plan["steps"]))

        while remaining:
            ready: list[tuple[int, PlanStep]] = []
            waiting: list[tuple[int, PlanStep]] = []

            for idx, step in remaining:
                deps = self._DEP_PATTERN.findall(step["message"])
                if all(d in completed for d in deps):
                    ready.append((idx, step))
                else:
                    waiting.append((idx, step))

            if not ready:
                # Circular or unresolvable dependencies — run remaining sequentially
                logger.warning(
                    "DAGExecutor: %d step(s) have unresolvable dependencies, "
                    "falling back to sequential execution",
                    len(waiting),
                )
                for idx, step in waiting:
                    # Recording after each step lets later steps in the
                    # fallback see the outputs of earlier ones.
                    _record(step, *await _run(step, idx))
                break

            # Run all ready steps concurrently; outputs are published only after
            # the whole round has finished.
            results = await asyncio.gather(*(_run(step, idx) for idx, step in ready))
            for (_, step), (idx, text, ok) in zip(ready, results):
                _record(step, idx, text, ok)

            remaining = waiting

        good = [succeeded[idx] for idx in sorted(succeeded)]
        return "\n\n---\n\n".join(good) if good else "No results produced."
DAGExecutor resolves + inter-step dependencies and executes independent steps concurrently. + """ + + steps: list[PlanStep] diff --git a/src/agentflow/providers/google_genai.py b/src/agentflow/providers/google_genai.py index bb40863..9e10977 100644 --- a/src/agentflow/providers/google_genai.py +++ b/src/agentflow/providers/google_genai.py @@ -203,4 +203,5 @@ def _from_api_response(self, response: Any) -> AgentResponse: stop_reason=stop_reason, usage=usage, raw=response, + metadata={"thinking": "".join(thinking_parts)} if thinking_parts else {}, ) diff --git a/src/agentflow/session/__init__.py b/src/agentflow/session/__init__.py index 2d6fb91..bd3b96c 100644 --- a/src/agentflow/session/__init__.py +++ b/src/agentflow/session/__init__.py @@ -1,6 +1,14 @@ -"""Session management: sessions, scratchpads, and artifacts.""" +"""Session management: sessions, scratchpads, artifacts, and multi-user history.""" from agentflow.session.manager import Session, SessionManager from agentflow.session.scratchpad import Scratchpad from agentflow.session.artifacts import ArtifactStore +from agentflow.session.multi_user import MultiUserHistory, HistoryPersistence -__all__ = ["Session", "SessionManager", "Scratchpad", "ArtifactStore"] +__all__ = [ + "Session", + "SessionManager", + "Scratchpad", + "ArtifactStore", + "MultiUserHistory", + "HistoryPersistence", +] diff --git a/src/agentflow/session/multi_user.py b/src/agentflow/session/multi_user.py new file mode 100644 index 0000000..5e0e4ac --- /dev/null +++ b/src/agentflow/session/multi_user.py @@ -0,0 +1,93 @@ +""" +Multi-user conversation history manager. + +Generic per-user in-memory history with an optional abstract persistence hook. +Applications that store history in a database (e.g. PostgreSQL) implement the +HistoryPersistence protocol and pass an instance to MultiUserHistory. 
# Default cap on the number of messages kept per user.
MAX_DEFAULT = 50


@runtime_checkable
class HistoryPersistence(Protocol):
    """Protocol for pluggable conversation history persistence backends."""

    async def load(self, user_id: str) -> list[Message]:
        """Load stored messages for a user. Return empty list if none."""
        ...

    async def save(self, user_id: str, messages: list[Message]) -> None:
        """Persist the current in-memory messages for a user."""
        ...


class MultiUserHistory:
    """Per-user in-memory conversation history with a size cap and optional persistence.

    The cap is enforced on every ``append`` as well as on ``get`` and ``load``,
    so the in-memory list and whatever ``save`` hands to the persistence
    backend never exceed ``max_history`` messages.

    Usage:
        history = MultiUserHistory(max_history=50)
        history.append(user_id, Role.USER, "Hello")
        history.append(user_id, Role.ASSISTANT, "Hi there!")
        messages = history.get(user_id)

    With a persistence backend:
        history = MultiUserHistory(max_history=50, persistence=adapter)
        await history.load(user_id)   # loads once per user until clear()
        history.append(...)
        await history.save(user_id)   # flush to the backend

    Args:
        max_history: Maximum number of messages kept per user; ``0`` keeps none.
        persistence: Optional backend implementing ``HistoryPersistence``.

    Raises:
        ValueError: If ``max_history`` is negative.
    """

    def __init__(
        self,
        max_history: int = MAX_DEFAULT,
        persistence: HistoryPersistence | None = None,
    ) -> None:
        if max_history < 0:
            # A negative cap would turn the tail slice into "drop the first N".
            raise ValueError("max_history must be >= 0")
        self._history: dict[str, list[Message]] = defaultdict(list)
        self._loaded: set[str] = set()
        self._max = max_history
        self._persistence = persistence

    def _tail(self, messages: list[Message]) -> list[Message]:
        """Return the newest ``max_history`` items of *messages*.

        The same list object is returned when nothing has to be dropped. The
        slice starts at an explicit index because ``messages[-0:]`` would keep
        everything when the cap is ``0``.
        """
        overflow = len(messages) - self._max
        return messages[overflow:] if overflow > 0 else messages

    def _push(self, user_id: str, message: Message) -> None:
        """Append *message* to a user's history and enforce the cap."""
        history = self._history[user_id]
        history.append(message)
        if len(history) > self._max:
            self._history[user_id] = self._tail(history)

    def get(self, user_id: str) -> list[Message]:
        """Return the current in-memory history for a user, trimmed to max_history."""
        history = self._history[user_id]
        if len(history) > self._max:
            self._history[user_id] = self._tail(history)
        return self._history[user_id]

    def append(self, user_id: str, role: Role, content: str) -> None:
        """Add a message to a user's history, dropping the oldest beyond the cap."""
        self._push(user_id, Message(role=role, content=content))

    def clear(self, user_id: str) -> None:
        """Discard all in-memory history for a user and reset the loaded flag."""
        self._history[user_id] = []
        self._loaded.discard(user_id)

    async def load(self, user_id: str) -> None:
        """Load persisted messages for a user (once per user until ``clear``).

        A non-empty result replaces whatever is currently held in memory for
        that user; an empty result leaves the in-memory history untouched.
        Calls are no-ops when no backend is configured or the user has already
        been loaded.
        """
        if user_id in self._loaded or self._persistence is None:
            return
        messages = await self._persistence.load(user_id)
        if messages:
            # Copy so later appends never mutate the backend's own list.
            self._history[user_id] = list(self._tail(messages))
        self._loaded.add(user_id)

    async def save(self, user_id: str) -> None:
        """Persist the current in-memory history for a user.

        No-op when no persistence backend is configured.
        """
        if self._persistence is None:
            return
        await self._persistence.save(user_id, self._history[user_id])
https://us.cloud.langfuse.com + ) + for event in (WORKFLOW_STARTED, WORKFLOW_COMPLETED, NODE_STARTED, NODE_COMPLETED, + LLM_CALL_COMPLETED, TOOL_CALLED, ERROR): + bus.on(event, handler) + + # On shutdown: + handler.flush() +""" +from __future__ import annotations + +import logging +from typing import Any + +from agentflow.events import ( + ERROR, + LLM_CALL_COMPLETED, + NODE_COMPLETED, + NODE_STARTED, + TOOL_CALLED, + WORKFLOW_COMPLETED, + WORKFLOW_STARTED, +) + +logger = logging.getLogger("agentflow.telemetry") + + +class LangfuseEventHandler: + """EventBus subscriber that creates Langfuse traces from agentflow events. + + Span keying strategy: + - Traces are keyed by workflow name. + - Spans are keyed by node_id (the ``node`` value emitted by NODE_STARTED). + - LLM generations are nested under the matching node span (looked up by ``node`` key + from LLM_CALL_COMPLETED data). Falls back to the first active trace if no span found. + + Thread safety: not thread-safe; designed for use within a single asyncio event loop. + All state lives in plain dicts — fine for asyncio's cooperative multitasking. + """ + + def __init__( + self, + public_key: str, + secret_key: str, + host: str | None = None, + ) -> None: + """ + Args: + public_key: Langfuse public key (LANGFUSE_PUBLIC_KEY). + secret_key: Langfuse secret key (LANGFUSE_SECRET_KEY). + host: Optional Langfuse instance URL. Defaults to Langfuse Cloud EU. + Use ``https://us.cloud.langfuse.com`` for Langfuse Cloud US. + """ + try: + from langfuse import Langfuse + except ImportError: + raise ImportError( + "langfuse package is required. 
" + "Install with: pip install langfuse " + "or: pip install gittielabs-agentflow[telemetry]" + ) + + kwargs: dict[str, Any] = { + "public_key": public_key, + "secret_key": secret_key, + } + if host: + kwargs["host"] = host + + self._lf = Langfuse(**kwargs) + + # Active objects — keyed by workflow name / node_id + self._traces: dict[str, Any] = {} + self._spans: dict[str, Any] = {} + + async def on_event(self, event_type: str, data: dict[str, Any]) -> None: + """Dispatch an event to the appropriate handler. Errors are caught and logged.""" + try: + if event_type == WORKFLOW_STARTED: + self._handle_workflow_started(data) + elif event_type == WORKFLOW_COMPLETED: + self._handle_workflow_completed(data) + elif event_type == NODE_STARTED: + self._handle_node_started(data) + elif event_type == NODE_COMPLETED: + self._handle_node_completed(data) + elif event_type == LLM_CALL_COMPLETED: + self._handle_llm_call_completed(data) + elif event_type == TOOL_CALLED: + self._handle_tool_called(data) + elif event_type == ERROR: + self._handle_error(data) + except Exception: + logger.warning("LangfuseEventHandler error on event '%s'", event_type, exc_info=True) + + # ─── Event handlers ────────────────────────────────────────────────────── + + def _handle_workflow_started(self, data: dict[str, Any]) -> None: + workflow = data.get("workflow", "unknown") + trace = self._lf.trace(name=workflow, metadata=data) + self._traces[workflow] = trace + logger.debug("Langfuse trace created: %s", workflow) + + def _handle_workflow_completed(self, data: dict[str, Any]) -> None: + workflow = data.get("workflow", "unknown") + trace = self._traces.pop(workflow, None) + if trace: + trace.update( + output=data.get("result", ""), + metadata={"nodes_completed": data.get("nodes_completed", 0)}, + ) + logger.debug("Langfuse trace closed: %s", workflow) + + def _handle_node_started(self, data: dict[str, Any]) -> None: + node = data.get("node", "unknown") + # Find the most recently opened trace to attach 
this span to + trace = next(iter(self._traces.values()), None) if self._traces else None + if trace: + span = trace.span(name=node, input=data) + self._spans[node] = span + logger.debug("Langfuse span created: %s", node) + + def _handle_node_completed(self, data: dict[str, Any]) -> None: + node = data.get("node", "unknown") + span = self._spans.pop(node, None) + if span: + span.end( + output=data.get("output", ""), + metadata={"agent": data.get("agent", "")}, + ) + logger.debug("Langfuse span closed: %s", node) + + def _handle_llm_call_completed(self, data: dict[str, Any]) -> None: + """Nest an LLM generation under the appropriate node span. + + Uses the ``node`` key from LLM_CALL_COMPLETED (threaded from AgentExecutor + through node_id parameter). Falls back to the first active trace. + """ + node = data.get("node") or data.get("agent", "unknown") + span = self._spans.get(node) + parent = span if span is not None else next(iter(self._traces.values()), None) + + if parent is None: + return + + input_tokens = data.get("input_tokens", 0) + output_tokens = data.get("output_tokens", 0) + + parent.generation( + name=f"{data.get('agent', 'llm')}/round-{data.get('round', 0)}", + model=data.get("model"), + usage={ + "input": input_tokens, + "output": output_tokens, + "total": input_tokens + output_tokens, + }, + metadata={ + "round": data.get("round"), + "elapsed_ms": data.get("elapsed_ms"), + "stop_reason": data.get("stop_reason"), + "tool_calls": data.get("tool_calls", 0), + }, + ) + + def _handle_tool_called(self, data: dict[str, Any]) -> None: + node = data.get("node", "unknown") + span = self._spans.get(node) + if span: + tool_name = data.get("tool", "unknown") + span.span( + name=f"tool:{tool_name}", + input=data.get("input"), + metadata={"round": data.get("round")}, + ) + + def _handle_error(self, data: dict[str, Any]) -> None: + node = data.get("node", "unknown") + span = self._spans.pop(node, None) + if span: + span.end( + level="ERROR", + 
status_message=data.get("error", "unknown error"), + ) + + # ─── Lifecycle ─────────────────────────────────────────────────────────── + + def flush(self) -> None: + """Flush pending events to Langfuse. Call this on application shutdown.""" + try: + self._lf.flush() + logger.debug("Langfuse flush completed") + except Exception: + logger.warning("Langfuse flush error", exc_info=True)