Stagehand orchestrates multi-agent AI workflows in pure Python. Each workflow is a directed acyclic graph (DAG) of tasks. Tasks with no dependencies run in parallel; tasks with dependencies wait until their upstream tasks complete.
```bash
pip install stagehand-ai
```

Requires Python 3.11+.
```python
import asyncio

from stagehand import WorkflowBuilder
from stagehand.adapters.executor import OllamaExecutor


async def main():
    run_id = await (
        WorkflowBuilder("haiku-pipeline")
        .agent("writer", OllamaExecutor(), model="qwen2.5",
               system_prompt="You are a haiku writer.",
               tools=["write_file"])
        .task("draft", agent="writer", prompt="Write a haiku about the ocean at dawn.")
        .task("refine", agent="writer",
              prompt="Refine this haiku:\n\n{{ tasks.draft }}",
              after=["draft"])
        .run()
    )
    print(f"Done — run {run_id}")


asyncio.run(main())
```

WorkflowBuilder is the primary API. It lets you define agents and tasks in code, then run the workflow with a single await.
```python
WorkflowBuilder(name, version="1")
    .agent(agent_id, executor, *, model, system_prompt, role, tools)
    .task(task_id, *, agent, prompt, fn, after, outputs, secrets, retry)
    .state_dir(directory)   # where run state is persisted (default: .stagehand/runs)
    .run(inputs={})         # returns run_id
```

.build() returns a Workflow object without running it, which is useful if you want to pass it to a Scheduler directly.
An agent is a named AI persona with an executor, a model, and a system prompt.
```python
.agent(
    "writer",
    OllamaExecutor(),
    model="qwen2.5",
    system_prompt="You are a concise technical writer.",
    tools=["write_file", "read_file"],
)
```

Different agents in the same workflow can use different executors:
```python
WorkflowBuilder("pipeline")
    .agent("drafter", OllamaExecutor(), model="qwen2.5")
    .agent("reviewer", ClaudeExecutor(api_key="..."), model="claude-opus-4-5")
```

A task is a single unit of work in the DAG. There are two kinds:
Agent task: runs a prompt against an AI agent.

```python
.task("draft", agent="writer", prompt="Write a short intro to Python.")
.task("review", agent="reviewer",
      prompt="Review this draft:\n\n{{ tasks.draft }}",
      after=["draft"])
```

Deterministic task: runs a plain Python callable (sync or async). No agent or prompt is needed.
```python
async def fetch_tickets(ctx):
    return await linear_client.get_tickets()  # returns str or TaskResult

.task("fetch", fn=fetch_tickets)
.task("analyze", agent="analyst",
      prompt="Tickets:\n\n{{ tasks.fetch }}\n\nSummarise.",
      after=["fetch"])
```

The callable receives a RunContext (access to inputs and previous task results) and must return a str or TaskResult.
| Parameter | Description |
|---|---|
| `agent` | ID of the agent that runs this task (required unless `fn` is set) |
| `prompt` | Message sent to the agent; supports `{{ }}` expressions (required unless `fn` is set) |
| `fn` | Python callable to run directly (required unless `agent`/`prompt` are set) |
| `after` | List of task IDs this task waits for |
| `outputs` | `StaticOutputs`, `DynamicOutputs`, or `PatternOutputs` |
| `secrets` | List of secret names to resolve at runtime |
| `retry` | `RetryPolicy`: how many times to retry on failure |
Tasks with no after (or whose dependencies are all complete) start immediately. Multiple ready tasks run in parallel.
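The scheduling rule above (a task starts once its dependencies are complete, and all ready tasks run in parallel) can be sketched with the standard library alone. This is not Stagehand's scheduler: `run_dag` and the three toy task functions are hypothetical names used only for illustration.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_dag(tasks, after):
    """Run async callables in dependency order; ready tasks run concurrently.

    tasks: dict of task_id -> async callable that receives the results so far
    after: dict of task_id -> list of upstream task ids
    """
    sorter = TopologicalSorter({tid: after.get(tid, []) for tid in tasks})
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    results = {}
    running = {}  # asyncio.Task -> task_id

    while sorter.is_active():
        # Start every task whose upstream tasks have all completed.
        for tid in sorter.get_ready():
            running[asyncio.create_task(tasks[tid](results))] = tid
        # Wait for at least one running task, then unblock its dependents.
        done, _ = await asyncio.wait(list(running), return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            tid = running.pop(finished)
            results[tid] = finished.result()
            sorter.done(tid)
    return results


async def pros(results):
    return "flexible hours"


async def cons(results):
    return "isolation"


async def summary(results):
    return f"{results['pros']} vs {results['cons']}"


outputs = asyncio.run(
    run_dag(
        {"pros": pros, "cons": cons, "summary": summary},
        {"summary": ["pros", "cons"]},
    )
)
print(outputs["summary"])  # flexible hours vs isolation
```

Here `pros` and `cons` are started in the same loop iteration because neither has upstream tasks, while `summary` only becomes ready after both have been marked done.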
Pass a RetryPolicy to a task to retry it automatically on failure. Downstream tasks are only cancelled once all attempts are exhausted.
```python
from stagehand import RetryPolicy

.task(
    "fetch",
    agent="worker",
    prompt="Fetch the latest report.",
    retry=RetryPolicy(max_attempts=3, delay=2.0),
)
```

| Parameter | Default | Description |
|---|---|---|
| `max_attempts` | `1` | Total attempts including the first (1 = no retry) |
| `delay` | `0.0` | Seconds to wait between attempts |
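To make the meaning of the two parameters concrete, here is a minimal standard-library sketch of a retry loop with the same semantics. `RetrySketch` and `run_with_retry` are hypothetical stand-ins, not Stagehand's implementation; the only behaviour taken from the table is that `max_attempts` counts the first attempt and `delay` is the pause between attempts.

```python
import time
from dataclasses import dataclass


@dataclass
class RetrySketch:
    # Hypothetical stand-in mirroring the two documented RetryPolicy fields.
    max_attempts: int = 1  # total attempts, including the first
    delay: float = 0.0     # seconds to wait between attempts


def run_with_retry(fn, policy):
    """Call fn until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == policy.max_attempts:
                raise  # out of attempts: let the failure propagate
            time.sleep(policy.delay)


calls = []


def flaky():
    """Fail on the first two calls, succeed on the third."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "report fetched"


result = run_with_retry(flaky, RetrySketch(max_attempts=3, delay=0.0))
print(result, len(calls))  # report fetched 3
```

With the default `max_attempts=1` the same call would raise on the first failure, which matches the "1 = no retry" note in the table.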
By default the scheduler is silent. Pass a Logger to see workflow and task lifecycle events.
```python
import logging

from stagehand import WorkflowBuilder, StdlibLogger

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

run_id = await (
    WorkflowBuilder("my-pipeline")
    .agent("worker", executor, model="claude-opus-4-5")
    .task("analyse", agent="worker", prompt="...")
    .logger(StdlibLogger())  # <-- add this
    .run()
)
```

StdlibLogger wraps Python's stdlib logging module and, by default, raises httpx and httpcore to WARNING so their per-request INFO lines don't drown out workflow events.

```text
INFO workflow 'my-pipeline' started [run=sh-20260520-a1b2]
INFO task 'analyse' starting
INFO task 'analyse' done
INFO workflow 'my-pipeline' finished [run=sh-20260520-a1b2]
```
| Parameter | Default | Description |
|---|---|---|
| `name` | `"stagehand"` | Logger name used with `logging.getLogger` |
| `suppress_http_logs` | `True` | Raises httpx/httpcore to WARNING to suppress per-request noise |
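The behaviour described by these two parameters can be approximated with the stdlib `logging` module alone. The class below is a hypothetical stand-in written for illustration, not the real StdlibLogger; it assumes the four-method interface (`debug`, `info`, `warning`, `error`) that the Logger port uses in this README.

```python
import logging


class StdlibLoggerSketch:
    """Hypothetical stand-in for illustration; not the real StdlibLogger."""

    def __init__(self, name="stagehand", suppress_http_logs=True):
        self._log = logging.getLogger(name)
        if suppress_http_logs:
            # Raise the HTTP client loggers so per-request INFO lines are dropped.
            for noisy in ("httpx", "httpcore"):
                logging.getLogger(noisy).setLevel(logging.WARNING)

    def debug(self, message: str) -> None:
        self._log.debug(message)

    def info(self, message: str) -> None:
        self._log.info(message)

    def warning(self, message: str) -> None:
        self._log.warning(message)

    def error(self, message: str) -> None:
        self._log.error(message)


logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = StdlibLoggerSketch()
logger.info("task 'analyse' starting")
```

Because the level is set on the `httpx` and `httpcore` loggers themselves, their INFO records are filtered out even though the root logger is configured at INFO.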
To silence everything, omit .logger() (the default) or pass NullLogger() explicitly.
When using Scheduler directly, pass the logger to its constructor:
```python
from stagehand import Scheduler, StdlibLogger

scheduler = Scheduler(run_state_directory=".stagehand/runs", logger=StdlibLogger())
```

To write a custom logger, for example to route events to a structured sink, implement the Logger port:
```python
from stagehand import Logger


class MyLogger(Logger):
    def debug(self, message: str) -> None: ...
    def info(self, message: str) -> None: ...
    def warning(self, message: str) -> None: ...
    def error(self, message: str) -> None: ...
```

Prompts support {{ }} expressions to inject values from previous tasks or runtime inputs.
| Expression | Resolves to |
|---|---|
| `{{ input.key }}` | A value passed via `inputs={"key": "..."}` |
| `{{ tasks.id }}` | The text output of a completed task |
| `{{ tasks.id.files }}` | Newline-separated list of file paths produced by a task |
| `{{ tasks.id.filename_md }}` | Path of a specific file, identified by its slug (`filename.md` → `filename_md`) |
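As a rough illustration of how the four expression forms in the table could be resolved, the sketch below uses a regular expression and plain dictionaries. It is not Stagehand's template engine: `render`, `slug`, and the `{"output": ..., "files": [...]}` shape for task results are assumptions made for the example, and the pattern only handles identifiers made of letters, digits, and underscores.

```python
import re

EXPRESSION = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def slug(filename):
    """Turn a file name into a lookup key, e.g. filename.md -> filename_md."""
    return re.sub(r"\W", "_", filename)


def render(prompt, inputs, tasks):
    """Replace {{ input.key }} and {{ tasks.id[.attr] }} expressions in prompt.

    tasks maps task id -> {"output": str, "files": [paths]} (an assumed shape).
    """
    def resolve(match):
        parts = match.group(1).split(".")
        if parts[0] == "input" and len(parts) == 2:
            return str(inputs[parts[1]])
        if parts[0] == "tasks" and len(parts) == 2:
            return tasks[parts[1]]["output"]
        if parts[0] == "tasks" and len(parts) == 3:
            files = tasks[parts[1]]["files"]
            if parts[2] == "files":
                return "\n".join(files)
            # Look the file up by the slug of its base name.
            by_slug = {slug(path.rsplit("/", 1)[-1]): path for path in files}
            return by_slug[parts[2]]
        raise KeyError(match.group(0))

    return EXPRESSION.sub(resolve, prompt)


tasks = {"draft": {"output": "Waves at dawn", "files": ["out/draft.md"]}}
text = render(
    "Topic: {{ input.topic }}\n{{ tasks.draft }}\n{{ tasks.draft.draft_md }}",
    {"topic": "ocean"},
    tasks,
)
print(text)
```

Unknown keys raise `KeyError` in this sketch; how the real engine treats an unresolved expression is not stated in this README.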
The outputs parameter of .task() declares what files a task produces.
```python
from stagehand import StaticOutputs, DynamicOutputs, PatternOutputs

# Exact file names known upfront
.task("t1", agent="a", prompt="...", outputs=StaticOutputs(["report.md"]))

# Agent decides at runtime (default)
.task("t2", agent="a", prompt="...", outputs=DynamicOutputs())

# Collect by glob after the task finishes
.task("t3", agent="a", prompt="...", outputs=PatternOutputs(pattern="**/*.md"))
```

An executor is the AI backend that drives a task. Stagehand ships two:
OllamaExecutor runs models locally via Ollama. No API key required.

```python
from stagehand.adapters.executor import OllamaExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = OllamaExecutor(
    host="http://localhost:11434",  # default
    storage=FilesystemStorage("./output"),
)
```

```bash
ollama pull qwen2.5
ollama serve
```

Models with reliable tool use: qwen2.5, llama3.1, llama3.2, mistral-nemo.
ClaudeExecutor uses the Anthropic Messages API.

```python
from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = ClaudeExecutor(
    api_key="sk-ant-...",  # or set ANTHROPIC_API_KEY
    storage=FilesystemStorage("./output"),
)
```

The default model is claude-opus-4-5.
ClaudeExecutor automatically retries messages.create calls that receive a 429 response. Only the individual API call is retried — not the entire agentic loop — so a rate-limit mid-task retries only the current step.
| Parameter | Default | Description |
|---|---|---|
| `rate_limit_retries` | `3` | Maximum number of attempts per `messages.create` call (1 = no retries). |
| `rate_limit_delay` | `60.0` | Seconds to wait between attempts. |
| `logger` | `NullLogger()` | A Logger instance (e.g. `StdlibLogger()`) to receive retry warnings. |
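The difference between retrying one API call and retrying the whole agentic loop can be shown with a small standard-library sketch. Everything here is hypothetical: `RateLimited` stands in for a 429 response, and `call_with_rate_limit_retry` and `agent_loop` are illustrative names rather than ClaudeExecutor internals.

```python
import time


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 response."""


def call_with_rate_limit_retry(call, retries=3, delay=60.0, sleep=time.sleep):
    """Retry a single API call on RateLimited; retries counts total attempts."""
    for attempt in range(1, retries + 1):
        try:
            return call()
        except RateLimited:
            if attempt == retries:
                raise
            sleep(delay)


def agent_loop(steps, **retry_options):
    """Run each step in order; a rate limit re-runs only the failing step."""
    return [call_with_rate_limit_retry(step, **retry_options) for step in steps]


attempts = {"plan": 0, "write": 0}


def plan():
    attempts["plan"] += 1
    return "plan ready"


def write():
    attempts["write"] += 1
    if attempts["write"] == 1:
        raise RateLimited()  # first call is rate-limited, second succeeds
    return "file written"


waits = []  # record requested delays instead of actually sleeping
step_outputs = agent_loop([plan, write], retries=3, delay=30.0, sleep=waits.append)
print(step_outputs, attempts, waits)
```

In this run `plan` executes once and `write` twice, so the rate limit on the second step never causes the first step to be repeated.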
```python
from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.logger import StdlibLogger

executor = ClaudeExecutor(
    api_key="sk-ant-...",
    rate_limit_retries=5,
    rate_limit_delay=30.0,
    logger=StdlibLogger(),
)
```

Pass extra tools to ClaudeExecutor via extra_tools:
```python
from stagehand import ToolDefinition
from stagehand.adapters.executor import ClaudeExecutor

my_tool = ToolDefinition(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: fetch_from_linear(args["id"]),
)

executor = ClaudeExecutor(api_key="...", extra_tools=[my_tool])
```

FilesystemStorage always rejects paths that contain `..` components to prevent path traversal attacks.
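A check of this kind can be sketched with `pathlib` alone. The function below is an illustrative assumption, not FilesystemStorage's actual code; it rejects `..` components and also applies the optional, case-insensitive extension allowlist that this section goes on to describe, raising ValueError before any file is touched.

```python
from pathlib import PurePosixPath


def validate_path(path, allowed_extensions=None):
    """Raise ValueError for unsafe paths; intended to run before any I/O."""
    # Normalise Windows separators so "a\\..\\b" is inspected the same way.
    parts = PurePosixPath(path.replace("\\", "/")).parts
    if ".." in parts:
        raise ValueError(f"path traversal is not allowed: {path!r}")
    if allowed_extensions is not None:
        allowed = {ext.lower() for ext in allowed_extensions}
        suffix = PurePosixPath(path).suffix.lower()
        if suffix not in allowed:
            raise ValueError(f"extension {suffix!r} is not allowed: {path!r}")


validate_path("notes/REPORT.MD", allowed_extensions=[".txt", ".md"])  # passes silently

for bad in ("../etc/passwd", "notes/run.sh"):
    try:
        validate_path(bad, allowed_extensions=[".txt", ".md"])
    except ValueError as exc:
        print(exc)
```

Checking path components rather than searching for the substring `..` means a file name such as `notes..md` is still accepted.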
Optionally restrict which file extensions are allowed by passing allowed_extensions:
```python
from stagehand import FilesystemStorage

# Only .txt and .md files may be written
storage = FilesystemStorage("./output", allowed_extensions=[".txt", ".md"])
```

Extension matching is case-insensitive. Any path that violates a constraint raises ValueError before any I/O is performed. Implement validate_path on a custom ArtifactStorage subclass to apply your own rules:
```python
from stagehand import ArtifactStorage


class MyStorage(ArtifactStorage):
    def validate_path(self, path: str) -> None:
        if not path.startswith("safe/"):
            raise ValueError(f"path must be inside safe/: {path!r}")  # raise to block the write
    ...
```

Implement AgentExecutor to add any backend:
```python
from stagehand.ports.executor import AgentExecutor, ExecutionRequest, ExecutionResult


class MyExecutor(AgentExecutor):
    async def execute(self, request: ExecutionRequest) -> ExecutionResult:
        output = call_my_model(request.prompt, request.system_prompt)
        return ExecutionResult(output=output)
```

A failed or interrupted run can be resumed. Completed tasks are skipped.
```python
from stagehand import Scheduler

scheduler = Scheduler(run_state_directory=".stagehand/runs")
workflow = builder.build()
run_id = await scheduler.run(workflow)

# ... if it fails or you want to retry:
await scheduler.resume(run_id, workflow)
```

A sequential pipeline, where refine waits for draft:

```python
run_id = await (
    WorkflowBuilder("sequential")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write haiku.", tools=["write_file"])
    .task("draft", agent="writer", prompt="Write a haiku about the ocean. Save to draft.md.")
    .task("refine", agent="writer",
          prompt="Refine this:\n\n{{ tasks.draft }}\n\nSave to final.md.",
          after=["draft"])
    .run()
)
```

```text
draft → refine
```
A parallel pipeline, where pros and cons run concurrently and summary waits for both:

```python
run_id = await (
    WorkflowBuilder("parallel")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write clearly.", tools=["write_file"])
    .task("pros", agent="writer", prompt="Write pros of remote work. Save to pros.md.")
    .task("cons", agent="writer", prompt="Write cons of remote work. Save to cons.md.")
    .task("summary", agent="writer",
          prompt="Combine:\n\nPROS:\n{{ tasks.pros }}\n\nCONS:\n{{ tasks.cons }}\n\nSave to summary.md.",
          after=["pros", "cons"])
    .run()
)
```

```text
pros ─┐
      ├→ summary
cons ─┘
```
Stagehand uses a ports-and-adapters (hexagonal) architecture. The dependency rule is strict:
```text
core/      → nothing external (stdlib only)
ports/     → nothing (interfaces only)
adapters/  → ports/ only
builder    → core/ + ports/
```
| Package | Responsibility |
|---|---|
| `stagehand/core/` | Domain types, DAG, scheduler, run state, template engine |
| `stagehand/ports/` | ABCs: `AgentExecutor`, `ArtifactStorage`, `SecretProvider` |
| `stagehand/adapters/executor/` | `ClaudeExecutor`, `OllamaExecutor` |
| `stagehand/adapters/storage/` | `FilesystemStorage` |
| `stagehand/adapters/secrets/` | `EnvSecretProvider` |
| `stagehand/builder.py` | `WorkflowBuilder`, the primary public API |
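The dependency rule can be illustrated with a tiny ports-and-adapters sketch. All names below (`SecretSource`, `DictSecretSource`, `resolve_secrets`, and the `get` method) are hypothetical and chosen for the example; they are not the signatures of Stagehand's SecretProvider or EnvSecretProvider.

```python
from abc import ABC, abstractmethod


# Port: an interface only, with no dependency on any adapter.
class SecretSource(ABC):
    @abstractmethod
    def get(self, name: str) -> str: ...


# Adapter: a concrete implementation that depends only on the port.
class DictSecretSource(SecretSource):
    def __init__(self, values):
        self._values = dict(values)

    def get(self, name: str) -> str:
        return self._values[name]


# Consumer: written against the port, so adapters can be swapped freely.
def resolve_secrets(names, source: SecretSource):
    return {name: source.get(name) for name in names}


secrets = resolve_secrets(["API_TOKEN"], DictSecretSource({"API_TOKEN": "s3cr3t"}))
print(secrets)  # {'API_TOKEN': 's3cr3t'}
```

Because the consuming function never imports the adapter, an environment-backed or vault-backed source could replace the dictionary-backed one without touching it.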