Skip to content

janmarkuslanger/stagehand

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

37 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

stagehand

Stagehand orchestrates multi-agent AI workflows in pure Python. Each workflow is a directed acyclic graph (DAG) of tasks. Tasks with no dependencies run in parallel; tasks with dependencies wait until their upstream tasks complete.


Installation

pip install stagehand-ai

Requires Python 3.11+.


Quickstart

import asyncio
from stagehand import WorkflowBuilder
from stagehand.adapters.executor import OllamaExecutor

async def main():
    run_id = await (
        WorkflowBuilder("haiku-pipeline")
        .agent("writer", OllamaExecutor(), model="qwen2.5",
               system_prompt="You are a haiku writer.",
               tools=["write_file"])
        .task("draft",  agent="writer", prompt="Write a haiku about the ocean at dawn.")
        .task("refine", agent="writer",
              prompt="Refine this haiku:\n\n{{ tasks.draft }}",
              after=["draft"])
        .run()
    )
    print(f"Done — run {run_id}")

asyncio.run(main())

Concepts

Builder

WorkflowBuilder is the primary API. It lets you define agents and tasks in code, then run the workflow with a single await.

WorkflowBuilder(name, version="1")
  .agent(agent_id, executor, *, model, system_prompt, role, tools)
  .task(task_id, *, agent, prompt, after, outputs, secrets)
  .state_dir(directory)   # where run state is persisted (default: .stagehand/runs)
  .run(inputs={})         # returns run_id

.build() returns a Workflow object without running it, useful if you want to pass it to a Scheduler directly.


Agents

An agent is a named AI persona with an executor, a model, and a system prompt.

.agent(
    "writer",
    OllamaExecutor(),
    model="qwen2.5",
    system_prompt="You are a concise technical writer.",
    tools=["write_file", "read_file"],
)

Different agents in the same workflow can use different executors:

WorkflowBuilder("pipeline")
.agent("drafter", OllamaExecutor(), model="qwen2.5")
.agent("reviewer", ClaudeExecutor(api_key="..."), model="claude-opus-4-5")

Tasks

A task is a single unit of work in the DAG. There are two kinds:

Agent task — runs a prompt against an AI agent:

.task("draft",  agent="writer", prompt="Write a short intro to Python.")
.task("review", agent="reviewer",
      prompt="Review this draft:\n\n{{ tasks.draft }}",
      after=["draft"])

Deterministic task — runs a plain Python callable (sync or async). No agent or prompt needed:

async def fetch_tickets(ctx):
    return await linear_client.get_tickets()  # returns str or TaskResult

.task("fetch", fn=fetch_tickets)
.task("analyze", agent="analyst",
      prompt="Tickets:\n\n{{ tasks.fetch }}\n\nSummarise.",
      after=["fetch"])

The callable receives a RunContext (access to inputs and previous task results) and must return a str or TaskResult.

Parameter Description
agent ID of the agent that runs this task (required unless fn is set)
prompt Message sent to the agent (supports {{ }} expressions) (required unless fn is set)
fn Python callable to run directly (required unless agent/prompt are set)
after List of task IDs this task waits for
outputs StaticOutputs, DynamicOutputs, or PatternOutputs
secrets List of secret names to resolve at runtime
retry RetryPolicy — how many times to retry on failure

Tasks with no after (or whose dependencies are all complete) start immediately. Multiple ready tasks run in parallel.


Retry

Pass a RetryPolicy to a task to retry it automatically on failure. Downstream tasks are only cancelled once all attempts are exhausted.

from stagehand import RetryPolicy

.task(
    "fetch",
    agent="worker",
    prompt="Fetch the latest report.",
    retry=RetryPolicy(max_attempts=3, delay=2.0),
)
Parameter Default Description
max_attempts 1 Total attempts including the first (1 = no retry)
delay 0.0 Seconds to wait between attempts

Logging

By default the scheduler is silent. Pass a Logger to see workflow and task lifecycle events.

import logging
from stagehand import WorkflowBuilder, StdlibLogger

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

run_id = await (
    WorkflowBuilder("my-pipeline")
    .agent("worker", executor, model="claude-opus-4-5")
    .task("analyse", agent="worker", prompt="...")
    .logger(StdlibLogger())   # <-- add this
    .run()
)

StdlibLogger wraps Python's stdlib logging module and, by default, raises httpx and httpcore to WARNING so their per-request INFO lines don't drown out workflow events.

INFO  workflow 'my-pipeline' started [run=sh-20260520-a1b2]
INFO  task 'analyse' starting
INFO  task 'analyse' done
INFO  workflow 'my-pipeline' finished [run=sh-20260520-a1b2]
Parameter Default Description
name "stagehand" Logger name used with logging.getLogger
suppress_http_logs True Raises httpx/httpcore to WARNING to suppress per-request noise

To silence everything, omit .logger() (the default) or pass NullLogger() explicitly.

When using Scheduler directly, pass the logger to its constructor:

from stagehand import Scheduler, StdlibLogger

scheduler = Scheduler(run_state_directory=".stagehand/runs", logger=StdlibLogger())

To write a custom logger — for example to route events to a structured sink — implement the Logger port:

from stagehand import Logger

class MyLogger(Logger):
    def debug(self, message: str) -> None: ...
    def info(self, message: str) -> None: ...
    def warning(self, message: str) -> None: ...
    def error(self, message: str) -> None: ...

Template expressions

Prompts support {{ }} expressions to inject values from previous tasks or runtime inputs.

Expression Resolves to
{{ input.key }} A value passed via inputs={"key": "..."}
{{ tasks.id }} The text output of a completed task
{{ tasks.id.files }} Newline-separated list of file paths produced by a task
{{ tasks.id.filename_md }} Path of a specific file, identified by its slug (filename.mdfilename_md)

Outputs

The outputs parameter of .task() declares what files a task produces.

from stagehand import StaticOutputs, DynamicOutputs, PatternOutputs

# Exact file names known upfront
.task("t1", agent="a", prompt="...", outputs=StaticOutputs(["report.md"]))

# Agent decides at runtime (default)
.task("t2", agent="a", prompt="...", outputs=DynamicOutputs())

# Collect by glob after the task finishes
.task("t3", agent="a", prompt="...", outputs=PatternOutputs(pattern="**/*.md"))

Executors

An executor is the AI backend that drives a task. Stagehand ships two:

OllamaExecutor

Runs models locally via Ollama. No API key required.

from stagehand.adapters.executor import OllamaExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = OllamaExecutor(
    host="http://localhost:11434",   # default
    storage=FilesystemStorage("./output"),
)
ollama pull qwen2.5
ollama serve

Models with reliable tool use: qwen2.5, llama3.1, llama3.2, mistral-nemo.

ClaudeExecutor

Uses the Anthropic Messages API.

from stagehand.adapters.executor import ClaudeExecutor
from stagehand.adapters.storage.filesystem import FilesystemStorage

executor = ClaudeExecutor(
    api_key="sk-ant-...",            # or set ANTHROPIC_API_KEY
    storage=FilesystemStorage("./output"),
)

The default model is claude-opus-4-5.

Rate-limit retries

ClaudeExecutor automatically retries messages.create calls that receive a 429 response. Only the individual API call is retried — not the entire agentic loop — so a rate-limit mid-task retries only the current step.

Parameter Default Description
rate_limit_retries 3 Maximum number of attempts per messages.create call (1 = no retries).
rate_limit_delay 60.0 Seconds to wait between attempts.
logger NullLogger() A Logger instance (e.g. StdlibLogger()) to receive retry warnings.
from stagehand.adapters.logger import StdlibLogger

executor = ClaudeExecutor(
    api_key="sk-ant-...",
    rate_limit_retries=5,
    rate_limit_delay=30.0,
    logger=StdlibLogger(),
)

Custom tools

Pass extra tools to ClaudeExecutor via extra_tools:

from stagehand import ToolDefinition
from stagehand.adapters.executor import ClaudeExecutor

my_tool = ToolDefinition(
    name="fetch_ticket",
    description="Fetch a Linear ticket by ID.",
    input_schema={"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]},
    handler=lambda args: fetch_from_linear(args["id"]),
)

executor = ClaudeExecutor(api_key="...", extra_tools=[my_tool])

Artifact path validation

FilesystemStorage always rejects paths that contain .. components to prevent path traversal attacks.

Optionally restrict which file extensions are allowed by passing allowed_extensions:

from stagehand import FilesystemStorage

# Only .txt and .md files may be written
storage = FilesystemStorage("./output", allowed_extensions=[".txt", ".md"])

Extension matching is case-insensitive. Any path that violates a constraint raises ValueError before any I/O is performed. Implement validate_path on a custom ArtifactStorage subclass to apply your own rules:

from stagehand import ArtifactStorage

class MyStorage(ArtifactStorage):
    def validate_path(self, path: str) -> None:
        if not path.startswith("safe/"):
            raise ValueError(f"path must be inside safe/: {path!r}")  # raise to block the write
    ...

Custom executor

Implement AgentExecutor to add any backend:

from stagehand.ports.executor import AgentExecutor, ExecutionRequest, ExecutionResult

class MyExecutor(AgentExecutor):
    async def execute(self, request: ExecutionRequest) -> ExecutionResult:
        output = call_my_model(request.prompt, request.system_prompt)
        return ExecutionResult(output=output)

Resume

A failed or interrupted run can be resumed. Completed tasks are skipped.

from stagehand import Scheduler

scheduler = Scheduler(run_state_directory=".stagehand/runs")
workflow = builder.build()

run_id = await scheduler.run(workflow)
# ... if it fails or you want to retry:
await scheduler.resume(run_id, workflow)

Examples

Sequential

run_id = await (
    WorkflowBuilder("sequential")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write haiku.", tools=["write_file"])
    .task("draft",  agent="writer", prompt="Write a haiku about the ocean. Save to draft.md.")
    .task("refine", agent="writer",
          prompt="Refine this:\n\n{{ tasks.draft }}\n\nSave to final.md.",
          after=["draft"])
    .run()
)
draft  →  refine

Parallel

run_id = await (
    WorkflowBuilder("parallel")
    .agent("writer", OllamaExecutor(), model="qwen2.5",
           system_prompt="You write clearly.", tools=["write_file"])
    .task("pros",    agent="writer", prompt="Write pros of remote work. Save to pros.md.")
    .task("cons",    agent="writer", prompt="Write cons of remote work. Save to cons.md.")
    .task("summary", agent="writer",
          prompt="Combine:\n\nPROS:\n{{ tasks.pros }}\n\nCONS:\n{{ tasks.cons }}\n\nSave to summary.md.",
          after=["pros", "cons"])
    .run()
)
pros  ─┐
       ├→  summary
cons  ─┘

Architecture

Stagehand uses a ports-and-adapters (hexagonal) architecture. The dependency rule is strict:

core/     →  nothing external (stdlib only)
ports/    →  nothing (interfaces only)
adapters/ →  ports/ only
builder   →  core/ + ports/
Package Responsibility
stagehand/core/ Domain types, DAG, scheduler, run state, template engine
stagehand/ports/ ABCs: AgentExecutor, ArtifactStorage, SecretProvider
stagehand/adapters/executor/ ClaudeExecutor, OllamaExecutor
stagehand/adapters/storage/ FilesystemStorage
stagehand/adapters/secrets/ EnvSecretProvider
stagehand/builder.py WorkflowBuilder — primary public API

About

A Python library for orchestrating multi-agent AI workflows as a DAG.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages