From 6870691e70e4ae93becf3b6be9182cf6f167d63e Mon Sep 17 00:00:00 2001 From: Keith Elliott Date: Wed, 11 Mar 2026 13:43:00 -0400 Subject: [PATCH 1/2] feat: add LLM call events, fix Gemini metadata bug, rename to gittielabs-agentflow v0.1.1 - Rename package to gittielabs-agentflow in pyproject.toml (import stays agentflow) - Add LLM_CALL_STARTED / LLM_CALL_COMPLETED constants to events.py - Emit LLM call events from AgentExecutor.run() with node_id, session_id, timing, and token usage - Fix google_genai.py: thinking metadata was computed but dropped; now passed to AgentResponse - Export new event constants from agentflow.__init__ - Add orchestration and telemetry optional dependency groups Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 8 ++++--- src/agentflow/__init__.py | 24 +++++++++++++++++++-- src/agentflow/agent/runtime.py | 28 ++++++++++++++++++++++++- src/agentflow/events.py | 2 ++ src/agentflow/providers/google_genai.py | 1 + 5 files changed, 57 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d0f4506..a0ec6e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,8 +3,8 @@ requires = ["hatchling"] build-backend = "hatchling.build" [project] -name = "agentflow" -version = "0.1.0" +name = "gittielabs-agentflow" +version = "0.1.1" description = "Context engineering framework for multi-agent systems" readme = "README.md" requires-python = ">=3.11" @@ -44,7 +44,9 @@ openai = ["openai>=1.0.0"] google = ["google-genai>=1.0.0"] s3 = ["boto3>=1.34.0"] vector = ["qdrant-client>=1.9.0"] -all = ["agentflow[anthropic,openai,google,s3,vector]"] +orchestration = ["anthropic>=0.42.0"] +telemetry = ["langfuse>=2.0"] +all = ["gittielabs-agentflow[anthropic,openai,google,s3,vector,orchestration,telemetry]"] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", diff --git a/src/agentflow/__init__.py b/src/agentflow/__init__.py index ab73fcb..c1cc6c2 100644 --- a/src/agentflow/__init__.py +++ b/src/agentflow/__init__.py @@ -26,7 +26,18 @@ ) from agentflow.agent import AgentExecutor, ContextAssembler, PromptTemplate from agentflow.config import AgentConfig, ConfigLoader, RouterConfig, WorkflowConfig -from agentflow.events import EventBus +from agentflow.events import ( + EventBus, + LLM_CALL_STARTED, + LLM_CALL_COMPLETED, + NODE_STARTED, + NODE_COMPLETED, + TOOL_CALLED, + TOOL_RESULT, + WORKFLOW_STARTED, + WORKFLOW_COMPLETED, + ERROR, +) from agentflow.storage import FileSystemStorage, InMemoryStorage, S3Storage from agentflow.tools import HTTPToolDispatcher, LocalToolDispatcher, ToolRegistry from agentflow.providers import AnthropicProvider, GoogleGenAIProvider, MockLLMProvider, OpenAICompatProvider @@ -61,6 +72,15 @@ "WorkflowConfig", # Events "EventBus", + "LLM_CALL_STARTED", + "LLM_CALL_COMPLETED", + "NODE_STARTED", + "NODE_COMPLETED", + "TOOL_CALLED", + "TOOL_RESULT", + "WORKFLOW_STARTED", + "WORKFLOW_COMPLETED", + "ERROR", # Storage "FileSystemStorage", "InMemoryStorage", @@ -93,4 +113,4 @@ "WorkflowExecutor", ] -__version__ = "0.1.0" +__version__ = "0.1.1" diff --git a/src/agentflow/agent/runtime.py b/src/agentflow/agent/runtime.py index 8f8984f..4a1dae3 100644 --- a/src/agentflow/agent/runtime.py +++ b/src/agentflow/agent/runtime.py @@ -8,11 +8,12 @@ from __future__ import annotations import logging +import time from typing import Any from agentflow.agent.context import ContextAssembler from agentflow.config.schemas import AgentConfig -from agentflow.events import EventBus, TOOL_CALLED, TOOL_RESULT, ERROR +from agentflow.events import EventBus, TOOL_CALLED, TOOL_RESULT, ERROR, LLM_CALL_STARTED, LLM_CALL_COMPLETED from agentflow.protocols import LLMProvider, ToolDispatcher from agentflow.types import AgentResponse, Message, NodeOutput, Role, ToolResult @@ -98,6 +99,16 @@ async def run( # Tool-use loop for round_num in range(self._config.max_tool_rounds): + t0 = time.monotonic() + if self._events: + await self._events.emit(LLM_CALL_STARTED, { + "agent": self._config.name, + "model": self._config.model, + "node": node_id, + "session_id": session_id, + "round": round_num, + }) + response = await self._llm.chat( messages=messages, system=system, @@ -106,6 +117,21 @@ async def run( temperature=self._config.temperature, ) + elapsed_ms = int((time.monotonic() - t0) * 1000) + if self._events: + await self._events.emit(LLM_CALL_COMPLETED, { + "agent": self._config.name, + "model": self._config.model, + "node": node_id, + "session_id": session_id, + "round": round_num, + "stop_reason": response.stop_reason, + "input_tokens": response.usage.get("input_tokens", 0) if response.usage else 0, + "output_tokens": response.usage.get("output_tokens", 0) if response.usage else 0, + "elapsed_ms": elapsed_ms, + "tool_calls": len(response.tool_calls), + }) + # Emit thinking text as a trace event (Gemini thinking models only). # This is separated from response text in the provider's _from_api_response. thinking_text = (response.raw and hasattr(response, "metadata") diff --git a/src/agentflow/events.py b/src/agentflow/events.py index 2bb302a..f0a5836 100644 --- a/src/agentflow/events.py +++ b/src/agentflow/events.py @@ -25,6 +25,8 @@ WORKFLOW_COMPLETED = "workflow_completed" MEMORY_STORED = "memory_stored" ERROR = "error" +LLM_CALL_STARTED = "llm_call_started" +LLM_CALL_COMPLETED = "llm_call_completed" class EventBus: diff --git a/src/agentflow/providers/google_genai.py b/src/agentflow/providers/google_genai.py index bb40863..9e10977 100644 --- a/src/agentflow/providers/google_genai.py +++ b/src/agentflow/providers/google_genai.py @@ -203,4 +203,5 @@ def _from_api_response(self, response: Any) -> AgentResponse: stop_reason=stop_reason, usage=usage, raw=response, + metadata={"thinking": "".join(thinking_parts)} if thinking_parts else {}, ) From 06b3465598ae8439ca399d1bb8734d34ee0ec584 Mon Sep 17 00:00:00 2001 From: Keith Elliott Date: Wed, 11 Mar 2026 13:53:47 -0400 Subject: [PATCH 2/2] feat: add MultiUserHistory and orchestration primitives v0.2.0 - session/multi_user.py: MultiUserHistory with HistoryPersistence protocol - Per-user in-memory history with configurable max and optional persistence backend - load()/save() for pluggable DB persistence (e.g. PostgreSQL adapter) - Lazy loading: each user's history is loaded from persistence exactly once per session - orchestration/types.py: Plan and PlanStep TypedDicts - orchestration/executor.py: DAGExecutor with topological round-batching - asyncio.gather() for independent steps, dependency resolution via {{key.result}} refs - Graceful fallback to sequential execution on circular deps - orchestration/classifier.py: ComplexityClassifier - Fast bypass for short messages (< 20 words, no multi-step markers) - Haiku LLM call for ambiguous messages, defaults to SIMPLE on failure - Export all new symbols from agentflow.__init__ Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- src/agentflow/__init__.py | 12 ++- src/agentflow/orchestration/__init__.py | 12 +++ src/agentflow/orchestration/classifier.py | 92 +++++++++++++++++ src/agentflow/orchestration/executor.py | 115 ++++++++++++++++++++++ src/agentflow/orchestration/types.py | 38 +++++++ src/agentflow/session/__init__.py | 12 ++- src/agentflow/session/multi_user.py | 93 +++++++++++++++++ 8 files changed, 371 insertions(+), 5 deletions(-) create mode 100644 src/agentflow/orchestration/__init__.py create mode 100644 src/agentflow/orchestration/classifier.py create mode 100644 src/agentflow/orchestration/executor.py create mode 100644 src/agentflow/orchestration/types.py create mode 100644 src/agentflow/session/multi_user.py diff --git a/pyproject.toml b/pyproject.toml index a0ec6e5..62c801f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "gittielabs-agentflow" -version = "0.1.1" +version = "0.2.0" description = "Context engineering framework for multi-agent systems" readme = "README.md" requires-python = ">=3.11" diff --git a/src/agentflow/__init__.py b/src/agentflow/__init__.py index c1cc6c2..e2a6f2d 100644 --- a/src/agentflow/__init__.py +++ b/src/agentflow/__init__.py @@ -41,7 +41,8 @@ from agentflow.storage import FileSystemStorage, InMemoryStorage, S3Storage from agentflow.tools import HTTPToolDispatcher, LocalToolDispatcher, ToolRegistry from agentflow.providers import AnthropicProvider, GoogleGenAIProvider, MockLLMProvider, OpenAICompatProvider -from agentflow.session import ArtifactStore, Scratchpad, Session, SessionManager +from agentflow.session import ArtifactStore, HistoryPersistence, MultiUserHistory, Scratchpad, Session, SessionManager +from agentflow.orchestration import ComplexityClassifier, DAGExecutor, Plan, PlanStep from agentflow.memory import FileMemory, MemoryManager, VectorMemory from agentflow.router import RouterEngine, RoutingResult, RuleEvaluator from agentflow.workflow import NodeRunner, WorkflowDAG, WorkflowExecutor @@ -96,9 +97,16 @@ "OpenAICompatProvider", # Session "ArtifactStore", + "HistoryPersistence", + "MultiUserHistory", "Scratchpad", "Session", "SessionManager", + # Orchestration + "ComplexityClassifier", + "DAGExecutor", + "Plan", + "PlanStep", # Memory "FileMemory", "MemoryManager", @@ -113,4 +121,4 @@ "WorkflowExecutor", ] -__version__ = "0.1.1" +__version__ = "0.2.0" diff --git a/src/agentflow/orchestration/__init__.py b/src/agentflow/orchestration/__init__.py new file mode 100644 index 0000000..9449472 --- /dev/null +++ b/src/agentflow/orchestration/__init__.py @@ -0,0 +1,12 @@ +"""Orchestration primitives: multi-step plan types, DAG executor, and complexity classifier.""" +from agentflow.orchestration.types import Plan, PlanStep +from agentflow.orchestration.executor import DAGExecutor, WorkflowRunner +from agentflow.orchestration.classifier import ComplexityClassifier + +__all__ = [ + "Plan", + "PlanStep", + "DAGExecutor", + "WorkflowRunner", + "ComplexityClassifier", +] diff --git a/src/agentflow/orchestration/classifier.py b/src/agentflow/orchestration/classifier.py new file mode 100644 index 0000000..78cdd97 --- /dev/null +++ b/src/agentflow/orchestration/classifier.py @@ -0,0 +1,92 @@ +""" +Complexity classifier for multi-step request detection. + +Determines whether a user message requires a multi-step orchestration plan +(COMPLEX) or can be handled by a single workflow (SIMPLE). + +Uses a two-stage approach: +1. Fast bypass: messages under _FAST_BYPASS_LIMIT words with no multi-step + language markers return SIMPLE immediately (no LLM call). +2. LLM fallback: a direct Haiku call classifies ambiguous messages. + +Defaults to SIMPLE on any failure to ensure graceful degradation. +""" +from __future__ import annotations + +import logging + +logger = logging.getLogger("agentflow.orchestration") + +_FAST_BYPASS_LIMIT = 20 +_MULTI_STEP_MARKERS = ( + "and then", + "after that", + "followed by", + "then write", + "then create", + "then send", + "then research", +) + + +class ComplexityClassifier: + """Classifies a message as SIMPLE or COMPLEX via fast pre-check + LLM call. + + Args: + api_key: Anthropic API key for the Haiku classification call. + model: Model to use for classification. Defaults to claude-3-5-haiku-20241022. + + Example: + classifier = ComplexityClassifier(api_key=os.getenv("ANTHROPIC_API_KEY")) + result = await classifier.classify("Research AI agents and then write a LinkedIn post") + # result == "COMPLEX" + """ + + def __init__( + self, + api_key: str, + model: str = "claude-3-5-haiku-20241022", + ) -> None: + self._api_key = api_key + self._model = model + + async def classify(self, message: str) -> str: + """Return 'SIMPLE' or 'COMPLEX'. Defaults to 'SIMPLE' on any failure. + + Args: + message: The user message to classify. + + Returns: + 'COMPLEX' if the message requires multiple distinct workflow steps, + 'SIMPLE' otherwise. + """ + # Fast path: short messages with no multi-step markers skip the LLM call + if len(message.split()) < _FAST_BYPASS_LIMIT and not any( + m in message.lower() for m in _MULTI_STEP_MARKERS + ): + return "SIMPLE" + + try: + import anthropic as _anthropic + + client = _anthropic.AsyncAnthropic(api_key=self._api_key) + resp = await client.messages.create( + model=self._model, + max_tokens=10, + temperature=0.0, + system=( + "Classify the user request as SIMPLE (single task, one workflow) or " + "COMPLEX (multiple distinct tasks each requiring their own workflow). " + "Reply with exactly one word: SIMPLE or COMPLEX." + ), + messages=[{"role": "user", "content": message}], + ) + text = resp.content[0].text if resp.content else "" + return "COMPLEX" if "COMPLEX" in text.upper() else "SIMPLE" + + except ImportError: + logger.warning("anthropic package not installed; ComplexityClassifier defaulting to SIMPLE") + return "SIMPLE" + except Exception: + logger.warning("ComplexityClassifier failed, defaulting to SIMPLE", exc_info=True) + return "SIMPLE" diff --git a/src/agentflow/orchestration/executor.py b/src/agentflow/orchestration/executor.py new file mode 100644 index 0000000..24fd5b8 --- /dev/null +++ b/src/agentflow/orchestration/executor.py @@ -0,0 +1,115 @@ +""" +Async DAG executor for multi-step orchestration plans. + +Executes a Plan by resolving inter-step dependencies ({{key.result}} references) +and running independent steps concurrently via asyncio.gather(). +""" +from __future__ import annotations + +import asyncio +import logging +import re +from typing import Awaitable, Callable + +from agentflow.orchestration.types import Plan, PlanStep + +logger = logging.getLogger("agentflow.orchestration") + +# Callable signature: runner(workflow_name, message) -> result_text +WorkflowRunner = Callable[[str, str], Awaitable[str]] + + +class DAGExecutor: + """Executes a Plan using async topological round-batching. + + Independent steps — those whose message templates have no unresolved + ``{{key.result}}`` references — run concurrently via asyncio.gather(). + Steps that depend on earlier output wait only for their specific prerequisites. + + If a dependency cycle is detected (no steps are ready but some remain), the + executor falls back to sequential execution to avoid a deadlock. + + Example: + plan = Plan(steps=[ + PlanStep(id="s1", workflow="research", message="AI agents", output_key="research"), + PlanStep(id="s2", workflow="write", message="Write based on {{research.result}}", output_key="article"), + ]) + executor = DAGExecutor() + result = await executor.execute(plan, runner=my_runner) + """ + + _DEP_PATTERN = re.compile(r"\{\{(\w+)\.result\}\}") + + async def execute( + self, + plan: Plan, + runner: WorkflowRunner, + variables: dict | None = None, + ) -> str: + """Execute the plan and return a combined result string. + + Args: + plan: The orchestration plan with steps to execute. + runner: Async callable that invokes a named workflow with a message. + variables: Optional extra variables (currently reserved for future use). + + Returns: + Newline-separated results from all successful steps. Failed steps are + omitted from the combined output but logged as errors. + """ + outputs: dict[str, str] = {} + ordered: list[tuple[int, str]] = [] # (original_index, result_text) + + async def _run(step: PlanStep, idx: int) -> tuple[int, str]: + """Execute one step, substituting resolved outputs into its message.""" + msg = step["message"] + for key, val in outputs.items(): + msg = msg.replace(f"{{{{{key}.result}}}}", val[:3000]) + try: + result = await runner(step["workflow"], msg) + logger.info( + "Step %s (%s) completed (%d chars)", + step["id"], step["workflow"], len(result), + ) + return idx, result + except Exception as exc: + logger.error("Step %s (%s) failed: %s", step["id"], step["workflow"], exc) + return idx, f"[Step {step['id']} failed: {exc}]" + + remaining = list(enumerate(plan["steps"])) + completed: set[str] = set() + + while remaining: + ready: list[tuple[int, PlanStep]] = [] + waiting: list[tuple[int, PlanStep]] = [] + + for idx, step in remaining: + deps = self._DEP_PATTERN.findall(step["message"]) + if all(d in completed for d in deps): + ready.append((idx, step)) + else: + waiting.append((idx, step)) + + if not ready: + # Circular or unresolvable dependencies — run remaining sequentially + logger.warning( + "DAGExecutor: %d step(s) have unresolvable dependencies, " + "falling back to sequential execution", + len(waiting), + ) + ready, waiting = waiting, [] + + # Run all ready steps concurrently + results = await asyncio.gather(*[_run(step, idx) for idx, step in ready]) + + for (_, step), (idx, result) in zip(ready, results): + outputs[step["output_key"]] = result + completed.add(step["output_key"]) + ordered.append((idx, result)) + + remaining = waiting + + # Re-sort by original step order and filter out failed steps + ordered.sort(key=lambda x: x[0]) + good = [r for _, r in ordered if not r.startswith("[Step ")] + return "\n\n---\n\n".join(good) if good else "No results produced." diff --git a/src/agentflow/orchestration/types.py b/src/agentflow/orchestration/types.py new file mode 100644 index 0000000..37aabfd --- /dev/null +++ b/src/agentflow/orchestration/types.py @@ -0,0 +1,38 @@ +""" +Orchestration type definitions. + +PlanStep and Plan describe the output of a planner agent — a set of +workflow invocations (possibly with inter-step dependencies) that together +satisfy a complex user request. +""" +from __future__ import annotations + +from typing import TypedDict + + +class PlanStep(TypedDict): + """A single step in an orchestration plan. + + Fields: + id: Unique identifier for this step (e.g. "step_1"). + workflow: Name of the workflow to invoke (must exist in the config loader). + message: Input message for the workflow. May contain ``{{output_key.result}}`` + references to the output of prior steps. + output_key: Key under which this step's result is stored for use by later steps. + """ + + id: str + workflow: str + message: str + output_key: str + + +class Plan(TypedDict): + """A complete orchestration plan produced by a planner agent. + + Fields: + steps: Ordered list of PlanStep items. DAGExecutor resolves + inter-step dependencies and executes independent steps concurrently. + """ + + steps: list[PlanStep] diff --git a/src/agentflow/session/__init__.py b/src/agentflow/session/__init__.py index 2d6fb91..bd3b96c 100644 --- a/src/agentflow/session/__init__.py +++ b/src/agentflow/session/__init__.py @@ -1,6 +1,14 @@ -"""Session management: sessions, scratchpads, and artifacts.""" +"""Session management: sessions, scratchpads, artifacts, and multi-user history.""" from agentflow.session.manager import Session, SessionManager from agentflow.session.scratchpad import Scratchpad from agentflow.session.artifacts import ArtifactStore +from agentflow.session.multi_user import MultiUserHistory, HistoryPersistence -__all__ = ["Session", "SessionManager", "Scratchpad", "ArtifactStore"] +__all__ = [ + "Session", + "SessionManager", + "Scratchpad", + "ArtifactStore", + "MultiUserHistory", + "HistoryPersistence", +] diff --git a/src/agentflow/session/multi_user.py b/src/agentflow/session/multi_user.py new file mode 100644 index 0000000..5e0e4ac --- /dev/null +++ b/src/agentflow/session/multi_user.py @@ -0,0 +1,93 @@ +""" +Multi-user conversation history manager. + +Generic per-user in-memory history with an optional abstract persistence hook. +Applications that store history in a database (e.g. PostgreSQL) implement the +HistoryPersistence protocol and pass an instance to MultiUserHistory. +""" +from __future__ import annotations + +from collections import defaultdict +from typing import Protocol, runtime_checkable + +from agentflow.types import Message, Role + + +MAX_DEFAULT = 50 + + +@runtime_checkable +class HistoryPersistence(Protocol): + """Protocol for pluggable conversation history persistence backends.""" + + async def load(self, user_id: str) -> list[Message]: + """Load stored messages for a user. Return empty list if none.""" + ... + + async def save(self, user_id: str, messages: list[Message]) -> None: + """Persist the current in-memory messages for a user.""" + ... + + +class MultiUserHistory: + """Per-user in-memory conversation history with configurable max and optional persistence. + + Usage: + history = MultiUserHistory(max_history=50) + history.append(user_id, Role.USER, "Hello") + history.append(user_id, Role.ASSISTANT, "Hi there!") + messages = history.get(user_id) # [Message(role=USER, ...), Message(role=ASSISTANT, ...)] + + With PostgreSQL persistence: + history = MultiUserHistory(max_history=50, persistence=postgres_adapter) + await history.load(user_id) # loads once per user per session + history.append(...) + await history.save(user_id) # flush to DB + """ + + def __init__( + self, + max_history: int = MAX_DEFAULT, + persistence: HistoryPersistence | None = None, + ) -> None: + self._history: dict[str, list[Message]] = defaultdict(list) + self._loaded: set[str] = set() + self._max = max_history + self._persistence = persistence + + def get(self, user_id: str) -> list[Message]: + """Return the current in-memory history for a user, trimming to max_history.""" + history = self._history[user_id] + if len(history) > self._max: + self._history[user_id] = history[-self._max:] + return self._history[user_id] + + def append(self, user_id: str, role: Role, content: str) -> None: + """Add a message to a user's history.""" + self._history[user_id].append(Message(role=role, content=content)) + + def clear(self, user_id: str) -> None: + """Discard all in-memory history for a user and reset the loaded flag.""" + self._history[user_id] = [] + self._loaded.discard(user_id) + + async def load(self, user_id: str) -> None: + """Load persisted messages for a user (once per user per session). + + Subsequent calls for the same user_id are no-ops until clear() is called. + """ + if user_id in self._loaded or self._persistence is None: + return + messages = await self._persistence.load(user_id) + if messages: + self._history[user_id] = messages[-self._max:] + self._loaded.add(user_id) + + async def save(self, user_id: str) -> None: + """Persist the current in-memory history for a user. + + No-op when no persistence backend is configured. + """ + if self._persistence is None: + return + await self._persistence.save(user_id, self._history[user_id])