Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "agentflow"
version = "0.1.0"
name = "gittielabs-agentflow"
version = "0.2.1"
description = "Context engineering framework for multi-agent systems"
readme = "README.md"
requires-python = ">=3.11"
Expand Down Expand Up @@ -44,7 +44,9 @@ openai = ["openai>=1.0.0"]
google = ["google-genai>=1.0.0"]
s3 = ["boto3>=1.34.0"]
vector = ["qdrant-client>=1.9.0"]
all = ["agentflow[anthropic,openai,google,s3,vector]"]
orchestration = ["anthropic>=0.42.0"]
telemetry = ["langfuse>=2.0"]
all = ["gittielabs-agentflow[anthropic,openai,google,s3,vector,orchestration,telemetry]"]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
Expand Down
44 changes: 41 additions & 3 deletions src/agentflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,23 @@
)
from agentflow.agent import AgentExecutor, ContextAssembler, PromptTemplate
from agentflow.config import AgentConfig, ConfigLoader, RouterConfig, WorkflowConfig
from agentflow.events import EventBus
from agentflow.events import (
EventBus,
LLM_CALL_STARTED,
LLM_CALL_COMPLETED,
NODE_STARTED,
NODE_COMPLETED,
TOOL_CALLED,
TOOL_RESULT,
WORKFLOW_STARTED,
WORKFLOW_COMPLETED,
ERROR,
)
from agentflow.storage import FileSystemStorage, InMemoryStorage, S3Storage
from agentflow.tools import HTTPToolDispatcher, LocalToolDispatcher, ToolRegistry
from agentflow.providers import AnthropicProvider, GoogleGenAIProvider, MockLLMProvider, OpenAICompatProvider
from agentflow.session import ArtifactStore, Scratchpad, Session, SessionManager
from agentflow.session import ArtifactStore, HistoryPersistence, MultiUserHistory, Scratchpad, Session, SessionManager
from agentflow.orchestration import ComplexityClassifier, DAGExecutor, Plan, PlanStep
from agentflow.memory import FileMemory, MemoryManager, VectorMemory
from agentflow.router import RouterEngine, RoutingResult, RuleEvaluator
from agentflow.workflow import NodeRunner, WorkflowDAG, WorkflowExecutor
Expand Down Expand Up @@ -61,6 +73,15 @@
"WorkflowConfig",
# Events
"EventBus",
"LLM_CALL_STARTED",
"LLM_CALL_COMPLETED",
"NODE_STARTED",
"NODE_COMPLETED",
"TOOL_CALLED",
"TOOL_RESULT",
"WORKFLOW_STARTED",
"WORKFLOW_COMPLETED",
"ERROR",
# Storage
"FileSystemStorage",
"InMemoryStorage",
Expand All @@ -76,9 +97,18 @@
"OpenAICompatProvider",
# Session
"ArtifactStore",
"HistoryPersistence",
"MultiUserHistory",
"Scratchpad",
"Session",
"SessionManager",
# Orchestration
"ComplexityClassifier",
"DAGExecutor",
"Plan",
"PlanStep",
# Telemetry
"LangfuseEventHandler",
# Memory
"FileMemory",
"MemoryManager",
Expand All @@ -93,4 +123,12 @@
"WorkflowExecutor",
]

__version__ = "0.1.0"
def __getattr__(name: str):
    """Resolve optional telemetry exports on first attribute access.

    ``LangfuseEventHandler`` is imported from ``agentflow.telemetry`` only
    when it is actually requested, so importing the package does not
    trigger that import up front. Any other name raises ``AttributeError``.
    """
    # Guard clause: everything except the single lazily-exported name is unknown.
    if name != "LangfuseEventHandler":
        raise AttributeError(f"module 'agentflow' has no attribute {name!r}")

    from agentflow.telemetry import LangfuseEventHandler as handler_cls

    return handler_cls


__version__ = "0.2.1"
28 changes: 27 additions & 1 deletion src/agentflow/agent/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
from __future__ import annotations

import logging
import time
from typing import Any

from agentflow.agent.context import ContextAssembler
from agentflow.config.schemas import AgentConfig
from agentflow.events import EventBus, TOOL_CALLED, TOOL_RESULT, ERROR
from agentflow.events import EventBus, TOOL_CALLED, TOOL_RESULT, ERROR, LLM_CALL_STARTED, LLM_CALL_COMPLETED
from agentflow.protocols import LLMProvider, ToolDispatcher
from agentflow.types import AgentResponse, Message, NodeOutput, Role, ToolResult

Expand Down Expand Up @@ -98,6 +99,16 @@ async def run(

# Tool-use loop
for round_num in range(self._config.max_tool_rounds):
t0 = time.monotonic()
if self._events:
await self._events.emit(LLM_CALL_STARTED, {
"agent": self._config.name,
"model": self._config.model,
"node": node_id,
"session_id": session_id,
"round": round_num,
})

response = await self._llm.chat(
messages=messages,
system=system,
Expand All @@ -106,6 +117,21 @@ async def run(
temperature=self._config.temperature,
)

elapsed_ms = int((time.monotonic() - t0) * 1000)
if self._events:
await self._events.emit(LLM_CALL_COMPLETED, {
"agent": self._config.name,
"model": self._config.model,
"node": node_id,
"session_id": session_id,
"round": round_num,
"stop_reason": response.stop_reason,
"input_tokens": response.usage.get("input_tokens", 0) if response.usage else 0,
"output_tokens": response.usage.get("output_tokens", 0) if response.usage else 0,
"elapsed_ms": elapsed_ms,
"tool_calls": len(response.tool_calls),
})

# Emit thinking text as a trace event (Gemini thinking models only).
# This is separated from response text in the provider's _from_api_response.
thinking_text = (response.raw and hasattr(response, "metadata")
Expand Down
2 changes: 2 additions & 0 deletions src/agentflow/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
WORKFLOW_COMPLETED = "workflow_completed"
MEMORY_STORED = "memory_stored"
ERROR = "error"
LLM_CALL_STARTED = "llm_call_started"
LLM_CALL_COMPLETED = "llm_call_completed"


class EventBus:
Expand Down
12 changes: 12 additions & 0 deletions src/agentflow/orchestration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Orchestration primitives: multi-step plan types, DAG executor, and complexity classifier."""
from agentflow.orchestration.types import Plan, PlanStep
from agentflow.orchestration.executor import DAGExecutor, WorkflowRunner
from agentflow.orchestration.classifier import ComplexityClassifier

__all__ = [
"Plan",
"PlanStep",
"DAGExecutor",
"WorkflowRunner",
"ComplexityClassifier",
]
92 changes: 92 additions & 0 deletions src/agentflow/orchestration/classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
Complexity classifier for multi-step request detection.

Determines whether a user message requires a multi-step orchestration plan
(COMPLEX) or can be handled by a single workflow (SIMPLE).

Uses a two-stage approach:
1. Fast bypass: messages under _FAST_BYPASS_LIMIT words with no multi-step
language markers return SIMPLE immediately (no LLM call).
2. LLM fallback: a direct Haiku call classifies ambiguous messages.

Defaults to SIMPLE on any failure to ensure graceful degradation.
"""
from __future__ import annotations

import logging

logger = logging.getLogger("agentflow.orchestration")

# Word-count threshold for the fast path: a message with fewer than this many
# whitespace-separated words (and no marker below) is classified SIMPLE
# without making an LLM call.
_FAST_BYPASS_LIMIT = 20
# Phrases that hint at a multi-step request. They are matched as substrings
# of the lower-cased message, so every entry must itself be lower-case.
_MULTI_STEP_MARKERS = (
"and then",
"after that",
"followed by",
"then write",
"then create",
"then send",
"then research",
)


class ComplexityClassifier:
    """Classifies a message as SIMPLE or COMPLEX via fast pre-check + LLM call.

    Args:
        api_key: Anthropic API key for the Haiku classification call.
        model: Model to use for classification. Defaults to claude-3-5-haiku-20241022.

    Example:
        classifier = ComplexityClassifier(api_key=os.getenv("ANTHROPIC_API_KEY"))
        result = await classifier.classify("Research AI agents and then write a LinkedIn post")
        # result == "COMPLEX"
    """

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-haiku-20241022",
    ) -> None:
        self._api_key = api_key
        self._model = model

    async def classify(self, message: str) -> str:
        """Return 'SIMPLE' or 'COMPLEX'. Defaults to 'SIMPLE' on any LLM failure.

        Args:
            message: The user message to classify.

        Returns:
            'COMPLEX' if the message requires multiple distinct workflow steps,
            'SIMPLE' otherwise (including when the ``anthropic`` package is
            missing or the API call fails).
        """
        # Fast path: short messages with no multi-step markers skip the LLM call.
        # Lower-case once instead of once per marker.
        lowered = message.lower()
        is_short = len(message.split()) < _FAST_BYPASS_LIMIT
        if is_short and not any(marker in lowered for marker in _MULTI_STEP_MARKERS):
            return "SIMPLE"

        try:
            # Imported lazily so the package works without the optional extra.
            import anthropic as _anthropic

            # The client owns an HTTP connection pool; scope it with
            # ``async with`` so it is closed after the call instead of leaking
            # one pool per classification.
            async with _anthropic.AsyncAnthropic(api_key=self._api_key) as client:
                resp = await client.messages.create(
                    model=self._model,
                    max_tokens=10,
                    temperature=0.0,
                    system=(
                        "Classify the user request as SIMPLE (single task, one workflow) or "
                        "COMPLEX (multiple distinct tasks each requiring their own workflow). "
                        "Reply with exactly one word: SIMPLE or COMPLEX."
                    ),
                    messages=[{"role": "user", "content": message}],
                )
            text = resp.content[0].text if resp.content else ""
            # Anything that does not clearly say COMPLEX degrades to SIMPLE.
            return "COMPLEX" if "COMPLEX" in text.upper() else "SIMPLE"

        except ImportError:
            logger.warning("anthropic package not installed; ComplexityClassifier defaulting to SIMPLE")
            return "SIMPLE"
        except Exception:
            # Deliberate best-effort boundary: classification must never break the caller.
            logger.warning("ComplexityClassifier failed, defaulting to SIMPLE", exc_info=True)
            return "SIMPLE"
115 changes: 115 additions & 0 deletions src/agentflow/orchestration/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
Async DAG executor for multi-step orchestration plans.

Executes a Plan by resolving inter-step dependencies ({{key.result}} references)
and running independent steps concurrently via asyncio.gather().
"""
from __future__ import annotations

import asyncio
import logging
import re
from typing import Awaitable, Callable

from agentflow.orchestration.types import Plan, PlanStep

logger = logging.getLogger("agentflow.orchestration")

# Async callable signature: ``await runner(workflow_name, message)`` -> result_text.
# DAGExecutor awaits it once per plan step with the step's workflow name and
# its message after dependency substitution.
WorkflowRunner = Callable[[str, str], Awaitable[str]]


class DAGExecutor:
    """Executes a Plan using async topological round-batching.

    Independent steps — those whose message templates have no unresolved
    ``{{key.result}}`` references — run concurrently via asyncio.gather().
    Steps that depend on earlier output wait only for their specific prerequisites.

    If no remaining step is ready (a dependency cycle, or a reference to an
    output key that no step produces), the executor runs the remaining steps
    one at a time in their original order. Each step then sees the outputs of
    the steps that ran before it; references that are still unresolved are
    left in the message verbatim.

    Example:
        plan = Plan(steps=[
            PlanStep(id="s1", workflow="research", message="AI agents", output_key="research"),
            PlanStep(id="s2", workflow="write", message="Write based on {{research.result}}", output_key="article"),
        ])
        executor = DAGExecutor()
        result = await executor.execute(plan, runner=my_runner)
    """

    _DEP_PATTERN = re.compile(r"\{\{(\w+)\.result\}\}")

    async def execute(
        self,
        plan: Plan,
        runner: WorkflowRunner,
        variables: dict | None = None,
    ) -> str:
        """Execute the plan and return a combined result string.

        Args:
            plan: The orchestration plan with steps to execute.
            runner: Async callable that invokes a named workflow with a message.
            variables: Optional extra variables (currently unused; reserved for future use).

        Returns:
            Results of all successful steps, in original step order, joined by
            a ``---`` separator line. Failed steps are omitted from the
            combined output but logged as errors. Returns
            ``"No results produced."`` when no step succeeded.
        """
        outputs: dict[str, str] = {}
        completed: set[str] = set()
        succeeded: list[tuple[int, str]] = []  # (original_index, result_text)

        async def _run(step: PlanStep, idx: int) -> tuple[int, str, bool]:
            """Execute one step; return (index, text, ok) and never raise."""
            msg = step["message"]
            for key, val in outputs.items():
                # Cap each substituted upstream result at 3000 characters.
                msg = msg.replace(f"{{{{{key}.result}}}}", val[:3000])
            try:
                result = await runner(step["workflow"], msg)
                logger.info(
                    "Step %s (%s) completed (%d chars)",
                    step["id"], step["workflow"], len(result),
                )
                return idx, result, True
            except Exception as exc:
                logger.error("Step %s (%s) failed: %s", step["id"], step["workflow"], exc)
                return idx, f"[Step {step['id']} failed: {exc}]", False

        def _record(step: PlanStep, idx: int, text: str, ok: bool) -> None:
            """Publish a step's output and remember it if the step succeeded."""
            # A failed step still publishes its failure marker under its output
            # key so that dependents are unblocked and receive the marker text.
            outputs[step["output_key"]] = text
            completed.add(step["output_key"])
            # Success is tracked explicitly rather than inferred from the text,
            # so a genuine result that starts with "[Step " is not discarded.
            if ok:
                succeeded.append((idx, text))

        remaining = list(enumerate(plan["steps"]))

        while remaining:
            ready: list[tuple[int, PlanStep]] = []
            waiting: list[tuple[int, PlanStep]] = []

            for idx, step in remaining:
                deps = self._DEP_PATTERN.findall(step["message"])
                if all(d in completed for d in deps):
                    ready.append((idx, step))
                else:
                    waiting.append((idx, step))

            if not ready:
                # Circular or unresolvable dependencies — run remaining sequentially
                logger.warning(
                    "DAGExecutor: %d step(s) have unresolvable dependencies, "
                    "falling back to sequential execution",
                    len(waiting),
                )
                for idx, step in waiting:
                    # Await one step at a time and record it immediately so the
                    # next step can substitute this step's output.
                    step_idx, text, ok = await _run(step, idx)
                    _record(step, step_idx, text, ok)
                break

            # Run all ready steps concurrently; outputs are only mutated after
            # the whole batch finishes, so steps in a batch see a stable view.
            results = await asyncio.gather(*(_run(step, idx) for idx, step in ready))

            for (_, step), (idx, text, ok) in zip(ready, results):
                _record(step, idx, text, ok)

            remaining = waiting

        # Restore original step order before joining.
        succeeded.sort(key=lambda pair: pair[0])
        good = [text for _, text in succeeded]
        return "\n\n---\n\n".join(good) if good else "No results produced."
Loading
Loading