diff --git a/CLAUDE.md b/CLAUDE.md index f5d3cd7..91091c3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -222,3 +222,4 @@ Production deployments are managed internally. Do NOT modify production deployme - [Self-Evolution](docs/self-evolution.md) - The 6-step reflection pipeline - [Security](docs/security.md) - Auth, secrets, permissions, and hardening - [Roles](docs/roles.md) - Customizing the agent's specialization +- [Loop](docs/loop.md) - Autonomous iteration primitive (phantom_loop) diff --git a/docs/loop.md b/docs/loop.md new file mode 100644 index 0000000..37cee17 --- /dev/null +++ b/docs/loop.md @@ -0,0 +1,173 @@ +# Loop + +Phantom loop is an autonomous iteration primitive. The agent runs repeatedly against a goal, each tick in a fresh SDK session, with a markdown state file as the only contract between ticks. Budgets, mid-loop critique, Slack feedback, and post-loop learning are all built in. + +## Overview + +Regular sessions are conversational: the operator sends a message, the agent responds, back and forth. Loops are different. The operator defines a goal and a budget, then walks away. The runner drives ticks automatically until the goal is met or a budget is hit. + +Use loops for long-horizon tasks where the agent should grind autonomously: +- "Keep refactoring until tests pass" +- "Iterate on this design doc until the reviewer approves" +- "Bisect this regression across the last 50 commits" + +## MCP Tool + +The `phantom_loop` tool exposes four actions: `start`, `status`, `stop`, `list`. 
+ +### Start Parameters + +| Parameter | Default | Ceiling | Description | +|-----------|---------|---------|-------------| +| `goal` (required) | - | 10,000 chars | What the loop should achieve | +| `workspace` | `data/loops/<loop_id>/` | - | Working directory for the agent | +| `max_iterations` | 20 | 200 | Maximum ticks before budget termination | +| `max_cost_usd` | 5 | 50 | Maximum total cost before budget termination | +| `checkpoint_interval` | off | 200 | Run a Sonnet critique every N ticks (0 = disabled) | +| `success_command` | off | - | Shell command run after each tick; exit 0 = done | +| `channel_id` | auto | - | Slack channel for status updates | +| `conversation_id` | auto | - | Slack thread for threading updates | +| `trigger_message_ts` | auto | - | Slack message timestamp for reaction ladder | + +When started from Slack, `channel_id`, `conversation_id`, and `trigger_message_ts` are auto-filled from the originating message context. Explicit tool arguments always take precedence. + +### Other Actions + +- **status**: Returns the loop row, parsed state file frontmatter, and the first 40 lines of the state file. +- **stop**: Sets an interrupt flag. The loop stops gracefully before the next tick. +- **list**: Returns active loops. Pass `include_finished: true` for recent history. + +## State File + +The state file (`state.md` in the workspace) is the loop's memory across ticks. It has YAML frontmatter that the runner inspects for control flow, and a markdown body that belongs entirely to the agent. + +### Frontmatter + +```yaml +--- +loop_id: <loop_id> +status: in-progress # in-progress | done | blocked +iteration: 3 +--- +``` + +The runner acts on `done` (finalize immediately) and `blocked` (continue, but the agent should explain in Notes). Everything else is treated as `in-progress`. + +### Body Sections + +```markdown +# Goal +Keep refactoring src/auth until all 47 tests pass. 
+ +# Progress +- Tick 1: Fixed the missing import in auth/middleware.ts +- Tick 2: Updated the session type to include refreshToken +- Tick 3: Fixed the mock in auth.test.ts, 44/47 tests passing + +# Next Action +The remaining 3 failures are all in auth/oauth.test.ts. Read the test file, +identify the common cause, and fix it. + +# Notes +(empty) +``` + +The agent reads Progress and Next Action at the start of each tick to understand what happened before and what to do now. The runner does not parse the body, only the frontmatter. + +## Tick Lifecycle + +Each tick follows a fixed sequence: + +1. **Lock** - acquire in-flight guard (prevents concurrent ticks on the same loop) +2. **Pre-checks** - verify loop is still "running"; check interrupt flag; enforce budget limits +3. **Read state** - load the current state file from disk +4. **Build prompt** - assemble the tick prompt with: goal, state file contents, budget info, optional memory context, optional critique feedback +5. **Fresh session** - call `runtime.handleMessage()` with a rotating conversation ID (`{loopId}:{iteration}`) +6. **Agent works** - executes tools, makes progress, writes updated state file +7. **Record cost** - increment iteration count and accumulate cost from the SDK response +8. **Parse frontmatter** - re-read the state file; if the agent declared `done`, finalize immediately (steps 9-11 are skipped) +9. **Success command** - if configured, run the shell command (5-minute timeout, sanitized env with only PATH, HOME, LANG, TERM, TOOL_INPUT where TOOL_INPUT is a JSON string containing loop_id and workspace) +10. **Critique checkpoint** - if `checkpoint_interval` is set and the current tick is a multiple, run a Sonnet critique (see below) +11. **Slack update** - post tick progress to the status message +12. 
**Schedule next** - queue the next tick via `setImmediate` + +## Slack Integration + +When a loop is started from Slack (or with explicit `channel_id`), the `LoopNotifier` provides real-time feedback: + +**Start notice** - posted to the channel/thread with the goal excerpt and budget: +``` +:repeat: Starting loop `abcdef01` (max 20 iter, $5.00 budget) +> Keep refactoring src/auth until all 47 tests pass +``` +Includes a Stop button routed through Slack interactive actions. + +**Tick updates** - the same message is edited on each tick with a progress bar: +``` +:repeat: Loop `abcdef01` · [████░░░░░░] 4/10 · $1.20/$5.00 · in-progress +``` +The Stop button survives across edits (blocks are re-sent on every `chat.update`). + +**Reaction ladder** on the operator's original message: +- Start: hourglass +- First tick: swap to cycling arrows +- Terminal: checkmark (done), stop sign (stopped), warning (budget exceeded), X (failed) + +**Final notice** - progress bar with terminal emoji, and the state file body posted as a threaded code block so the operator can see the full progress log. + +## Mid-Loop Critique + +When `checkpoint_interval` is set, Sonnet 4.6 reviews the loop's progress every N ticks. This catches drift, stuck patterns, and wasted budget before the loop exhausts its resources. + +The critique runs after terminal checks (so the final tick is never wasted on a critique call) and is guarded by judge availability and cost cap. + +The reviewer sees: +- The original goal +- Rolling tick summaries (up to 10) +- The current state file (truncated to 3,000 chars) +- The agent's last response (truncated to 1,000 chars) + +The assessment is injected into the next tick's prompt as a "REVIEWER FEEDBACK" section. + +## Post-Loop Pipeline + +After a loop finalizes, a fire-and-forget pipeline runs evolution and memory consolidation. Neither can affect the loop's final status, and errors are logged but never propagated. 
+ +**Evolution**: A bounded transcript (rolling summaries, first/last prompt-response pairs) is synthesized into a `SessionData` object and fed to the evolution engine's `afterSession()` pipeline. If the engine applies changes, the runtime's evolved config is updated. + +**Memory consolidation**: If vector memory is ready, the session data is consolidated into episodic memory. When LLM judges are available and within cost cap, Sonnet extracts facts while checking for contradictions with existing knowledge. Otherwise, a heuristic fallback runs. + +Loop status maps to evolution outcome: `done` becomes success, `stopped` becomes abandoned, everything else becomes failure. + +## Memory Context + +Memory context is cached once at loop start and injected into every tick prompt as a "RECALLED MEMORIES" section. Caching avoids re-querying the vector database on every tick (the goal is constant, so recall results don't change). The cache is cleared on finalize and rebuilt on resume. + +## Writing Effective Goals + +**Be specific and incremental:** +- Good: "Refactor src/auth/ to use the new session types from types.ts. Run `bun test src/auth` after each change. Stop when all tests pass." +- Bad: "Fix the auth system." 
+ +**One concrete action per tick:** +- The agent works best when Next Action describes a single, verifiable step +- Goals that encourage small steps ("fix one test at a time") produce more reliable loops than goals that demand large leaps + +**Use success_command for objective verification:** +- `bun test src/auth` - loop runs until all auth tests pass +- `curl -sf http://localhost:3000/health` - loop runs until the service is healthy +- `grep -q 'TODO' src/module.ts && exit 1 || exit 0` - loop runs until no TODOs remain + +## Key Files + +| File | Purpose | +|------|---------| +| `src/loop/runner.ts` | LoopRunner: tick lifecycle, memory caching, critique scheduling, finalization | +| `src/loop/prompt.ts` | Per-tick prompt builder with memory and critique injection | +| `src/loop/types.ts` | Types, Zod schemas, constants, ceilings | +| `src/loop/store.ts` | SQLite persistence layer | +| `src/loop/state-file.ts` | State file init, read, YAML frontmatter parsing | +| `src/loop/tool.ts` | `phantom_loop` MCP tool (start/status/stop/list) | +| `src/loop/critique.ts` | Mid-loop Sonnet 4.6 critique judge | +| `src/loop/post-loop.ts` | Post-loop evolution and memory consolidation pipeline | +| `src/loop/notifications.ts` | Slack progress bar, reaction ladder, stop button | diff --git a/src/agent/__tests__/security-wrapping.test.ts b/src/agent/__tests__/security-wrapping.test.ts index b355785..a4b5ae8 100644 --- a/src/agent/__tests__/security-wrapping.test.ts +++ b/src/agent/__tests__/security-wrapping.test.ts @@ -29,6 +29,7 @@ describe("security message wrapping", () => { test("internal channels are detected correctly", () => { expect(proto.isExternalChannel("scheduler")).toBe(false); expect(proto.isExternalChannel("trigger")).toBe(false); + expect(proto.isExternalChannel("loop")).toBe(false); }); test("wrapper prepends security context", () => { diff --git a/src/agent/__tests__/slack-context.test.ts b/src/agent/__tests__/slack-context.test.ts new file mode 100644 index 
0000000..550223a --- /dev/null +++ b/src/agent/__tests__/slack-context.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, test } from "bun:test"; +import { type SlackContext, slackContextStore } from "../slack-context.ts"; + +const SAMPLE: SlackContext = { + slackChannelId: "C123", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", +}; + +describe("slackContextStore", () => { + test("getStore() is undefined outside a run()", () => { + expect(slackContextStore.getStore()).toBeUndefined(); + }); + + test("synchronous read inside run() sees the context", () => { + const seen = slackContextStore.run(SAMPLE, () => slackContextStore.getStore()); + expect(seen).toEqual(SAMPLE); + }); + + test("context propagates across a plain await boundary", async () => { + const seen = await slackContextStore.run(SAMPLE, async () => { + await Promise.resolve(); + return slackContextStore.getStore(); + }); + expect(seen).toEqual(SAMPLE); + }); + + test("context propagates across a setImmediate hop", async () => { + const seen = await slackContextStore.run(SAMPLE, async () => { + await new Promise((resolve) => setImmediate(resolve)); + return slackContextStore.getStore(); + }); + expect(seen).toEqual(SAMPLE); + }); + + test("context propagates through an async generator for-await loop", async () => { + async function* producer(): AsyncGenerator { + for (let i = 0; i < 3; i++) { + await Promise.resolve(); + yield i; + } + } + + const observations: (SlackContext | undefined)[] = []; + await slackContextStore.run(SAMPLE, async () => { + for await (const _ of producer()) { + observations.push(slackContextStore.getStore()); + } + }); + + expect(observations.length).toBe(3); + for (const seen of observations) { + expect(seen).toEqual(SAMPLE); + } + }); + + test("concurrent run() calls keep contexts isolated", async () => { + const other: SlackContext = { + slackChannelId: "C999", + slackThreadTs: "2700000000.000100", + slackMessageTs: "2700000000.000200", + }; + + 
const [a, b] = await Promise.all([ + slackContextStore.run(SAMPLE, async () => { + await Promise.resolve(); + return slackContextStore.getStore(); + }), + slackContextStore.run(other, async () => { + await Promise.resolve(); + return slackContextStore.getStore(); + }), + ]); + + expect(a).toEqual(SAMPLE); + expect(b).toEqual(other); + }); +}); diff --git a/src/agent/runtime.ts b/src/agent/runtime.ts index 4ed3fc8..a90fae9 100644 --- a/src/agent/runtime.ts +++ b/src/agent/runtime.ts @@ -89,9 +89,9 @@ export class AgentRuntime { } } - // Scheduler and trigger are internal sources; all other channels are external user input + // Scheduler, trigger, and loop are internal sources; all other channels are external user input private isExternalChannel(channelId: string): boolean { - return channelId !== "scheduler" && channelId !== "trigger"; + return channelId !== "scheduler" && channelId !== "trigger" && channelId !== "loop"; } // Per-message security context so the LLM has safety guidance adjacent to user input diff --git a/src/agent/slack-context.ts b/src/agent/slack-context.ts new file mode 100644 index 0000000..a967315 --- /dev/null +++ b/src/agent/slack-context.ts @@ -0,0 +1,22 @@ +import { AsyncLocalStorage } from "node:async_hooks"; + +/** + * Request-scoped Slack context for the current agent turn. + * + * Populated by the channel router when a Slack-origin message enters the + * runtime, and read by in-process MCP tool handlers that need to target the + * operator's originating message/thread without relying on the agent to + * forward the IDs through tool arguments. This is the minimum-surface + * plumbing that lets tools (e.g. phantom_loop) auto-fill channel/thread when + * the agent omits them. + * + * Non-Slack turns (telegram, email, webhook, cli, scheduled triggers) leave + * the store unset; consumers must treat `getStore()` as possibly undefined. 
+ */ +export type SlackContext = { + slackChannelId: string; + slackThreadTs: string; + slackMessageTs: string; +}; + +export const slackContextStore = new AsyncLocalStorage(); diff --git a/src/channels/slack-actions.ts b/src/channels/slack-actions.ts index 709d4f5..7cbcec7 100644 --- a/src/channels/slack-actions.ts +++ b/src/channels/slack-actions.ts @@ -22,6 +22,19 @@ export function setActionFollowUpHandler(handler: ActionFollowUpHandler): void { actionFollowUpHandler = handler; } +// Loop stop button handler. The `phantom:loop_stop:` action_id is attached +// to the initial loop status message, allowing an operator to interrupt a +// running loop from Slack without invoking the MCP tool. +type LoopStopHandler = (loopId: string) => boolean; + +let loopStopHandler: LoopStopHandler | null = null; + +export function setLoopStopHandler(handler: LoopStopHandler): void { + loopStopHandler = handler; +} + +export const LOOP_STOP_ACTION_PREFIX = "phantom:loop_stop:"; + /** Extract a typed value from the Bolt body object */ function bodyField(body: unknown, ...keys: string[]): T | undefined { let obj = body as Record | undefined; @@ -89,6 +102,43 @@ export function registerSlackActions(app: App): void { }); } + // Register loop stop button handler + app.action(/^phantom:loop_stop:.+$/, async ({ ack, body, client }) => { + await ack(); + + const b = body as unknown as Record; + const actions = b.actions as Array<{ action_id: string }> | undefined; + const actionId = actions?.[0]?.action_id; + if (!actionId?.startsWith(LOOP_STOP_ACTION_PREFIX)) return; + + const loopId = actionId.slice(LOOP_STOP_ACTION_PREFIX.length); + const channelId = bodyField(b, "channel", "id"); + const messageTs = bodyField(b, "message", "ts"); + const userId = bodyField(b, "user", "id"); + const messageText = bodyField(b, "message", "text") ?? ""; + const existingBlocks = bodyField>(b, "message", "blocks") ?? []; + + const stopped = loopStopHandler?.(loopId) ?? 
false; + + if (channelId && messageTs) { + const nonActionBlocks = existingBlocks.filter((block) => block.block_id !== `phantom_loop_actions_${loopId}`); + const noteText = stopped + ? `_<@${userId}> requested stop. The loop will halt before the next iteration._` + : `_<@${userId}> clicked stop, but the loop is already finished._`; + try { + await client.chat.update({ + channel: channelId, + ts: messageTs, + text: messageText, + blocks: [...nonActionBlocks, { type: "context", elements: [{ type: "mrkdwn", text: noteText }] }], + } as unknown as Parameters[0]); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.warn(`[slack] Failed to update loop stop message: ${msg}`); + } + } + }); + // Register agent action button handler app.action(/^phantom:action:\d+$/, async ({ ack, body, client }) => { await ack(); diff --git a/src/channels/slack.ts b/src/channels/slack.ts index 587a426..56ab1cb 100644 --- a/src/channels/slack.ts +++ b/src/channels/slack.ts @@ -174,7 +174,7 @@ export class SlackChannel implements Channel { return this.connectionState; } - async postToChannel(channelId: string, text: string): Promise { + async postToChannel(channelId: string, text: string, threadTs?: string): Promise { const formattedText = toSlackMarkdown(text); const chunks = splitMessage(formattedText); let lastTs: string | null = null; @@ -184,6 +184,7 @@ export class SlackChannel implements Channel { const result = await this.app.client.chat.postMessage({ channel: channelId, text: chunk, + ...(threadTs ? { thread_ts: threadTs } : {}), }); lastTs = result.ts ?? 
null; } catch (err: unknown) { diff --git a/src/core/__tests__/trigger-auth.test.ts b/src/core/__tests__/trigger-auth.test.ts index 39f89ff..b29f3d8 100644 --- a/src/core/__tests__/trigger-auth.test.ts +++ b/src/core/__tests__/trigger-auth.test.ts @@ -1,129 +1,67 @@ -import { afterAll, beforeAll, describe, expect, test } from "bun:test"; -import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; -import YAML from "yaml"; +import { describe, expect, test } from "bun:test"; +import { AuthMiddleware } from "../../mcp/auth.ts"; import { hashTokenSync } from "../../mcp/config.ts"; import type { McpConfig } from "../../mcp/types.ts"; -import { setTriggerDeps, startServer } from "../server.ts"; /** - * Tests that the /trigger endpoint requires bearer token auth - * with operator scope. Closes ghostwright/phantom#9. + * Tests that /trigger auth logic requires bearer token with operator scope. + * Closes ghostwright/phantom#9. + * + * Tests the AuthMiddleware directly with constructed Request objects + * to avoid Bun.serve + fetch issues in GitHub Actions CI. 
*/ describe("/trigger endpoint auth", () => { const adminToken = "test-trigger-admin-token"; const readToken = "test-trigger-read-token"; const operatorToken = "test-trigger-operator-token"; - const mcpConfigPath = "config/mcp.yaml"; - let originalMcpYaml: string | null = null; - let server: ReturnType; - let baseUrl: string; + const mcpConfig: McpConfig = { + tokens: [ + { name: "admin", hash: hashTokenSync(adminToken), scopes: ["read", "operator", "admin"] }, + { name: "reader", hash: hashTokenSync(readToken), scopes: ["read"] }, + { name: "operator", hash: hashTokenSync(operatorToken), scopes: ["read", "operator"] }, + ], + rate_limit: { requests_per_minute: 60, burst: 10 }, + }; - beforeAll(() => { - // Back up the existing mcp.yaml so we can restore it after tests - if (existsSync(mcpConfigPath)) { - originalMcpYaml = readFileSync(mcpConfigPath, "utf-8"); - } + const auth = new AuthMiddleware(mcpConfig); - // Write test tokens to mcp.yaml so loadMcpConfig picks them up - const mcpConfig: McpConfig = { - tokens: [ - { name: "admin", hash: hashTokenSync(adminToken), scopes: ["read", "operator", "admin"] }, - { name: "reader", hash: hashTokenSync(readToken), scopes: ["read"] }, - { name: "operator", hash: hashTokenSync(operatorToken), scopes: ["read", "operator"] }, - ], - rate_limit: { requests_per_minute: 60, burst: 10 }, - }; - - mkdirSync("config", { recursive: true }); - writeFileSync(mcpConfigPath, YAML.stringify(mcpConfig), "utf-8"); - - // Start server with a random port - server = startServer({ name: "test", port: 0, role: "base" } as never, Date.now()); - baseUrl = `http://localhost:${server.port}`; - - // Wire trigger deps with a mock runtime - setTriggerDeps({ - runtime: { - handleMessage: async () => ({ - text: "ok", - cost: { totalUsd: 0 }, - durationMs: 0, - }), - } as never, + function makeRequest(headers: Record = {}): Request { + return new Request("http://localhost/trigger", { + method: "POST", + headers: { "Content-Type": "application/json", 
...headers }, + body: JSON.stringify({ task: "hello" }), }); - }); - - afterAll(() => { - server?.stop(true); - // Restore the original mcp.yaml - if (originalMcpYaml !== null) { - writeFileSync(mcpConfigPath, originalMcpYaml, "utf-8"); - } - }); - - const triggerBody = JSON.stringify({ task: "hello" }); + } test("rejects request with no Authorization header", async () => { - const res = await fetch(`${baseUrl}/trigger`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: triggerBody, - }); - expect(res.status).toBe(401); - const json = (await res.json()) as { status: string; message: string }; - expect(json.message).toContain("Missing"); + const result = await auth.authenticate(makeRequest()); + expect(result.authenticated).toBe(false); + if (!result.authenticated) expect(result.error).toContain("Missing"); }); test("rejects request with invalid token", async () => { - const res = await fetch(`${baseUrl}/trigger`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: "Bearer wrong-token", - }, - body: triggerBody, - }); - expect(res.status).toBe(401); + const result = await auth.authenticate(makeRequest({ Authorization: "Bearer wrong-token" })); + expect(result.authenticated).toBe(false); + if (!result.authenticated) expect(result.error).toContain("Invalid"); }); test("rejects read-only token (insufficient scope)", async () => { - const res = await fetch(`${baseUrl}/trigger`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${readToken}`, - }, - body: triggerBody, - }); - expect(res.status).toBe(403); - const json = (await res.json()) as { status: string; message: string }; - expect(json.message).toContain("operator"); + const result = await auth.authenticate(makeRequest({ Authorization: `Bearer ${readToken}` })); + expect(result.authenticated).toBe(true); + expect(auth.hasScope(result, "operator")).toBe(false); }); test("accepts operator token", async () => 
{ - const res = await fetch(`${baseUrl}/trigger`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${operatorToken}`, - }, - body: triggerBody, - }); - expect(res.status).toBe(200); - const json = (await res.json()) as { status: string }; - expect(json.status).toBe("ok"); + const result = await auth.authenticate(makeRequest({ Authorization: `Bearer ${operatorToken}` })); + expect(result.authenticated).toBe(true); + expect(auth.hasScope(result, "operator")).toBe(true); }); test("accepts admin token", async () => { - const res = await fetch(`${baseUrl}/trigger`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${adminToken}`, - }, - body: triggerBody, - }); - expect(res.status).toBe(200); + const result = await auth.authenticate(makeRequest({ Authorization: `Bearer ${adminToken}` })); + expect(result.authenticated).toBe(true); + expect(auth.hasScope(result, "operator")).toBe(true); + expect(auth.hasScope(result, "admin")).toBe(true); }); }); diff --git a/src/db/__tests__/migrate.test.ts b/src/db/__tests__/migrate.test.ts index 3a00b15..4662419 100644 --- a/src/db/__tests__/migrate.test.ts +++ b/src/db/__tests__/migrate.test.ts @@ -26,6 +26,7 @@ describe("runMigrations", () => { expect(tables).toContain("scheduled_jobs"); expect(tables).toContain("secrets"); expect(tables).toContain("secret_requests"); + expect(tables).toContain("loops"); expect(tables).toContain("_migrations"); }); @@ -35,7 +36,7 @@ describe("runMigrations", () => { runMigrations(db); const migrationCount = db.query("SELECT COUNT(*) as count FROM _migrations").get() as { count: number }; - expect(migrationCount.count).toBe(9); + expect(migrationCount.count).toBe(13); }); test("tracks applied migration indices", () => { @@ -47,6 +48,6 @@ describe("runMigrations", () => { .all() .map((r) => (r as { index_num: number }).index_num); - expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8]); + 
expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]); }); }); diff --git a/src/db/schema.ts b/src/db/schema.ts index 39d0589..060f5c6 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -97,4 +97,35 @@ export const MIGRATIONS: string[] = [ expires_at TEXT NOT NULL, completed_at TEXT )`, + + `CREATE TABLE IF NOT EXISTS loops ( + id TEXT PRIMARY KEY, + goal TEXT NOT NULL, + workspace_dir TEXT NOT NULL, + state_file TEXT NOT NULL, + success_command TEXT, + max_iterations INTEGER NOT NULL, + max_cost_usd REAL NOT NULL, + status TEXT NOT NULL, + iteration_count INTEGER NOT NULL DEFAULT 0, + total_cost_usd REAL NOT NULL DEFAULT 0, + channel_id TEXT, + conversation_id TEXT, + status_message_ts TEXT, + interrupt_requested INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + started_at TEXT NOT NULL DEFAULT (datetime('now')), + last_tick_at TEXT, + finished_at TEXT + )`, + + "CREATE INDEX IF NOT EXISTS idx_loops_status ON loops(status)", + + // Track the operator's originating Slack message so the loop runner can + // drive a reaction ladder on it (hourglass → cycle → terminal emoji). + // Appended, never inserted mid-array: existing deployments have already + // applied migrations 0–10, so the new column must land at index 11. 
+ "ALTER TABLE loops ADD COLUMN trigger_message_ts TEXT", + + "ALTER TABLE loops ADD COLUMN checkpoint_interval INTEGER", ]; diff --git a/src/evolution/__tests__/judge-activation.test.ts b/src/evolution/__tests__/judge-activation.test.ts index 59935ce..8a8cc73 100644 --- a/src/evolution/__tests__/judge-activation.test.ts +++ b/src/evolution/__tests__/judge-activation.test.ts @@ -79,10 +79,17 @@ function setupWithJudgeMode(enabled: "auto" | "always" | "never"): void { } let savedApiKey: string | undefined; +let savedAuthToken: string | undefined; +let savedOauthToken: string | undefined; describe("Judge Activation", () => { beforeEach(() => { savedApiKey = process.env.ANTHROPIC_API_KEY; + savedAuthToken = process.env.ANTHROPIC_AUTH_TOKEN; + savedOauthToken = process.env.CLAUDE_CODE_OAUTH_TOKEN; + // Clear all auth env vars so tests control them explicitly + process.env.ANTHROPIC_AUTH_TOKEN = undefined; + process.env.CLAUDE_CODE_OAUTH_TOKEN = undefined; }); afterEach(() => { @@ -91,6 +98,16 @@ describe("Judge Activation", () => { } else { process.env.ANTHROPIC_API_KEY = undefined; } + if (savedAuthToken !== undefined) { + process.env.ANTHROPIC_AUTH_TOKEN = savedAuthToken; + } else { + process.env.ANTHROPIC_AUTH_TOKEN = undefined; + } + if (savedOauthToken !== undefined) { + process.env.CLAUDE_CODE_OAUTH_TOKEN = savedOauthToken; + } else { + process.env.CLAUDE_CODE_OAUTH_TOKEN = undefined; + } rmSync(TEST_DIR, { recursive: true, force: true }); }); @@ -108,6 +125,22 @@ describe("Judge Activation", () => { expect(engine.usesLLMJudges()).toBe(false); }); + test("auto mode enables judges with ANTHROPIC_AUTH_TOKEN alone", () => { + process.env.ANTHROPIC_API_KEY = undefined; + process.env.ANTHROPIC_AUTH_TOKEN = "auth-token-test"; + setupWithJudgeMode("auto"); + const engine = new EvolutionEngine(CONFIG_PATH); + expect(engine.usesLLMJudges()).toBe(true); + }); + + test("auto mode enables judges with CLAUDE_CODE_OAUTH_TOKEN alone", () => { + process.env.ANTHROPIC_API_KEY = 
undefined; + process.env.CLAUDE_CODE_OAUTH_TOKEN = "oauth-token-test"; + setupWithJudgeMode("auto"); + const engine = new EvolutionEngine(CONFIG_PATH); + expect(engine.usesLLMJudges()).toBe(true); + }); + test("never mode disables judges even when API key is set", () => { process.env.ANTHROPIC_API_KEY = "sk-test-key"; setupWithJudgeMode("never"); diff --git a/src/evolution/engine.ts b/src/evolution/engine.ts index c32b4df..9f35822 100644 --- a/src/evolution/engine.ts +++ b/src/evolution/engine.ts @@ -48,7 +48,7 @@ export class EvolutionEngine { const setting = this.config.judges?.enabled ?? "auto"; if (setting === "never") return false; if (setting === "always") return true; - return !!process.env.ANTHROPIC_API_KEY; + return !!(process.env.ANTHROPIC_API_KEY || process.env.ANTHROPIC_AUTH_TOKEN || process.env.CLAUDE_CODE_OAUTH_TOKEN); } usesLLMJudges(): boolean { diff --git a/src/evolution/judges/client.ts b/src/evolution/judges/client.ts index 6254e99..edd5703 100644 --- a/src/evolution/judges/client.ts +++ b/src/evolution/judges/client.ts @@ -10,11 +10,25 @@ import { type VotingStrategy, } from "./types.ts"; +/** Thrown when the API call succeeds but structured output parsing fails. Carries token usage so cost can still be tracked. */ +export class JudgeParseError extends Error { + constructor( + message: string, + public readonly inputTokens: number, + public readonly outputTokens: number, + public readonly costUsd: number, + ) { + super(message); + this.name = "JudgeParseError"; + } +} + let _client: Anthropic | null = null; function getClient(): Anthropic { if (!_client) { - _client = new Anthropic(); + const authToken = process.env.ANTHROPIC_AUTH_TOKEN || process.env.CLAUDE_CODE_OAUTH_TOKEN || undefined; + _client = authToken && !process.env.ANTHROPIC_API_KEY ? 
new Anthropic({ authToken }) : new Anthropic(); } return _client; } @@ -25,7 +39,7 @@ export function setClient(client: Anthropic | null): void { } export function isJudgeAvailable(): boolean { - return !!process.env.ANTHROPIC_API_KEY; + return !!(process.env.ANTHROPIC_API_KEY || process.env.ANTHROPIC_AUTH_TOKEN || process.env.CLAUDE_CODE_OAUTH_TOKEN); } /** @@ -58,14 +72,19 @@ export async function callJudge(options: { }); const parsed = message.parsed_output; - if (!parsed) { - throw new Error(`Judge returned no structured output (stop_reason: ${message.stop_reason})`); - } - const inputTokens = message.usage.input_tokens; const outputTokens = message.usage.output_tokens; const costUsd = estimateCost(options.model, inputTokens, outputTokens); + if (!parsed) { + throw new JudgeParseError( + `Judge returned no structured output (stop_reason: ${message.stop_reason})`, + inputTokens, + outputTokens, + costUsd, + ); + } + // Extract verdict and confidence from the parsed data if present const data = parsed as Record; const verdict = (data.verdict as "pass" | "fail") ?? 
"pass"; diff --git a/src/index.ts b/src/index.ts index a6e0066..89156a5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,13 +3,14 @@ import { join, resolve } from "node:path"; import { createInProcessToolServer } from "./agent/in-process-tools.ts"; import { AgentRuntime } from "./agent/runtime.ts"; import type { RuntimeEvent } from "./agent/runtime.ts"; +import { slackContextStore } from "./agent/slack-context.ts"; import { CliChannel } from "./channels/cli.ts"; import { EmailChannel } from "./channels/email.ts"; import { emitFeedback, setFeedbackHandler } from "./channels/feedback.ts"; import { formatToolActivity } from "./channels/progress-stream.ts"; import { createProgressStream } from "./channels/progress-stream.ts"; import { ChannelRouter } from "./channels/router.ts"; -import { setActionFollowUpHandler } from "./channels/slack-actions.ts"; +import { setActionFollowUpHandler, setLoopStopHandler } from "./channels/slack-actions.ts"; import { SlackChannel } from "./channels/slack.ts"; import { createStatusReactionController } from "./channels/status-reactions.ts"; import { TelegramChannel } from "./channels/telegram.ts"; @@ -33,6 +34,8 @@ import { runMigrations } from "./db/migrate.ts"; import { createEmailToolServer } from "./email/tool.ts"; import { EvolutionEngine } from "./evolution/engine.ts"; import type { SessionSummary } from "./evolution/types.ts"; +import { LoopRunner } from "./loop/runner.ts"; +import { createLoopToolServer } from "./loop/tool.ts"; import { PeerHealthMonitor } from "./mcp/peer-health.ts"; import { PeerManager } from "./mcp/peers.ts"; import { PhantomMcpServer } from "./mcp/server.ts"; @@ -113,8 +116,9 @@ async function main(): Promise { runtime.setRoleTemplate(activeRole); } + let contextBuilder: MemoryContextBuilder | undefined; if (memory.isReady()) { - const contextBuilder = new MemoryContextBuilder(memory, memoryConfig); + contextBuilder = new MemoryContextBuilder(memory, memoryConfig); 
runtime.setMemoryContextBuilder(contextBuilder); } @@ -157,6 +161,17 @@ async function main(): Promise { let mcpServer: PhantomMcpServer | null = null; let scheduler: Scheduler | null = null; + const postLoopDeps = + evolution || memory.isReady() + ? { + evolution: evolution ?? undefined, + memory: memory.isReady() ? memory : undefined, + onEvolvedConfigUpdate: evolution + ? (config: ReturnType) => runtime.setEvolvedConfig(config) + : undefined, + } + : undefined; + const loopRunner = new LoopRunner({ db, runtime, memoryContextBuilder: contextBuilder, postLoopDeps }); try { mcpServer = new PhantomMcpServer({ config, @@ -184,6 +199,7 @@ async function main(): Promise { runtime.setMcpServerFactories({ "phantom-dynamic-tools": () => createInProcessToolServer(registry), "phantom-scheduler": () => createSchedulerToolServer(scheduler as Scheduler), + "phantom-loop": () => createLoopToolServer(loopRunner), "phantom-web-ui": () => createWebUiToolServer(config.public_url), "phantom-secrets": () => createSecretToolServer({ db, baseUrl: secretsBaseUrl }), ...(process.env.RESEND_API_KEY @@ -325,6 +341,9 @@ async function main(): Promise { return health; }); + // Wire loop stop button (Slack -> runner) + setLoopStopHandler((loopId) => loopRunner.requestStop(loopId)); + // Wire action follow-up handler (button clicks -> agent) setActionFollowUpHandler(async (params) => { const followUpText = params.actionPayload @@ -408,7 +427,7 @@ async function main(): Promise { telegramChannel.startTyping(telegramChatId); } - const response = await runtime.handleMessage(msg.channelId, msg.conversationId, msg.text, (event: RuntimeEvent) => { + const onEvent = (event: RuntimeEvent): void => { switch (event.type) { case "init": console.log(`\n[phantom] Session: ${event.sessionId}`); @@ -427,7 +446,18 @@ async function main(): Promise { statusReactions?.setError(); break; } - }); + }; + + const runHandle = (): ReturnType => + runtime.handleMessage(msg.channelId, msg.conversationId, promptText, 
onEvent); + + // Slack-origin turns run inside an AsyncLocalStorage scope so in-process + // MCP tools (phantom_loop, etc.) can auto-target the operator's thread + // and original message without relying on the agent to forward the IDs. + const response = + isSlack && slackChannelId && slackThreadTs && slackMessageTs + ? await slackContextStore.run({ slackChannelId, slackThreadTs, slackMessageTs }, runHandle) + : await runHandle(); // Track assistant messages if (response.text) { @@ -594,10 +624,21 @@ async function main(): Promise { if (scheduler && slackChannel && channelsConfig?.slack?.owner_user_id) { scheduler.setSlackChannel(slackChannel, channelsConfig.slack.owner_user_id); } + if (slackChannel) { + loopRunner.setSlackChannel(slackChannel); + } if (scheduler) { await scheduler.start(); } + // Resume any loops that were running when the process last exited. + // The state file on disk is the source of truth, so "resume" just means + // scheduling another tick against each loop still marked running. 
+ const resumedLoops = loopRunner.resumeRunning(); + if (resumedLoops > 0) { + console.log(`[loop] Resumed ${resumedLoops} loop(s)`); + } + // Wire /trigger endpoint setTriggerDeps({ runtime, diff --git a/src/loop/__tests__/evolution-integration.test.ts b/src/loop/__tests__/evolution-integration.test.ts new file mode 100644 index 0000000..074b297 --- /dev/null +++ b/src/loop/__tests__/evolution-integration.test.ts @@ -0,0 +1,364 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { runMigrations } from "../../db/migrate.ts"; +import { LoopRunner } from "../runner.ts"; +import { LoopStartInputSchema } from "../types.ts"; + +type HandleMessageImpl = ( + channel: string, + conversationId: string, + text: string, +) => Promise<{ + text: string; + sessionId: string; + cost: { totalUsd: number; inputTokens: number; outputTokens: number; modelUsage: Record }; + durationMs: number; +}>; + +function createMockRuntime(impl?: HandleMessageImpl) { + const defaultImpl: HandleMessageImpl = async () => ({ + text: "ok", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 10, outputTokens: 10, modelUsage: {} }, + durationMs: 10, + }); + return { handleMessage: mock(impl ?? 
defaultImpl) }; +} + +function agentFinishes(stateFile: string, loopId: string): HandleMessageImpl { + return async () => { + writeFileSync(stateFile, `---\nloop_id: ${loopId}\nstatus: done\niteration: 1\n---\n\nDone.\n`, "utf-8"); + return { + text: "done", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 1, outputTokens: 1, modelUsage: {} }, + durationMs: 1, + }; + }; +} + +describe("LoopRunner evolution integration", () => { + let db: Database; + let dataDir: string; + + beforeEach(() => { + db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + db.run("PRAGMA foreign_keys = ON"); + runMigrations(db); + dataDir = mkdtempSync(join(tmpdir(), "phantom-loop-evo-")); + }); + + afterEach(() => { + db.close(); + rmSync(dataDir, { recursive: true, force: true }); + }); + + test("memory context is cached once at start and reused across ticks", async () => { + const buildMock = mock(async () => "## Known Facts\n- User prefers TS"); + const mockContextBuilder = { build: buildMock } as never; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + memoryContextBuilder: mockContextBuilder, + }); + const loop = runner.start({ goal: "Test memory caching", maxIterations: 5 }); + + // Allow the async cache to resolve + await Bun.sleep(10); + + // buildMock called once at start + expect(buildMock).toHaveBeenCalledTimes(1); + expect(buildMock).toHaveBeenCalledWith("Test memory caching"); + + // Tick multiple times - build should still only be called once + await runner.tick(loop.id); + await runner.tick(loop.id); + expect(buildMock).toHaveBeenCalledTimes(1); + + // Verify the prompt contains memory context + const promptArg = runtime.handleMessage.mock.calls[0][2] as string; + expect(promptArg).toContain("RECALLED MEMORIES"); + expect(promptArg).toContain("User prefers TS"); + }); + + test("memory context is cleared on finalize", async () => { + const buildMock = mock(async () => "some 
context"); + const mockContextBuilder = { build: buildMock } as never; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + memoryContextBuilder: mockContextBuilder, + }); + const loop = runner.start({ goal: "clean up", maxIterations: 1 }); + await Bun.sleep(10); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + expect(runner.getLoop(loop.id)?.status).toBe("done"); + + // Start another loop - build should be called again (cache was cleared) + runner.start({ goal: "another" }); + await Bun.sleep(10); + expect(buildMock).toHaveBeenCalledTimes(2); + }); + + test("post-loop evolution is called on finalize with correct session data", async () => { + const afterSessionMock = mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })); + const mockEvolution = { + afterSession: afterSessionMock, + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + }; + const mockMemory = { isReady: () => false }; + const onUpdate = mock(() => {}); + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: mockMemory as never, + onEvolvedConfigUpdate: onUpdate, + }, + }); + const loop = runner.start({ goal: "evolve this" }); + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + // Allow fire-and-forget to complete + await Bun.sleep(50); + + expect(afterSessionMock).toHaveBeenCalledTimes(1); + const summary = (afterSessionMock.mock.calls as unknown[][])[0][0] as Record; + expect(summary.session_id).toBe(loop.id); + expect(summary.outcome).toBe("success"); + }); + + test("post-loop evolution failure does not affect loop status", async () => { + const mockEvolution = { + afterSession: mock(async () => { + throw new 
Error("evolution broke"); + }), + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + }; + const mockMemory = { isReady: () => false }; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: mockMemory as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: "survive evolution failure" }); + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + await Bun.sleep(50); + + // Loop should still be done, not failed + expect(runner.getLoop(loop.id)?.status).toBe("done"); + }); + + test("critique does NOT fire when postLoopDeps is absent", async () => { + const runtime = createMockRuntime(); + // No postLoopDeps = no evolution engine = critique should never fire + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + }); + const loop = runner.start({ goal: "no evolution", checkpointInterval: 1 }); + + await runner.tick(loop.id); + + // The prompt should NOT contain reviewer feedback (critique skipped) + const promptArg = runtime.handleMessage.mock.calls[0][2] as string; + expect(promptArg).not.toContain("REVIEWER FEEDBACK"); + }); + + test("critique does NOT fire when usesLLMJudges returns false", async () => { + const mockEvolution = { + afterSession: mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })), + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + usesLLMJudges: () => false, + isWithinCostCap: () => true, + trackExternalJudgeCost: mock(() => {}), + }; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: { isReady: () => false } as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: 
"judges disabled", checkpointInterval: 1 }); + + await runner.tick(loop.id); + await runner.tick(loop.id); + + // Second tick prompt should NOT have critique (judges disabled) + const secondPrompt = runtime.handleMessage.mock.calls[1][2] as string; + expect(secondPrompt).not.toContain("REVIEWER FEEDBACK"); + expect(mockEvolution.trackExternalJudgeCost).not.toHaveBeenCalled(); + }); + + test("critique does NOT fire when cost cap is exceeded", async () => { + const mockEvolution = { + afterSession: mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })), + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + usesLLMJudges: () => true, + isWithinCostCap: () => false, + trackExternalJudgeCost: mock(() => {}), + }; + + const runtime = createMockRuntime(); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: mockEvolution as never, + memory: { isReady: () => false } as never, + onEvolvedConfigUpdate: () => {}, + }, + }); + const loop = runner.start({ goal: "over budget", checkpointInterval: 1 }); + + await runner.tick(loop.id); + await runner.tick(loop.id); + + const secondPrompt = runtime.handleMessage.mock.calls[1][2] as string; + expect(secondPrompt).not.toContain("REVIEWER FEEDBACK"); + expect(mockEvolution.trackExternalJudgeCost).not.toHaveBeenCalled(); + }); + + test("tick 1 summary is recorded in transcript", async () => { + const runtime = createMockRuntime(); + const afterSessionMock = mock(async () => ({ version: 1, changes_applied: [], changes_rejected: [] })); + const runner = new LoopRunner({ + db, + runtime, + dataDir, + autoSchedule: false, + postLoopDeps: { + evolution: { + afterSession: afterSessionMock, + getConfig: () => ({ userProfile: "", domainKnowledge: "" }), + usesLLMJudges: () => false, + isWithinCostCap: () => true, + trackExternalJudgeCost: mock(() => {}), + } as never, + memory: { isReady: () => false } as never, + onEvolvedConfigUpdate: () => {}, + 
}, + }); + const loop = runner.start({ goal: "check tick 1 summary" }); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + // Allow fire-and-forget to complete + await Bun.sleep(50); + + // The session data should include a Tick 1 summary + const summary = (afterSessionMock.mock.calls as unknown[][])[0][0] as Record; + const userMsgs = summary.user_messages as string[]; + const hasTick1Summary = userMsgs.some((m) => m.includes("Tick 1:")); + expect(hasTick1Summary).toBe(true); + }); + + test("checkpoint_interval round-trips through start/store", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime, dataDir, autoSchedule: false }); + + const loop = runner.start({ goal: "with checkpoint", checkpointInterval: 5 }); + expect(loop.checkpointInterval).toBe(5); + + const reloaded = runner.getLoop(loop.id); + expect(reloaded?.checkpointInterval).toBe(5); + }); + + test("checkpoint_interval defaults to null when omitted", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime, dataDir, autoSchedule: false }); + + const loop = runner.start({ goal: "no checkpoint" }); + expect(loop.checkpointInterval).toBeNull(); + }); +}); + +describe("LoopStartInputSchema checkpoint_interval validation", () => { + test("accepts valid checkpoint_interval", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 5, + }); + expect(result.success).toBe(true); + }); + + test("accepts 0 (disabled)", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 0, + }); + expect(result.success).toBe(true); + }); + + test("rejects negative values", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: -1, + }); + expect(result.success).toBe(false); + }); + + test("rejects values above ceiling", () => { + const result = 
LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 201, + }); + expect(result.success).toBe(false); + }); + + test("rejects non-integer values", () => { + const result = LoopStartInputSchema.safeParse({ + goal: "test", + checkpoint_interval: 5.5, + }); + expect(result.success).toBe(false); + }); + + test("accepts omitted (optional)", () => { + const result = LoopStartInputSchema.safeParse({ goal: "test" }); + expect(result.success).toBe(true); + }); +}); diff --git a/src/loop/__tests__/notifications.test.ts b/src/loop/__tests__/notifications.test.ts new file mode 100644 index 0000000..39840d4 --- /dev/null +++ b/src/loop/__tests__/notifications.test.ts @@ -0,0 +1,436 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { SlackChannel } from "../../channels/slack.ts"; +import { runMigrations } from "../../db/migrate.ts"; +import { LoopNotifier, buildProgressBar, terminalEmoji } from "../notifications.ts"; +import { LoopStore } from "../store.ts"; +import type { Loop, LoopStatus } from "../types.ts"; + +// Minimal SlackChannel shape the notifier actually calls. Every method is +// a mock so we can assert call args and ordering. 
+type MockSlack = { + postToChannel: ReturnType; + updateMessage: ReturnType; + addReaction: ReturnType; + removeReaction: ReturnType; +}; + +function makeSlack(overrides: Partial = {}): MockSlack { + return { + postToChannel: mock(async () => "1700000000.100100"), + updateMessage: mock(async () => undefined), + addReaction: mock(async () => undefined), + removeReaction: mock(async () => undefined), + ...overrides, + }; +} + +function asSlack(m: MockSlack): SlackChannel { + return m as unknown as SlackChannel; +} + +function makeLoop(overrides: Partial = {}): Loop { + return { + id: "abcdef0123456789", + goal: "test goal", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + checkpointInterval: null, + status: "running", + iterationCount: 0, + totalCostUsd: 0, + channelId: "C100", + conversationId: "1700000000.000100", + statusMessageTs: null, + triggerMessageTs: "1700000000.000200", + interruptRequested: false, + lastError: null, + startedAt: "2026-04-05T00:00:00Z", + lastTickAt: null, + finishedAt: null, + ...overrides, + }; +} + +describe("buildProgressBar", () => { + test("renders empty bar at 0/N", () => { + expect(buildProgressBar(0, 10)).toBe("[░░░░░░░░░░]"); + }); + test("renders full bar at N/N", () => { + expect(buildProgressBar(10, 10)).toBe("[██████████]"); + }); + test("renders half bar at N/2", () => { + expect(buildProgressBar(5, 10)).toBe("[█████░░░░░]"); + }); + test("rounds to nearest cell", () => { + // 3/7 ≈ 43% → 4 cells of 10 + expect(buildProgressBar(3, 7)).toBe("[████░░░░░░]"); + }); + test("clamps overflow", () => { + expect(buildProgressBar(99, 10)).toBe("[██████████]"); + }); + test("handles zero total safely", () => { + expect(buildProgressBar(0, 0)).toBe("[░░░░░░░░░░]"); + }); +}); + +describe("terminalEmoji", () => { + test("maps every known status", () => { + expect(terminalEmoji("done")).toBe(":white_check_mark:"); + 
expect(terminalEmoji("stopped")).toBe(":octagonal_sign:"); + expect(terminalEmoji("budget_exceeded")).toBe(":warning:"); + expect(terminalEmoji("failed")).toBe(":x:"); + expect(terminalEmoji("running")).toBe(":repeat:"); + }); +}); + +describe("LoopNotifier", () => { + let db: Database; + let store: LoopStore; + + beforeEach(() => { + db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + runMigrations(db); + store = new LoopStore(db); + }); + + afterEach(() => { + db.close(); + }); + + describe("postStartNotice", () => { + test("no-ops when slackChannel is null", async () => { + const notifier = new LoopNotifier(null, store); + await notifier.postStartNotice(makeLoop()); + // Nothing to assert beyond "did not throw"; the null guard is the + // whole point. + expect(true).toBe(true); + }); + + test("no-ops when loop.channelId is null", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postStartNotice(makeLoop({ channelId: null })); + expect(slack.postToChannel).not.toHaveBeenCalled(); + expect(slack.addReaction).not.toHaveBeenCalled(); + }); + + test("posts, persists ts, attaches stop button, stamps start reaction", async () => { + // Insert a real row so setStatusMessageTs can UPDATE it. 
+ const loop = store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: "1700000000.000100", + triggerMessageTs: "1700000000.000200", + }); + + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postStartNotice(loop); + + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + const [channel, text, threadTs] = slack.postToChannel.mock.calls[0]; + expect(channel).toBe("C100"); + expect(text).toContain("Starting loop"); + expect(threadTs).toBe("1700000000.000100"); + + // Stop button attached via updateMessage with blocks + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + const updateArgs = slack.updateMessage.mock.calls[0]; + expect(updateArgs[0]).toBe("C100"); + expect(updateArgs[3]).toBeDefined(); // blocks array + const blocks = updateArgs[3] as Array>; + const actionsBlock = blocks.find((b) => b.type === "actions"); + expect(actionsBlock).toBeDefined(); + + // Reaction stamped on the operator's trigger message + expect(slack.addReaction).toHaveBeenCalledWith("C100", "1700000000.000200", "hourglass_flowing_sand"); + + // Persisted status_message_ts round-trips back through findById + const reloaded = store.findById(loop.id); + expect(reloaded?.statusMessageTs).toBe("1700000000.100100"); + }); + + test("skips reaction when triggerMessageTs is null", async () => { + const loop = store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: null, + triggerMessageTs: null, + }); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postStartNotice(loop); + expect(slack.postToChannel).toHaveBeenCalled(); + 
expect(slack.addReaction).not.toHaveBeenCalled(); + }); + }); + + describe("postTickUpdate", () => { + function insertWithStatusTs(overrides: { triggerMessageTs?: string | null; iteration?: number } = {}) { + const row = store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: "1700000000.000100", + triggerMessageTs: overrides.triggerMessageTs ?? "1700000000.000200", + }); + store.setStatusMessageTs(row.id, "1700000000.100100"); + if (overrides.iteration) store.recordTick(row.id, overrides.iteration, 0); + const reloaded = store.findById(row.id); + if (!reloaded) throw new Error("failed to reload"); + return reloaded; + } + + test("edits the status message with a progress bar and cost", async () => { + const loop = insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate(loop.id, 3, "in-progress"); + + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + const [ch, ts, text] = slack.updateMessage.mock.calls[0]; + expect(ch).toBe("C100"); + expect(ts).toBe("1700000000.100100"); + expect(text).toContain("3/10"); + expect(text).toContain("abcdef01"); + expect(text).toMatch(/\[█+░+\]/); + expect(text).toContain("in-progress"); + }); + + test("re-sends blocks on every tick edit so the Stop button persists", async () => { + // Regression test: Slack's chat.update replaces the entire message + // and drops blocks the caller does not include. Without passing + // blocks on tick updates, the Stop button would disappear after + // the first tick edit. Verify the button survives. 
+ insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 2, "in-progress"); + + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + const updateArgs = slack.updateMessage.mock.calls[0]; + const blocks = updateArgs[3] as Array> | undefined; + expect(blocks).toBeDefined(); + const actionsBlock = blocks?.find((b) => b.type === "actions"); + expect(actionsBlock).toBeDefined(); + const elements = (actionsBlock as { elements: Array> }).elements; + expect(elements[0].action_id).toBe("phantom:loop_stop:abcdef0123456789"); + }); + + test("swaps hourglass → cycle on the first tick", async () => { + insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 1, "in-progress"); + + expect(slack.removeReaction).toHaveBeenCalledWith("C100", "1700000000.000200", "hourglass_flowing_sand"); + expect(slack.addReaction).toHaveBeenCalledWith("C100", "1700000000.000200", "arrows_counterclockwise"); + }); + + test("does not swap reactions on tick 2+", async () => { + insertWithStatusTs(); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 2, "in-progress"); + + expect(slack.removeReaction).not.toHaveBeenCalled(); + expect(slack.addReaction).not.toHaveBeenCalled(); + }); + + test("no-ops when statusMessageTs is not yet set", async () => { + store.insert({ + id: "abcdef0123456789", + goal: "g", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 10, + maxCostUsd: 5, + channelId: "C100", + conversationId: null, + triggerMessageTs: "1700000000.000200", + }); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postTickUpdate("abcdef0123456789", 1, "in-progress"); + 
expect(slack.updateMessage).not.toHaveBeenCalled(); + }); + }); + + describe("postFinalNotice", () => { + const cases: Array<{ status: LoopStatus; reaction: string }> = [ + { status: "done", reaction: "white_check_mark" }, + { status: "stopped", reaction: "octagonal_sign" }, + { status: "budget_exceeded", reaction: "warning" }, + { status: "failed", reaction: "x" }, + ]; + + for (const { status, reaction } of cases) { + test(`stamps terminal reaction for status=${status}`, async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ statusMessageTs: "1700000000.100100", status }), status); + const addCalls = slack.addReaction.mock.calls.map((c: unknown[]) => c[2]); + expect(addCalls).toContain(reaction); + // Both in-flight reactions best-effort removed + const removeCalls = slack.removeReaction.mock.calls.map((c: unknown[]) => c[2]); + expect(removeCalls).toContain("hourglass_flowing_sand"); + expect(removeCalls).toContain("arrows_counterclockwise"); + }); + } + + test("edits existing status message when set", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ statusMessageTs: "1700000000.100100" }), "done"); + expect(slack.updateMessage).toHaveBeenCalledTimes(1); + expect(slack.postToChannel).not.toHaveBeenCalled(); + }); + + test("posts new message when statusMessageTs is null", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ statusMessageTs: null }), "done"); + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + expect(slack.updateMessage).not.toHaveBeenCalled(); + }); + + test("no-ops when triggerMessageTs is null", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice( + makeLoop({ statusMessageTs: 
"1700000000.100100", triggerMessageTs: null }), + "done", + ); + expect(slack.addReaction).not.toHaveBeenCalled(); + expect(slack.removeReaction).not.toHaveBeenCalled(); + }); + + describe("state summary thread reply", () => { + let workDir: string; + + beforeEach(() => { + workDir = mkdtempSync(join(tmpdir(), "loop-notifier-summary-")); + }); + + afterEach(() => { + rmSync(workDir, { recursive: true, force: true }); + }); + + function writeStateFile(body: string): string { + const stateFile = join(workDir, "state.md"); + mkdirSync(workDir, { recursive: true }); + writeFileSync(stateFile, `---\nloop_id: abc\nstatus: done\niteration: 3\n---\n\n${body}\n`, "utf-8"); + return stateFile; + } + + test("posts the state.md body as a threaded reply on completion", async () => { + const stateFile = writeStateFile("# Progress\n- Tick 1: Hello!\n- Tick 2: Hello!\n- Tick 3: Hello!"); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100" }), "done"); + + // The status message edit is one call; the summary is a second + // postToChannel call in the same thread. 
+ expect(slack.postToChannel).toHaveBeenCalledTimes(1); + const [channel, text, threadTs] = slack.postToChannel.mock.calls[0]; + expect(channel).toBe("C100"); + expect(text).toContain("Tick 1: Hello!"); + expect(text).toContain("Tick 3: Hello!"); + expect(text).toContain("final state"); + // Frontmatter must be stripped + expect(text).not.toContain("loop_id: abc"); + expect(text).not.toContain("iteration: 3"); + // Posted in the same thread as the original turn + expect(threadTs).toBe("1700000000.000100"); + }); + + test("falls back to status_message_ts when conversationId is null", async () => { + const stateFile = writeStateFile("# Progress\n- done"); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice( + makeLoop({ + stateFile, + statusMessageTs: "1700000000.100100", + conversationId: null, + }), + "done", + ); + + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + const threadTs = slack.postToChannel.mock.calls[0][2]; + expect(threadTs).toBe("1700000000.100100"); + }); + + test("silently skips summary when state file does not exist", async () => { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice( + makeLoop({ stateFile: "/nonexistent/path/state.md", statusMessageTs: "1700000000.100100" }), + "done", + ); + // The terminal reaction path still runs, but no summary post. 
+ expect(slack.postToChannel).not.toHaveBeenCalled(); + }); + + test("silently skips summary when body is empty", async () => { + const stateFile = writeStateFile(""); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100" }), "done"); + expect(slack.postToChannel).not.toHaveBeenCalled(); + }); + + test("truncates very long summaries", async () => { + // 5000 chars of body, well over the 3500 cap + const body = "x".repeat(5000); + const stateFile = writeStateFile(body); + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100" }), "done"); + const text = slack.postToChannel.mock.calls[0][1] as string; + expect(text).toContain("…(truncated)"); + // Total posted text must be bounded by 3500 chars of body + small + // amount of surrounding formatting, so under ~3700. 
+ expect(text.length).toBeLessThan(3800); + }); + + test("summary also fires for stopped/failed/budget_exceeded outcomes", async () => { + const stateFile = writeStateFile("# Progress\n- partial work"); + for (const status of ["stopped", "failed", "budget_exceeded"] as const) { + const slack = makeSlack(); + const notifier = new LoopNotifier(asSlack(slack), store); + await notifier.postFinalNotice(makeLoop({ stateFile, statusMessageTs: "1700000000.100100", status }), status); + expect(slack.postToChannel).toHaveBeenCalledTimes(1); + expect(slack.postToChannel.mock.calls[0][1]).toContain("partial work"); + } + }); + }); + }); +}); diff --git a/src/loop/__tests__/post-loop.test.ts b/src/loop/__tests__/post-loop.test.ts new file mode 100644 index 0000000..ae12196 --- /dev/null +++ b/src/loop/__tests__/post-loop.test.ts @@ -0,0 +1,128 @@ +import { describe, expect, test } from "bun:test"; +import { type LoopTranscript, synthesizeSessionData } from "../post-loop.ts"; +import type { Loop } from "../types.ts"; + +function makeLoop(overrides: Partial = {}): Loop { + return { + id: "loop-123", + goal: "Refactor the auth module", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 20, + maxCostUsd: 5, + checkpointInterval: null, + status: "running", + iterationCount: 5, + totalCostUsd: 1.23, + channelId: null, + conversationId: null, + statusMessageTs: null, + triggerMessageTs: null, + interruptRequested: false, + lastError: null, + startedAt: "2024-01-01T00:00:00Z", + lastTickAt: "2024-01-01T00:05:00Z", + finishedAt: "2024-01-01T00:06:00Z", + ...overrides, + }; +} + +function makeTranscript(overrides: Partial = {}): LoopTranscript { + return { + firstPrompt: "First tick prompt", + firstResponse: "First tick response", + summaries: ["Tick 2: in-progress", "Tick 3: in-progress"], + lastPrompt: "Last tick prompt", + lastResponse: "Last tick response", + ...overrides, + }; +} + +describe("synthesizeSessionData", () => { + 
test("maps done status to success outcome", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.outcome).toBe("success"); + }); + + test("maps stopped status to abandoned outcome", () => { + const data = synthesizeSessionData(makeLoop(), "stopped", makeTranscript()); + expect(data.outcome).toBe("abandoned"); + }); + + test("maps budget_exceeded status to failure outcome", () => { + const data = synthesizeSessionData(makeLoop(), "budget_exceeded", makeTranscript()); + expect(data.outcome).toBe("failure"); + }); + + test("maps failed status to failure outcome", () => { + const data = synthesizeSessionData(makeLoop(), "failed", makeTranscript()); + expect(data.outcome).toBe("failure"); + }); + + test("includes context header with tick count and goal", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.userMessages[0]).toContain("[Loop: 5 ticks"); + expect(data.userMessages[0]).toContain("Refactor the auth module"); + expect(data.userMessages[0]).toContain("outcome: success"); + }); + + test("includes first tick prompt, rolling summaries, and last tick prompt", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.userMessages[0]).toContain("First tick prompt"); + expect(data.userMessages).toContain("Tick 2: in-progress"); + expect(data.userMessages).toContain("Tick 3: in-progress"); + expect(data.userMessages[data.userMessages.length - 1]).toContain("Last tick prompt"); + }); + + test("includes first and last assistant responses", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.assistantMessages).toHaveLength(2); + expect(data.assistantMessages[0]).toContain("First tick response"); + expect(data.assistantMessages[1]).toContain("Last tick response"); + }); + + test("uses channel:channelId for Slack-originated loops", () => { + const loop = makeLoop({ channelId: "C100" }); + const data 
= synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.userId).toBe("channel:C100"); + }); + + test("uses 'autonomous' for headless loops", () => { + const loop = makeLoop({ channelId: null }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.userId).toBe("autonomous"); + }); + + test("session key uses channel:conversation for Slack loops", () => { + const loop = makeLoop({ channelId: "C100", conversationId: "1700000.000" }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.sessionKey).toBe("C100:1700000.000"); + }); + + test("session key uses loop:id for headless loops", () => { + const loop = makeLoop({ channelId: null }); + const data = synthesizeSessionData(loop, "done", makeTranscript()); + expect(data.sessionKey).toBe("loop:loop-123"); + }); + + test("passes through cost and timestamps", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.costUsd).toBe(1.23); + expect(data.startedAt).toBe("2024-01-01T00:00:00Z"); + expect(data.endedAt).toBe("2024-01-01T00:06:00Z"); + }); + + test("uses empty arrays for toolsUsed and filesTracked", () => { + const data = synthesizeSessionData(makeLoop(), "done", makeTranscript()); + expect(data.toolsUsed).toEqual([]); + expect(data.filesTracked).toEqual([]); + }); + + test("handles empty transcript (no-tick loop)", () => { + const transcript = makeTranscript({ summaries: [] }); + const data = synthesizeSessionData(makeLoop({ iterationCount: 0 }), "stopped", transcript); + expect(data.userMessages.length).toBeGreaterThan(0); + expect(data.outcome).toBe("abandoned"); + }); +}); diff --git a/src/loop/__tests__/prompt.test.ts b/src/loop/__tests__/prompt.test.ts new file mode 100644 index 0000000..eae3dc1 --- /dev/null +++ b/src/loop/__tests__/prompt.test.ts @@ -0,0 +1,85 @@ +import { describe, expect, test } from "bun:test"; +import { buildTickPrompt } from "../prompt.ts"; +import type { Loop } from 
"../types.ts"; + +function makeLoop(overrides: Partial = {}): Loop { + return { + id: "loop-1", + goal: "Write a haiku", + workspaceDir: "/tmp/ws", + stateFile: "/tmp/ws/state.md", + successCommand: null, + maxIterations: 20, + maxCostUsd: 5, + checkpointInterval: null, + status: "running", + iterationCount: 3, + totalCostUsd: 0.5, + channelId: null, + conversationId: null, + statusMessageTs: null, + triggerMessageTs: null, + interruptRequested: false, + lastError: null, + startedAt: "2024-01-01T00:00:00Z", + lastTickAt: null, + finishedAt: null, + ...overrides, + }; +} + +describe("buildTickPrompt", () => { + test("returns base prompt without optional sections when no options provided", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents"); + expect(prompt).toContain("Write a haiku"); + expect(prompt).toContain("state contents"); + expect(prompt).not.toContain("RECALLED MEMORIES"); + expect(prompt).not.toContain("REVIEWER FEEDBACK"); + }); + + test("injects memory context before state file section", () => { + const memoryContext = "## Known Facts\n- User prefers TypeScript"; + const prompt = buildTickPrompt(makeLoop(), "state contents", { memoryContext }); + + expect(prompt).toContain("RECALLED MEMORIES (from previous sessions)"); + expect(prompt).toContain("User prefers TypeScript"); + + // Memory should appear before state file contents + const memoryIdx = prompt.indexOf("RECALLED MEMORIES"); + const stateIdx = prompt.indexOf("CURRENT STATE FILE CONTENTS"); + expect(memoryIdx).toBeLessThan(stateIdx); + }); + + test("injects critique section", () => { + const critique = "The loop appears stuck in a pattern."; + const prompt = buildTickPrompt(makeLoop(), "state contents", { critique }); + + expect(prompt).toContain("REVIEWER FEEDBACK (from your last checkpoint)"); + expect(prompt).toContain("stuck in a pattern"); + }); + + test("injects both memory and critique when both provided", () => { + const prompt = buildTickPrompt(makeLoop(), "state 
contents", { + memoryContext: "Some facts", + critique: "Some feedback", + }); + + expect(prompt).toContain("RECALLED MEMORIES"); + expect(prompt).toContain("REVIEWER FEEDBACK"); + + // Memory should come before critique + const memoryIdx = prompt.indexOf("RECALLED MEMORIES"); + const critiqueIdx = prompt.indexOf("REVIEWER FEEDBACK"); + expect(memoryIdx).toBeLessThan(critiqueIdx); + }); + + test("skips empty memory context", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents", { memoryContext: "" }); + expect(prompt).not.toContain("RECALLED MEMORIES"); + }); + + test("skips empty critique", () => { + const prompt = buildTickPrompt(makeLoop(), "state contents", { critique: "" }); + expect(prompt).not.toContain("REVIEWER FEEDBACK"); + }); +}); diff --git a/src/loop/__tests__/runner.test.ts b/src/loop/__tests__/runner.test.ts new file mode 100644 index 0000000..ee1ffb4 --- /dev/null +++ b/src/loop/__tests__/runner.test.ts @@ -0,0 +1,336 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { runMigrations } from "../../db/migrate.ts"; +import { LoopRunner } from "../runner.ts"; + +type HandleMessageImpl = ( + channel: string, + conversationId: string, + text: string, +) => Promise<{ + text: string; + sessionId: string; + cost: { totalUsd: number; inputTokens: number; outputTokens: number; modelUsage: Record }; + durationMs: number; +}>; + +function createMockRuntime(impl?: HandleMessageImpl) { + const defaultImpl: HandleMessageImpl = async () => ({ + text: "ok", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 10, outputTokens: 10, modelUsage: {} }, + durationMs: 10, + }); + return { + handleMessage: mock(impl ?? 
defaultImpl), + }; +} + +// Mock handleMessage that updates the state file to mark the loop done on first tick. +function agentFinishes(stateFile: string, loopId: string): HandleMessageImpl { + return async () => { + writeFileSync(stateFile, `---\nloop_id: ${loopId}\nstatus: done\niteration: 1\n---\n\nDone.\n`, "utf-8"); + return { + text: "done", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 1, outputTokens: 1, modelUsage: {} }, + durationMs: 1, + }; + }; +} + +describe("LoopRunner", () => { + let db: Database; + let dataDir: string; + + beforeEach(() => { + db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + db.run("PRAGMA foreign_keys = ON"); + runMigrations(db); + dataDir = mkdtempSync(join(tmpdir(), "phantom-loop-test-")); + }); + + afterEach(() => { + db.close(); + rmSync(dataDir, { recursive: true, force: true }); + }); + + test("start creates a loop row and initializes state file", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + + const loop = runner.start({ goal: "Write a haiku" }); + + expect(loop.id).toBeTruthy(); + expect(loop.status).toBe("running"); + expect(loop.iterationCount).toBe(0); + expect(loop.maxIterations).toBe(20); + expect(loop.maxCostUsd).toBe(5); + expect(loop.workspaceDir).toContain(dataDir); + + const state = readFileSync(loop.stateFile, "utf-8"); + expect(state).toContain(`loop_id: ${loop.id}`); + expect(state).toContain("Write a haiku"); + }); + + test("start clamps max_iterations to hard ceiling", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "x", maxIterations: 10_000 }); + expect(loop.maxIterations).toBe(200); + }); + + test("start throws when workspace escapes the data dir", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, 
autoSchedule: false }); + expect(() => runner.start({ goal: "evil", workspace: "../evil" })).toThrow(/escapes data dir/); + }); + + test("start clamps max_cost_usd to hard ceiling", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "x", maxCostUsd: 9999 }); + expect(loop.maxCostUsd).toBe(50); + }); + + test("triggerMessageTs round-trips through start → store → findById", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ + goal: "with trigger", + channelId: "C100", + conversationId: "1700000000.000100", + triggerMessageTs: "1700000000.000200", + }); + expect(loop.triggerMessageTs).toBe("1700000000.000200"); + + const reloaded = runner.getLoop(loop.id); + expect(reloaded?.triggerMessageTs).toBe("1700000000.000200"); + expect(reloaded?.channelId).toBe("C100"); + expect(reloaded?.conversationId).toBe("1700000000.000100"); + }); + + test("triggerMessageTs is null when omitted at start", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "no trigger" }); + expect(loop.triggerMessageTs).toBeNull(); + }); + + test("tick invokes runtime with loop channel and rotating conversation ids", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Do the thing" }); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + + await runner.tick(loop.id); + + const final = runner.getLoop(loop.id); + expect(final?.status).toBe("done"); + expect(final?.iterationCount).toBe(1); + expect(runtime.handleMessage).toHaveBeenCalledTimes(1); + + // Verify channel + conversation id shape used 
for the SDK call + const call = runtime.handleMessage.mock.calls[0]; + expect(call[0]).toBe("loop"); + expect(call[1]).toBe(`${loop.id}:0`); + }); + + test("conversation id rotates across ticks to force fresh sessions", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Rotate ids", maxIterations: 3 }); + + await runner.tick(loop.id); + await runner.tick(loop.id); + await runner.tick(loop.id); + + const calls = runtime.handleMessage.mock.calls; + expect(calls.length).toBe(3); + expect(calls[0][1]).toBe(`${loop.id}:0`); + expect(calls[1][1]).toBe(`${loop.id}:1`); + expect(calls[2][1]).toBe(`${loop.id}:2`); + }); + + test("budget: loop is finalized as budget_exceeded when iteration cap reached", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Never finishes", maxIterations: 3 }); + + await runner.tick(loop.id); + await runner.tick(loop.id); + await runner.tick(loop.id); + // Fourth tick should detect the budget and finalize without invoking the runtime. 
+ await runner.tick(loop.id); + + const final = runner.getLoop(loop.id); + expect(final?.status).toBe("budget_exceeded"); + expect(final?.iterationCount).toBe(3); + expect(runtime.handleMessage).toHaveBeenCalledTimes(3); + }); + + test("budget: loop is finalized as budget_exceeded when cost cap reached", async () => { + const runtime = createMockRuntime(async () => ({ + text: "expensive", + sessionId: "s", + cost: { totalUsd: 0.6, inputTokens: 1, outputTokens: 1, modelUsage: {} }, + durationMs: 1, + })); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Spend money", maxIterations: 100, maxCostUsd: 1.0 }); + + // Tick 1: spends 0.60 (under 1.00) - another tick allowed + // Tick 2: spends 0.60 more, total 1.20 (over 1.00) + // Tick 3: budget check sees totalCostUsd >= maxCostUsd, finalizes + await runner.tick(loop.id); + await runner.tick(loop.id); + await runner.tick(loop.id); + + const final = runner.getLoop(loop.id); + expect(final?.status).toBe("budget_exceeded"); + expect(final?.totalCostUsd).toBeGreaterThanOrEqual(1.0); + }); + + test("tick is a no-op when loop already finished", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "done already" }); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + expect(runner.getLoop(loop.id)?.status).toBe("done"); + + await runner.tick(loop.id); + expect(runtime.handleMessage).toHaveBeenCalledTimes(1); + }); + + test("requestStop + tick finalizes the loop as stopped without invoking runtime", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Keep going", maxIterations: 100 }); + + const ok = runner.requestStop(loop.id); + 
expect(ok).toBe(true); + + await runner.tick(loop.id); + + expect(runner.getLoop(loop.id)?.status).toBe("stopped"); + expect(runtime.handleMessage).not.toHaveBeenCalled(); + }); + + test("requestStop returns false for non-running loops", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "x" }); + + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner.tick(loop.id); + + expect(runner.requestStop(loop.id)).toBe(false); + }); + + test("list(false) returns only running loops; list(true) includes finished", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + + const a = runner.start({ goal: "A", maxIterations: 1 }); + await runner.tick(a.id); + await runner.tick(a.id); // budget check finalizes + expect(runner.getLoop(a.id)?.status).toBe("budget_exceeded"); + + const b = runner.start({ goal: "B", maxIterations: 100 }); + + expect(runner.list(false).map((l) => l.id)).toEqual([b.id]); + const all = runner + .list(true) + .map((l) => l.id) + .sort(); + expect(all).toContain(a.id); + expect(all).toContain(b.id); + }); + + test("resumeRunning returns count of loops that were still running", () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + + runner.start({ goal: "Survive" }); + runner.start({ goal: "Also survive" }); + + // Fresh runner on the same db, as if the process restarted. 
+ const runner2 = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + expect(runner2.resumeRunning()).toBe(2); + }); + + test("after resume, the next tick re-reads state and continues from current iteration", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Resume me" }); + + // Simulate one tick before "restart" + runtime.handleMessage.mockImplementation(async () => { + // agent wrote some progress but hasn't finished + writeFileSync( + loop.stateFile, + `---\nloop_id: ${loop.id}\nstatus: in-progress\niteration: 1\n---\n\nHalfway there.\n`, + "utf-8", + ); + return { + text: "progress", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 1, outputTokens: 1, modelUsage: {} }, + durationMs: 1, + }; + }); + await runner.tick(loop.id); + + // Restart: fresh runner, state file is source of truth. + const runner2 = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + await runner2.tick(loop.id); + + expect(runner2.getLoop(loop.id)?.status).toBe("done"); + }); + + test("success_command exit 0 terminates loop as done", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "Check", maxIterations: 10, successCommand: "true" }); + + await runner.tick(loop.id); + + expect(runner.getLoop(loop.id)?.status).toBe("done"); + }); + + test("autoSchedule: true drives ticks until the state file marks done", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: true }); + const loop = runner.start({ goal: "auto" }); + runtime.handleMessage.mockImplementation(agentFinishes(loop.stateFile, loop.id)); + + // Poll up to ~1s (50 * 
20ms). Generous for shared CI VMs. + let reached = false; + for (let i = 0; i < 50; i++) { + if (runner.getLoop(loop.id)?.status === "done") { + reached = true; + break; + } + await Bun.sleep(20); + } + expect(reached).toBe(true); // loop did not reach done within 1s + expect(runtime.handleMessage).toHaveBeenCalled(); + }); + + test("success_command exit non-zero does not terminate the loop", async () => { + const runtime = createMockRuntime(); + const runner = new LoopRunner({ db, runtime: runtime, dataDir, autoSchedule: false }); + const loop = runner.start({ goal: "No success", maxIterations: 3, successCommand: "false" }); + + await runner.tick(loop.id); + expect(runner.getLoop(loop.id)?.status).toBe("running"); + }); +}); diff --git a/src/loop/__tests__/state-file.test.ts b/src/loop/__tests__/state-file.test.ts new file mode 100644 index 0000000..4a8331a --- /dev/null +++ b/src/loop/__tests__/state-file.test.ts @@ -0,0 +1,109 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { existsSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { initStateFile, parseFrontmatter, readStateFile } from "../state-file.ts"; + +describe("loop state-file", () => { + let dir: string; + + beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), "phantom-loop-")); + }); + + afterEach(() => { + rmSync(dir, { recursive: true, force: true }); + }); + + test("initStateFile creates file with frontmatter when missing", () => { + const path = join(dir, "state.md"); + const contents = initStateFile(path, "loop-1", "Build a haiku"); + + expect(existsSync(path)).toBe(true); + expect(contents).toContain("loop_id: loop-1"); + expect(contents).toContain("status: in-progress"); + expect(contents).toContain("iteration: 0"); + expect(contents).toContain("Build a haiku"); + }); + + test("initStateFile is idempotent - does not overwrite existing file", () => { + const 
path = join(dir, "state.md"); + initStateFile(path, "loop-1", "Original goal"); + writeFileSync(path, "---\nloop_id: loop-1\nstatus: done\niteration: 5\n---\n\n# custom", "utf-8"); + + const returned = initStateFile(path, "loop-1", "Original goal"); + const onDisk = readFileSync(path, "utf-8"); + + expect(returned).toContain("status: done"); + expect(onDisk).toContain("iteration: 5"); + expect(onDisk).toContain("# custom"); + }); + + test("parseFrontmatter extracts loop_id, status, iteration", () => { + const contents = `--- +loop_id: abc-123 +status: in-progress +iteration: 3 +--- + +body here`; + const fm = parseFrontmatter(contents); + expect(fm).toEqual({ loopId: "abc-123", status: "in-progress", iteration: 3 }); + }); + + test("parseFrontmatter recognizes status: done", () => { + const fm = parseFrontmatter("---\nloop_id: x\nstatus: done\niteration: 7\n---\n"); + expect(fm?.status).toBe("done"); + }); + + test("parseFrontmatter recognizes status: blocked", () => { + const fm = parseFrontmatter("---\nloop_id: x\nstatus: blocked\niteration: 1\n---\n"); + expect(fm?.status).toBe("blocked"); + }); + + test("parseFrontmatter returns null when frontmatter missing", () => { + expect(parseFrontmatter("# just a heading\n")).toBeNull(); + }); + + test("parseFrontmatter returns null on invalid status value", () => { + const fm = parseFrontmatter("---\nloop_id: x\nstatus: bogus\niteration: 1\n---\n"); + expect(fm).toBeNull(); + }); + + test("parseFrontmatter returns null when required fields missing", () => { + expect(parseFrontmatter("---\nstatus: done\niteration: 1\n---\n")).toBeNull(); + expect(parseFrontmatter("---\nloop_id: x\niteration: 1\n---\n")).toBeNull(); + expect(parseFrontmatter("---\nloop_id: x\nstatus: done\n---\n")).toBeNull(); + }); + + test("parseFrontmatter strips inline YAML comments from values", () => { + const fm = parseFrontmatter("---\nloop_id: abc\nstatus: done # all finished\niteration: 5\n---\n"); + expect(fm).toEqual({ loopId: "abc", status: 
"done", iteration: 5 }); + }); + + test("parseFrontmatter strips surrounding double quotes", () => { + const fm = parseFrontmatter('---\nloop_id: "abc"\nstatus: "done"\niteration: 5\n---\n'); + expect(fm).toEqual({ loopId: "abc", status: "done", iteration: 5 }); + }); + + test("parseFrontmatter strips surrounding single quotes", () => { + const fm = parseFrontmatter("---\nloop_id: 'abc'\nstatus: 'in-progress'\niteration: 2\n---\n"); + expect(fm).toEqual({ loopId: "abc", status: "in-progress", iteration: 2 }); + }); + + test("parseFrontmatter handles quoted value followed by inline comment", () => { + const fm = parseFrontmatter('---\nloop_id: x\nstatus: "done" # yay\niteration: 3\n---\n'); + expect(fm?.status).toBe("done"); + }); + + test("parseFrontmatter tolerates unexpected extra fields", () => { + const fm = parseFrontmatter("---\nloop_id: x\nstatus: done\niteration: 1\nextra_field: whatever\n---\n"); + expect(fm?.status).toBe("done"); + }); + + test("readStateFile round-trips contents", () => { + const path = join(dir, "state.md"); + writeFileSync(path, "hello\nworld", "utf-8"); + expect(readStateFile(path)).toBe("hello\nworld"); + }); +}); diff --git a/src/loop/__tests__/tool.test.ts b/src/loop/__tests__/tool.test.ts new file mode 100644 index 0000000..b29958b --- /dev/null +++ b/src/loop/__tests__/tool.test.ts @@ -0,0 +1,179 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { slackContextStore } from "../../agent/slack-context.ts"; +import { runMigrations } from "../../db/migrate.ts"; +import { LoopRunner } from "../runner.ts"; +import { LOOP_TOOL_NAME, createLoopToolServer } from "../tool.ts"; + +function mockRuntime() { + return { + handleMessage: mock(async () => ({ + text: "ok", + sessionId: "s", + cost: { totalUsd: 0.01, inputTokens: 1, outputTokens: 1, 
// NOTE: _registeredTools is internal to @anthropic-ai/claude-agent-sdk.
// Verified against 0.2.77. If this breaks on SDK upgrade, check the SDK's
// tool() + createSdkMcpServer() implementation. Using LOOP_TOOL_NAME (not
// Object.keys()[0]) so a rename fails loudly here instead of silently.

/** Shape of the tool handler as the tests call it: a bag of arguments in, an MCP-style text result out. */
type LoopHandler = (
	input: Record<string, unknown>,
) => Promise<{ content: Array<{ type: string; text: string }>; isError?: true }>;

/**
 * Pull the `phantom_loop` handler out of the SDK server object so tests can
 * invoke it directly.
 *
 * @param server - The value returned by `createLoopToolServer`.
 * @returns The handler registered under `LOOP_TOOL_NAME`.
 * @throws Error when no tool is registered under `LOOP_TOOL_NAME`.
 */
function getLoopToolHandler(server: ReturnType<typeof createLoopToolServer>): LoopHandler {
	// The cast is deliberate: the registry is not part of the SDK's public types.
	const instance = (
		server as unknown as { instance: { _registeredTools: Record<string, { handler: LoopHandler } | undefined> } }
	).instance;
	const registered = instance._registeredTools[LOOP_TOOL_NAME];
	if (!registered) throw new Error(`SDK internals changed: tool "${LOOP_TOOL_NAME}" not found in _registeredTools`);
	return registered.handler;
}

/**
 * Decode the JSON payload carried in the first content entry of a tool result.
 *
 * @param result - A tool result whose first content entry holds JSON text.
 * @returns The parsed JSON value (untyped, as produced by `JSON.parse`).
 * @throws Error when `content` is empty; SyntaxError when the text is not valid JSON.
 */
function parseResult(result: { content: Array<{ type: string; text: string }>; isError?: true }) {
	const first = result.content[0];
	// Fail with a readable message instead of "cannot read properties of undefined".
	if (!first) throw new Error("tool result has no content entries to parse");
	return JSON.parse(first.text);
}
}); + + test("start without goal returns error", async () => { + const result = await handler({ action: "start" }); + expect(result.isError).toBe(true); + expect(parseResult(result).error).toContain("goal"); + }); + + test("status action returns loop data and frontmatter", async () => { + const startResult = await handler({ action: "start", goal: "Inspect me" }); + const { loop } = parseResult(startResult); + + const statusResult = await handler({ action: "status", loop_id: loop.id }); + const body = parseResult(statusResult); + expect(body.loop.id).toBe(loop.id); + expect(body.frontmatter).toEqual({ loopId: loop.id, status: "in-progress", iteration: 0 }); + expect(body.state_excerpt).toContain("Inspect me"); + }); + + test("status action with unknown id returns error", async () => { + const result = await handler({ action: "status", loop_id: "nope" }); + expect(result.isError).toBe(true); + }); + + test("stop action sets interrupt flag on a running loop", async () => { + const startResult = await handler({ action: "start", goal: "Will be stopped" }); + const { loop } = parseResult(startResult); + + const stopResult = await handler({ action: "stop", loop_id: loop.id }); + const body = parseResult(stopResult); + expect(body.stop_requested).toBe(true); + }); + + test("list action returns active loops by default", async () => { + await handler({ action: "start", goal: "A" }); + await handler({ action: "start", goal: "B" }); + + const result = await handler({ action: "list" }); + const body = parseResult(result); + expect(body.count).toBeGreaterThanOrEqual(2); + }); + + describe("slackContextStore fallback", () => { + test("start fills channel/thread/trigger from context when args omitted", async () => { + const result = await slackContextStore.run( + { + slackChannelId: "C42", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", + }, + () => handler({ action: "start", goal: "from context" }), + ); + const { loop } = parseResult(result); + // 
triggerMessageTs is intentionally not exposed in serializeLoop, + // so read back through the runner directly. + const stored = runner.getLoop(loop.id); + expect(stored?.channelId).toBe("C42"); + expect(stored?.conversationId).toBe("1700000000.000100"); + expect(stored?.triggerMessageTs).toBe("1700000000.000200"); + }); + + test("explicit args override context", async () => { + const result = await slackContextStore.run( + { + slackChannelId: "C_CTX", + slackThreadTs: "1700000000.000100", + slackMessageTs: "1700000000.000200", + }, + () => + handler({ + action: "start", + goal: "explicit wins", + channel_id: "C_EXPLICIT", + conversation_id: "1800000000.000100", + trigger_message_ts: "1800000000.000200", + }), + ); + const { loop } = parseResult(result); + const stored = runner.getLoop(loop.id); + expect(stored?.channelId).toBe("C_EXPLICIT"); + expect(stored?.conversationId).toBe("1800000000.000100"); + expect(stored?.triggerMessageTs).toBe("1800000000.000200"); + }); + + test("missing context leaves fields null without crashing", async () => { + // No slackContextStore.run wrapper here. 
/** Outcome of one critique call: the reviewer's text plus what the call cost. */
export type CritiqueResult = {
	assessment: string;
	cost: JudgeCostEntry;
};

// Structured-output schema handed to the judge: a single free-text field.
const CritiqueSchema = z.object({ assessment: z.string() });

/**
 * Mid-loop critique: Sonnet 4.6 reviews the loop's progress every N ticks
 * to detect drift, stuck patterns, or wasted budget before the loop runs out.
 * Same cross-model pattern as interactive sessions (Sonnet judging Opus).
 *
 * @param goal - The loop's goal, quoted verbatim in the prompt.
 * @param stateFileContents - Current state file text; only the first 3000 characters are sent.
 * @param transcript - Loop transcript; its `summaries` and the first 1000 characters of `lastResponse` are sent.
 * @param iteration - Number of ticks used so far (shown as `iteration/maxIterations`).
 * @param maxIterations - The loop's iteration budget.
 * @returns The judge's assessment and a cost entry recording exactly one call.
 */
export async function runCritiqueJudge(
	goal: string,
	stateFileContents: string,
	transcript: LoopTranscript,
	iteration: number,
	maxIterations: number,
): Promise<CritiqueResult> {
	const system = `You are a reviewer assessing an autonomous agent loop mid-flight.
The agent (Opus 4.6) is running inside a tight iteration loop toward a goal.
Your job is to assess whether the loop is making meaningful progress or if
it is stuck, drifting, or wasting budget. Be direct and actionable.`;

	// Omitted entirely when there are no summaries, so the prompt has no empty heading.
	const summaryBlock = transcript.summaries.length > 0 ? `\nTick summaries:\n${transcript.summaries.join("\n")}` : "";

	const user = `Goal: ${goal}

Progress (${iteration}/${maxIterations} ticks used):
${summaryBlock}

Current state file:
${stateFileContents.slice(0, 3000)}

Last response (truncated):
${transcript.lastResponse.slice(0, 1000)}

Is this loop making meaningful progress toward the goal? Is the agent stuck
in a pattern? Should it change approach? Give a brief (2-3 sentence) assessment
and one concrete suggestion if applicable.`;

	const result = await callJudge({
		model: JUDGE_MODEL_SONNET,
		systemPrompt: system,
		userMessage: user,
		schema: CritiqueSchema,
		schemaName: "LoopCritique",
		maxTokens: 500,
	});

	return {
		assessment: result.data.assessment,
		cost: {
			calls: 1,
			totalUsd: result.costUsd,
			totalInputTokens: result.inputTokens,
			totalOutputTokens: result.outputTokens,
		},
	};
}
// Terminal loop status → bare Slack reaction name (no colons).
const TERMINAL_REACTION: Partial<Record<LoopStatus, string>> = {
  done: "white_check_mark",
  stopped: "octagonal_sign",
  budget_exceeded: "warning",
  failed: "x",
};

const REACTION_START = "hourglass_flowing_sand";
const REACTION_IN_FLIGHT = "arrows_counterclockwise";

// Reactions that may be present on the trigger message while a loop is running.
const IN_FLIGHT_REACTIONS = [REACTION_START, REACTION_IN_FLIGHT] as const;

/** Bare reaction name for a terminal status, or null when the status is not terminal. */
function terminalReaction(status: LoopStatus): string | null {
  return TERMINAL_REACTION[status] ?? null;
}

/**
 * Render a fixed-width text progress bar such as `[███░░░░░░░]`.
 *
 * @param done - Units completed; clamped into `[0, total]`.
 * @param total - Units planned; a non-positive or NaN total yields an empty bar.
 * @returns A bar that always contains exactly `PROGRESS_BAR_CELLS` cells.
 */
export function buildProgressBar(done: number, total: number): string {
  // `total > 0` is false for NaN, so a NaN total falls into the empty-bar path.
  const ratio = total > 0 ? Math.max(0, Math.min(done, total)) / total : 0;
  // A NaN `done` (or Infinity/Infinity) produces a non-finite ratio; treat it
  // as "no progress" rather than emitting a bar with zero cells.
  const filled = Number.isFinite(ratio) ? Math.round(ratio * PROGRESS_BAR_CELLS) : 0;
  const empty = PROGRESS_BAR_CELLS - filled;
  return `[${"█".repeat(filled)}${"░".repeat(empty)}]`;
}

/**
 * Colon-wrapped emoji for status message text.
 * Terminal statuses reuse the reaction map; `running` gets `:repeat:` and any
 * other status falls back to `:grey_question:`.
 */
export function terminalEmoji(status: LoopStatus): string {
  const reaction = terminalReaction(status);
  if (reaction) return `:${reaction}:`;
  // Non-terminal statuses still need a glyph for the running-state text.
  return status === "running" ? ":repeat:" : ":grey_question:";
}

/**
 * Shorten `text` to at most `max` characters, ending with an ellipsis when cut.
 * A non-positive `max` returns the empty string.
 */
function truncate(text: string, max: number): string {
  if (text.length <= max) return text;
  // Without this guard `slice(0, max - 1)` receives a negative end index and
  // returns almost the whole string, exceeding the requested limit.
  if (max <= 0) return "";
  return `${text.slice(0, max - 1)}…`;
}
/**
 * Build the Slack blocks for a status message: one mrkdwn section carrying
 * `text`, followed by an actions block holding the "Stop loop" button.
 * The button's action id and value both embed `loopId`.
 */
function buildStatusBlocks(text: string, loopId: string): SlackBlock[] {
  const section = { type: "section", text: { type: "mrkdwn", text } };
  const stopButton = {
    type: "button",
    text: { type: "plain_text", text: "Stop loop", emoji: true },
    action_id: `phantom:loop_stop:${loopId}`,
    style: "danger",
    value: loopId,
  };
  const actions = {
    type: "actions",
    block_id: `phantom_loop_actions_${loopId}`,
    elements: [stopButton],
  };
  return [section, actions];
}

const FRONTMATTER_RE = /^---\s*\n[\s\S]*?\n---\s*\n?/;
const MAX_SUMMARY_CHARS = 3500;

/**
 * Return the state file body with the leading frontmatter removed, for the
 * end-of-loop summary.
 *
 * @param stateFilePath - Path handed to `readStateFile`.
 * @returns The trimmed body, cut to `MAX_SUMMARY_CHARS` with a truncation
 *   marker appended when longer; null when the file cannot be read or the
 *   body is empty after trimming.
 */
function extractStateSummary(stateFilePath: string): string | null {
  let raw: string;
  try {
    raw = readStateFile(stateFilePath);
  } catch {
    // Unreadable file: the caller skips the summary.
    return null;
  }

  const body = raw.replace(FRONTMATTER_RE, "").trim();
  if (body.length === 0) return null;

  return body.length > MAX_SUMMARY_CHARS ? `${body.slice(0, MAX_SUMMARY_CHARS)}\n\n…(truncated)` : body;
}
/**
 * Slack feedback for the loop lifecycle: a start notice, a per-tick edit of
 * that notice, a final notice with the state file body, and a reaction ladder
 * on the operator's trigger message (start → in-flight → terminal).
 *
 * Every method is a no-op when there is no Slack channel object or the loop
 * has no channel id.
 */
export class LoopNotifier {
  constructor(
    private slackChannel: SlackChannel | null,
    private store: LoopStore,
  ) {}

  /**
   * Post the start message, remember its timestamp in the store, attach the
   * Stop button, and add the start reaction to the trigger message.
   */
  async postStartNotice(loop: Loop): Promise<void> {
    const slack = this.slackChannel;
    const channelId = loop.channelId;
    if (!slack || !channelId) return;

    const shortId = loop.id.slice(0, 8);
    const budget = loop.maxCostUsd.toFixed(2);
    const headline = `:repeat: Starting loop \`${shortId}\` (max ${loop.maxIterations} iter, $${budget} budget)`;
    const text = `${headline}\n> ${truncate(loop.goal, 200)}`;

    // A conversation id (Slack thread ts) threads the message; without one it
    // is posted at the top level of the channel.
    const threadTs = loop.conversationId ?? undefined;
    const messageTs = await slack.postToChannel(channelId, text, threadTs);
    if (!messageTs) return;
    this.store.setStatusMessageTs(loop.id, messageTs);

    // Second call attaches the Stop button blocks to the message just posted.
    await slack.updateMessage(channelId, messageTs, text, buildStatusBlocks(text, loop.id));

    const triggerTs = loop.triggerMessageTs;
    if (triggerTs) {
      await slack.addReaction(channelId, triggerTs, REACTION_START);
    }
  }

  /**
   * Edit the status message with the current progress bar, cost, and status.
   * On iteration 1 the start reaction is swapped for the in-flight reaction.
   */
  async postTickUpdate(id: string, iteration: number, status: string): Promise<void> {
    const loop = this.store.findById(id);
    const slack = this.slackChannel;
    if (!loop || !slack) return;
    const { channelId, statusMessageTs, triggerMessageTs } = loop;
    if (!channelId || !statusMessageTs) return;

    const spent = loop.totalCostUsd.toFixed(2);
    const cap = loop.maxCostUsd.toFixed(2);
    const text = [
      `:repeat: Loop \`${loop.id.slice(0, 8)}\``,
      `${buildProgressBar(iteration, loop.maxIterations)} ${iteration}/${loop.maxIterations}`,
      `$${spent}/$${cap}`,
      status,
    ].join(" · ");

    // Blocks are re-sent on every edit so the Stop button stays on the message.
    await slack.updateMessage(channelId, statusMessageTs, text, buildStatusBlocks(text, loop.id));

    // The iteration number comes from the caller, so no in-memory flag is
    // needed to know whether this is the first tick.
    const isFirstTick = iteration === 1;
    if (isFirstTick && triggerMessageTs) {
      await slack.removeReaction(channelId, triggerMessageTs, REACTION_START);
      await slack.addReaction(channelId, triggerMessageTs, REACTION_IN_FLIGHT);
    }
  }

  /**
   * Write the terminal status line (without blocks, which drops the Stop
   * button), post the state file body as a follow-up message, and replace
   * the in-flight reactions with the terminal one.
   */
  async postFinalNotice(loop: Loop, status: LoopStatus): Promise<void> {
    const slack = this.slackChannel;
    const channelId = loop.channelId;
    if (!slack || !channelId) return;

    const shortId = loop.id.slice(0, 8);
    const text = [
      `${terminalEmoji(status)} Loop \`${shortId}\``,
      `${buildProgressBar(loop.iterationCount, loop.maxIterations)} ${loop.iterationCount}/${loop.maxIterations}`,
      `$${loop.totalCostUsd.toFixed(4)}`,
      status,
    ].join(" · ");

    // Edit the existing status message when there is one; otherwise post fresh.
    if (loop.statusMessageTs) {
      await slack.updateMessage(channelId, loop.statusMessageTs, text);
    } else {
      await slack.postToChannel(channelId, text);
    }

    // Surface the state file body so the operator can read what the agent wrote.
    const summary = extractStateSummary(loop.stateFile);
    if (summary) {
      const threadTs = loop.conversationId ?? loop.statusMessageTs ?? undefined;
      const summaryText = `:notebook: *Loop \`${shortId}\` final state:*\n\`\`\`\n${summary}\n\`\`\``;
      await slack.postToChannel(channelId, summaryText, threadTs);
    }

    const triggerTs = loop.triggerMessageTs;
    if (!triggerTs) return;

    // Remove both possible in-flight reactions, then stamp the terminal one.
    for (const reaction of IN_FLIGHT_REACTIONS) {
      await slack.removeReaction(channelId, triggerTs, reaction);
    }
    const finalReaction = terminalReaction(status);
    if (finalReaction) {
      await slack.addReaction(channelId, triggerTs, finalReaction);
    }
  }
}
/**
 * Map a terminal loop status onto a session outcome:
 * `done` → success, `stopped` → abandoned, anything else → failure.
 */
function loopStatusToOutcome(status: LoopStatus): SessionData["outcome"] {
  switch (status) {
    case "done":
      return "success";
    case "stopped":
      return "abandoned";
    default:
      return "failure";
  }
}

// Upper bound on per-tick summary lines kept per loop (oldest dropped first).
const MAX_ROLLING_SUMMARIES = 10;

/**
 * Record one tick in the bounded per-loop transcript.
 *
 * The first recorded tick fixes `firstPrompt`/`firstResponse`; every later
 * tick overwrites `lastPrompt`/`lastResponse`. Each call appends a
 * `Tick N: <status>` summary and keeps at most `MAX_ROLLING_SUMMARIES` entries.
 *
 * @param transcripts - Map of loop id → transcript, mutated in place.
 * @param loopId - Loop whose transcript is updated (created when absent).
 * @param iteration - Tick number used in the summary line.
 * @param prompt - Prompt sent this tick.
 * @param response - Response text received this tick.
 * @param stateStatus - Status read from the state file; `in-progress` when undefined.
 */
export function recordTranscript(
  transcripts: Map<string, LoopTranscript>,
  loopId: string,
  iteration: number,
  prompt: string,
  response: string,
  stateStatus: string | undefined,
): void {
  let transcript = transcripts.get(loopId);
  if (!transcript) {
    transcript = {
      firstPrompt: prompt,
      firstResponse: response,
      summaries: [],
      lastPrompt: prompt,
      lastResponse: response,
    };
    transcripts.set(loopId, transcript);
  } else {
    transcript.lastPrompt = prompt;
    transcript.lastResponse = response;
  }
  transcript.summaries.push(`Tick ${iteration}: ${stateStatus ?? "in-progress"}`);
  if (transcript.summaries.length > MAX_ROLLING_SUMMARIES) transcript.summaries.shift();
}

/**
 * Constrain `value` to the inclusive range `[min, max]`.
 *
 * A NaN `value` returns `min`. Without that guard NaN would pass straight
 * through `Math.min`/`Math.max`, and a NaN limit can never be reached by a
 * `>=` comparison, so the fallback is the most conservative bound.
 */
export function clamp(value: number, min: number, max: number): number {
  if (Number.isNaN(value)) return min;
  return Math.min(Math.max(value, min), max);
}
/**
 * Post-loop learning: feed the synthesized session to the evolution engine,
 * then consolidate it into memory.
 *
 * The two stages are independent. An evolution failure is logged and does not
 * prevent consolidation, and consolidation is skipped when the memory system
 * is absent or not ready. Failures inside either stage's try block are logged
 * with `console.warn` rather than thrown.
 *
 * @param deps - Optional evolution engine, memory system, and config-update callback.
 * @param sessionData - Session synthesized from the finished loop.
 */
export async function runPostLoopPipeline(deps: PostLoopDeps, sessionData: SessionData): Promise<void> {
  const {
    consolidateSessionWithLLM: consolidateWithJudge,
    consolidateSession: consolidateHeuristic,
    sessionDataToSummary: toSummary,
  } = await import("../memory/consolidation.ts");

  const engine = deps.evolution;
  const memory = deps.memory;
  const notifyConfigChange = deps.onEvolvedConfigUpdate;

  // Logs a consolidation result only when it actually produced something.
  const reportConsolidation = (
    label: string,
    stats: { episodesCreated: number; factsExtracted: number; durationMs: number },
  ): void => {
    if (stats.episodesCreated > 0 || stats.factsExtracted > 0) {
      console.log(
        `[loop] ${label}: ${stats.episodesCreated} episodes, ${stats.factsExtracted} facts (${stats.durationMs}ms)`,
      );
    }
  };

  // Stage 1: evolution, independent of memory state.
  if (engine) {
    const summary = toSummary(sessionData);
    try {
      const outcome = await engine.afterSession(summary);
      const configChanged = outcome.changes_applied.length > 0;
      if (configChanged && notifyConfigChange) {
        notifyConfigChange(engine.getConfig());
      }
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      console.warn(`[loop] Post-loop evolution failed: ${reason}`);
    }
  }

  // Stage 2: memory consolidation, independent of evolution state.
  if (!memory?.isReady()) return;
  try {
    // The LLM path is taken only when the engine exists, uses LLM judges, and is within its cost cap.
    if (engine && engine.usesLLMJudges() && engine.isWithinCostCap()) {
      const config = engine.getConfig();
      const existingFacts = `${config.userProfile}\n${config.domainKnowledge}`;
      const { result, judgeCost } = await consolidateWithJudge(memory, sessionData, existingFacts);
      if (judgeCost) engine.trackExternalJudgeCost(judgeCost);
      reportConsolidation("Consolidated (LLM)", result);
    } else {
      const result = await consolidateHeuristic(memory, sessionData);
      reportConsolidation("Consolidated", result);
    }
  } catch (err: unknown) {
    const reason = err instanceof Error ? err.message : String(err);
    console.warn(`[loop] Post-loop memory consolidation failed: ${reason}`);
  }
}
+ */ +export function buildTickPrompt(loop: Loop, stateFileContents: string, options?: TickPromptOptions): string { + const memorySections: string[] = []; + + if (options?.memoryContext) { + memorySections.push(`RECALLED MEMORIES (from previous sessions)\n\n${options.memoryContext}`); + } + + if (options?.critique) { + memorySections.push(`REVIEWER FEEDBACK (from your last checkpoint)\n\n${options.critique}`); + } + + const injected = memorySections.length > 0 ? `\n\n${memorySections.join("\n\n")}\n` : ""; + + return `You are running inside a "ralph loop" - a tight iteration primitive where a +fresh agent session is invoked once per tick. You have no memory from previous +ticks. All shared memory lives in the state file at: + + ${loop.stateFile} + +The workspace for this loop is: + + ${loop.workspaceDir} + +Your job this tick is to make concrete forward progress toward the goal, then +update the state file so the next tick can pick up where you left off. + +THE STATE FILE CONTRACT (strict) + +The state file has YAML frontmatter that the runner reads after each tick. +You MUST keep this frontmatter valid and update the fields as described: + + --- + loop_id: + status: in-progress | done | blocked + iteration: + --- + +- Set status to "done" ONLY when the goal is fully achieved (or when a + configured success check will confirm it). The runner stops the loop the + moment it sees status: done. +- Set status to "blocked" if you cannot make progress without external input. + The loop continues, but your message to the operator belongs in the Notes + section. +- Otherwise leave status as "in-progress". + +Below the frontmatter, keep the sections: Goal, Progress, Next Action, Notes. +Be concise. Progress is a bullet list of what is actually done. Next Action +is one short paragraph telling the next tick exactly what to do first. 
+ +THE GOAL + +${loop.goal}${injected} + +CURRENT STATE FILE CONTENTS + +${stateFileContents} + +BUDGETS (enforced by the runner, informational for you) + +- Max iterations: ${loop.maxIterations} +- Max total cost: $${loop.maxCostUsd.toFixed(2)} +- Iterations used so far: ${loop.iterationCount} +- Cost used so far: $${loop.totalCostUsd.toFixed(4)} + +INSTRUCTIONS FOR THIS TICK + +1. Read the current state file (above) carefully. Understand what the previous + tick accomplished and what it asked you to do next. +2. Do the next action. Use whatever tools you need (Read, Write, Edit, Bash, + etc.). Favor small verifiable steps over large speculative ones. +3. Write the updated state file using the Write tool at the exact path + "${loop.stateFile}". Preserve the frontmatter format. Increment iteration. + Update Progress with what you just did. Update Next Action with what the + next tick should do. If you are fully done, set status to "done". +4. Briefly report in your final assistant message what you did this tick. The + runner does not read that message for control flow - only the state file + frontmatter decides termination - but it helps the operator watching logs. + +Do not re-open the loop concept with the user. Do not ask clarifying questions. +If you are blocked, write that into the state file Notes and set status to +blocked. 
The operator is watching asynchronously.`; +} diff --git a/src/loop/runner.ts b/src/loop/runner.ts new file mode 100644 index 0000000..0f40bb7 --- /dev/null +++ b/src/loop/runner.ts @@ -0,0 +1,292 @@ +import type { Database } from "bun:sqlite"; +import { randomUUID } from "node:crypto"; +import { join, relative, resolve } from "node:path"; +import type { AgentRuntime } from "../agent/runtime.ts"; +import type { SlackChannel } from "../channels/slack.ts"; +import { buildSafeEnv } from "../mcp/dynamic-handlers.ts"; +import type { MemoryContextBuilder } from "../memory/context-builder.ts"; +import { runCritiqueJudge } from "./critique.ts"; +import { LoopNotifier } from "./notifications.ts"; +import { + type LoopTranscript, + type PostLoopDeps, + clamp, + recordTranscript, + runPostLoopPipeline, + synthesizeSessionData, +} from "./post-loop.ts"; +import { buildTickPrompt } from "./prompt.ts"; +import { initStateFile, parseFrontmatter, readStateFile } from "./state-file.ts"; +import { LoopStore } from "./store.ts"; +import { + LOOP_DEFAULT_MAX_COST_USD, + LOOP_DEFAULT_MAX_ITERATIONS, + LOOP_MAX_COST_CEILING_USD, + LOOP_MAX_ITERATIONS_CEILING, + type Loop, + type LoopStartInput, + type LoopStatus, +} from "./types.ts"; + +/** Narrowed runtime interface for testability. */ +type LoopRuntime = Pick; + +type RunnerDeps = { + db: Database; + runtime: LoopRuntime; + slackChannel?: SlackChannel; + dataDir?: string; + memoryContextBuilder?: MemoryContextBuilder; + postLoopDeps?: PostLoopDeps; + /** Tests set to false to drive ticks deterministically. */ + autoSchedule?: boolean; +}; + +const SUCCESS_COMMAND_TIMEOUT_MS = 5 * 60 * 1000; + +/** start -> tick (N times) -> finalize. State file is the agent's memory across ticks. 
/**
 * Drives a loop through start -> tick (N times) -> finalize.
 *
 * Each tick reads the state file, sends a freshly built prompt to the runtime
 * under a per-iteration conversation id, records cost, re-reads the state
 * file, and either finalizes or schedules the next tick. The state file is
 * the only thing carried from one tick to the next.
 */
export class LoopRunner {
  private store: LoopStore;
  private runtime: LoopRuntime;
  private slackChannel: SlackChannel | undefined;
  private dataDir: string;
  private autoSchedule: boolean;
  /** Loop ids with a tick currently executing; guards against overlapping ticks. */
  private inFlight = new Set<string>();
  private notifier: LoopNotifier;
  private memoryContextBuilder: MemoryContextBuilder | undefined;
  private postLoopDeps: PostLoopDeps | undefined;
  /** Memory context built once per loop from its goal. */
  private memoryCache = new Map<string, string>();
  /** Bounded transcripts used by the critique judge and the post-loop pipeline. */
  private transcripts = new Map<string, LoopTranscript>();
  /** Critique text waiting to be injected into the next tick's prompt. */
  private pendingCritique = new Map<string, string>();

  constructor(deps: RunnerDeps) {
    this.store = new LoopStore(deps.db);
    this.runtime = deps.runtime;
    this.slackChannel = deps.slackChannel;
    this.dataDir = deps.dataDir ?? resolve(process.cwd(), "data");
    this.autoSchedule = deps.autoSchedule ?? true;
    this.notifier = new LoopNotifier(this.slackChannel ?? null, this.store);
    this.memoryContextBuilder = deps.memoryContextBuilder;
    this.postLoopDeps = deps.postLoopDeps;
  }

  /** Swap in a Slack channel after construction; rebuilds the notifier around it. */
  setSlackChannel(channel: SlackChannel): void {
    this.slackChannel = channel;
    this.notifier = new LoopNotifier(channel, this.store);
  }

  /**
   * Resolve `workspace` against the data dir and reject paths that leave it.
   *
   * @returns The absolute workspace path.
   * @throws Error when the resolved path is outside the data dir.
   */
  private assertWorkspaceInsideDataDir(workspace: string): string {
    const base = resolve(this.dataDir);
    const target = resolve(base, workspace);
    const rel = relative(base, target);
    // `relative` returns a normalized path, so an escape always shows up as a
    // leading ".." segment. Compare whole segments: a substring test would
    // also reject ordinary names such as "v1..v2".
    const firstSegment = rel.split(/[\\/]/)[0];
    if (firstSegment === "..") {
      throw new Error(`workspace path escapes data dir: ${workspace}`);
    }
    return target;
  }

  /**
   * Create a loop: clamp budgets, seed the state file, insert the row, post
   * the start notice (not awaited), warm the memory cache, schedule tick one.
   *
   * @throws Error when `input.workspace` resolves outside the data dir.
   */
  start(input: LoopStartInput): Loop {
    const id = randomUUID();
    const workspaceDir = input.workspace
      ? this.assertWorkspaceInsideDataDir(input.workspace)
      : join(this.dataDir, "loops", id);
    const stateFile = join(workspaceDir, "state.md");

    const maxIterations = clamp(input.maxIterations ?? LOOP_DEFAULT_MAX_ITERATIONS, 1, LOOP_MAX_ITERATIONS_CEILING);
    const maxCostUsd = clamp(input.maxCostUsd ?? LOOP_DEFAULT_MAX_COST_USD, 0.01, LOOP_MAX_COST_CEILING_USD);

    initStateFile(stateFile, id, input.goal);

    const loop = this.store.insert({
      id,
      goal: input.goal,
      workspaceDir,
      stateFile,
      successCommand: input.successCommand ?? null,
      maxIterations,
      maxCostUsd,
      checkpointInterval: input.checkpointInterval ?? null,
      channelId: input.channelId ?? null,
      conversationId: input.conversationId ?? null,
      triggerMessageTs: input.triggerMessageTs ?? null,
    });

    this.notifier.postStartNotice(loop).catch((e: unknown) => {
      console.warn(`[loop] Failed to post start notice for ${id}: ${e instanceof Error ? e.message : e}`);
    });

    // Cache memory context once for the entire loop (goal is constant)
    this.cacheMemoryContext(id, input.goal);

    this.scheduleTick(id);
    return loop;
  }

  getLoop(id: string): Loop | null {
    return this.store.findById(id);
  }

  list(includeFinished: boolean): Loop[] {
    return this.store.listAll(includeFinished);
  }

  /** Set the interrupt flag; the loop stops at the start of its next tick. */
  requestStop(id: string): boolean {
    return this.store.requestStop(id);
  }

  /**
   * Re-schedule every loop whose stored status is `running`.
   *
   * @returns The number of loops resumed.
   */
  resumeRunning(): number {
    const running = this.store.listByStatus("running");
    for (const loop of running) {
      console.log(`[loop] Resuming ${loop.id} (iteration ${loop.iterationCount})`);
      this.cacheMemoryContext(loop.id, loop.goal);
      this.scheduleTick(loop.id);
    }
    return running.length;
  }

  /** Queue the next tick via `setImmediate`; a throwing tick finalizes the loop as `failed`. */
  private scheduleTick(id: string): void {
    if (!this.autoSchedule) return;
    setImmediate(() => {
      this.tick(id).catch((e: unknown) => {
        const msg = e instanceof Error ? e.message : String(e);
        console.error(`[loop] Tick ${id} threw: ${msg}`);
        this.finalize(id, "failed", msg);
      });
    });
  }

  /**
   * Run one iteration. Returns immediately when a tick for `id` is already in
   * flight or the loop is not `running`. Terminal checks happen in this
   * order: interrupt flag, budgets (both before the agent call), then
   * `status: done` in the state file, then the success command.
   */
  async tick(id: string): Promise<void> {
    if (this.inFlight.has(id)) return;
    this.inFlight.add(id);
    try {
      const loop = this.store.findById(id);
      if (!loop || loop.status !== "running") return;

      if (loop.interruptRequested) {
        this.finalize(id, "stopped", null);
        return;
      }

      if (loop.iterationCount >= loop.maxIterations || loop.totalCostUsd >= loop.maxCostUsd) {
        this.finalize(id, "budget_exceeded", null);
        return;
      }

      const stateFileContents = readStateFile(loop.stateFile);
      const memoryContext = this.memoryCache.get(id);
      // A critique is delivered to exactly one tick, then cleared.
      const critique = this.pendingCritique.get(id);
      if (critique) this.pendingCritique.delete(id);
      const prompt = buildTickPrompt(loop, stateFileContents, {
        memoryContext,
        critique,
      });

      // The iteration count is part of the conversation id, so it changes every tick.
      const conversationId = `${loop.id}:${loop.iterationCount}`;
      const response = await this.runtime.handleMessage("loop", conversationId, prompt);

      const addedCost = response.cost.totalUsd;
      const nextIteration = loop.iterationCount + 1;
      this.store.recordTick(id, nextIteration, addedCost);

      // Re-read state file to learn what the agent wants to do next.
      const updatedContents = readStateFile(loop.stateFile);
      const frontmatter = parseFrontmatter(updatedContents);

      // Track bounded transcript for post-loop evolution
      recordTranscript(this.transcripts, id, nextIteration, prompt, response.text, frontmatter?.status);

      if (frontmatter?.status === "done") {
        this.finalize(id, "done", null);
        return;
      }

      if (loop.successCommand) {
        const passed = await this.runSuccessCommand(loop);
        if (passed) {
          this.finalize(id, "done", null);
          return;
        }
      }

      // Mid-loop critique checkpoint. Runs after the terminal checks so no
      // judge call is spent on a tick that ends the loop.
      if (loop.checkpointInterval && loop.checkpointInterval > 0 && nextIteration % loop.checkpointInterval === 0) {
        await this.runCritique(id, loop, updatedContents, nextIteration);
      }

      // Await tick update so its Slack write finishes before the next tick
      try {
        await this.notifier.postTickUpdate(id, nextIteration, frontmatter?.status ?? "in-progress");
      } catch (err: unknown) {
        const msg = err instanceof Error ? err.message : String(err);
        console.warn(`[loop] Failed to post tick update for ${id}: ${msg}`);
      }

      this.scheduleTick(id);
    } finally {
      this.inFlight.delete(id);
    }
  }

  /**
   * Run the loop's success command under `bash -c` with the sanitized env.
   *
   * @returns true only when the command exits with code 0; a command killed
   *   by the timeout does not count as success.
   */
  private async runSuccessCommand(loop: Loop): Promise<boolean> {
    if (!loop.successCommand) return false;
    const proc = Bun.spawn(["bash", "-c", loop.successCommand], {
      // Only the exit code is used. Piping output without ever reading it can
      // block a chatty command once the pipe buffer fills, so discard it.
      stdout: "ignore",
      stderr: "ignore",
      env: buildSafeEnv({ loop_id: loop.id, workspace: loop.workspaceDir }),
    });

    const timer = setTimeout(() => proc.kill(), SUCCESS_COMMAND_TIMEOUT_MS);
    try {
      await proc.exited;
    } finally {
      clearTimeout(timer);
    }
    return proc.exitCode === 0;
  }

  /**
   * Mark the loop finished, drop its in-memory caches, and start the final
   * Slack notice and post-loop pipeline without awaiting either.
   */
  private finalize(id: string, status: LoopStatus, error: string | null): void {
    // Grab the transcript before clearing it; the post-loop pipeline needs it.
    const transcript = this.transcripts.get(id);
    this.memoryCache.delete(id);
    this.transcripts.delete(id);
    this.pendingCritique.delete(id);
    const loop = this.store.finalize(id, status, error);
    if (!loop) return;
    this.notifier.postFinalNotice(loop, status).catch((e: unknown) => {
      console.warn(`[loop] Failed to post final notice for ${id}: ${e instanceof Error ? e.message : e}`);
    });

    // Post-loop evolution and consolidation (fire-and-forget, never affects loop status)
    if (this.postLoopDeps && transcript) {
      runPostLoopPipeline(this.postLoopDeps, synthesizeSessionData(loop, status, transcript)).catch((e: unknown) => {
        console.warn(`[loop] Post-loop evolution failed for ${id}: ${e instanceof Error ? e.message : e}`);
      });
    }
  }

  /**
   * Ask the critique judge for an assessment and stash it for the next tick.
   * Skipped when there is no transcript, no evolution engine, LLM judges are
   * off, or the engine reports it is over its cost cap. Failures are logged.
   */
  private async runCritique(loopId: string, loop: Loop, stateContents: string, iteration: number): Promise<void> {
    const transcript = this.transcripts.get(loopId);
    if (!transcript) return;
    const evo = this.postLoopDeps?.evolution;
    if (!evo || !evo.usesLLMJudges() || !evo.isWithinCostCap()) return;
    try {
      const r = await runCritiqueJudge(loop.goal, stateContents, transcript, iteration, loop.maxIterations);
      this.pendingCritique.set(loopId, r.assessment);
      evo.trackExternalJudgeCost(r.cost);
    } catch (e: unknown) {
      console.warn(`[loop] Critique failed for ${loopId}: ${e instanceof Error ? e.message : e}`);
    }
  }

  /**
   * Build memory context for the goal in the background and cache it.
   * Not awaited, so ticks that run before it resolves see no memory context.
   */
  private cacheMemoryContext(loopId: string, goal: string): void {
    if (!this.memoryContextBuilder) return;
    this.memoryContextBuilder
      .build(goal)
      .then((ctx) => {
        if (ctx) this.memoryCache.set(loopId, ctx);
      })
      .catch((e: unknown) => {
        console.warn(`[loop] Memory context failed for ${loopId}: ${e instanceof Error ? e.message : e}`);
      });
  }
}
/**
 * Matches the leading `---` … `---` frontmatter block; capture group 1 is the
 * text between the fences.
 */
const FRONTMATTER_RE = /^---\s*\n([\s\S]*?)\n---\s*\n?/;

/**
 * Ensure the state file exists and return its contents.
 *
 * Creates the parent directory if needed. An existing file is returned
 * untouched; otherwise a fresh template with `status: in-progress`,
 * `iteration: 0`, and the goal is written.
 *
 * @param path - State file location.
 * @param loopId - Written into the `loop_id` frontmatter field.
 * @param goal - Written under the `# Goal` heading.
 * @returns The contents of the state file after the call.
 */
export function initStateFile(path: string, loopId: string, goal: string): string {
  mkdirSync(dirname(path), { recursive: true });
  if (existsSync(path)) return readFileSync(path, "utf-8");

  const initial = `---
loop_id: ${loopId}
status: in-progress
iteration: 0
---

# Goal
${goal}

# Progress
(nothing yet)

# Next Action
Read this file, take one concrete step toward the goal, then update the
Progress and Next Action sections. When the goal is fully achieved, change
\`status\` in the frontmatter above to \`done\`.

# Notes
(empty)
`;
  writeFileSync(path, initial, "utf-8");
  return initial;
}

/** Read the state file as UTF-8 text. Throws whatever `readFileSync` throws. */
export function readStateFile(path: string): string {
  return readFileSync(path, "utf-8");
}

/**
 * Parse the control fields out of the state file frontmatter.
 *
 * @returns The parsed fields, or null when there is no frontmatter block,
 *   `loop_id` or `status` is missing or empty, `iteration` is absent or not a
 *   finite number, or `status` is not one of `in-progress`, `done`, `blocked`.
 */
export function parseFrontmatter(contents: string): LoopFrontmatter | null {
  const match = FRONTMATTER_RE.exec(contents);
  if (!match) return null;

  const block = match[1] ?? "";
  const fields: Record<string, string> = {};
  for (const line of block.split("\n")) {
    // Split on the first colon only, so values may themselves contain colons.
    const idx = line.indexOf(":");
    if (idx === -1) continue;
    const key = line.slice(0, idx).trim();
    const value = normalizeYamlValue(line.slice(idx + 1));
    if (key) fields[key] = value;
  }

  const loopId = fields.loop_id;
  const rawStatus = fields.status;
  const iteration = Number(fields.iteration);

  if (!loopId || !rawStatus || !Number.isFinite(iteration)) return null;
  if (rawStatus !== "in-progress" && rawStatus !== "done" && rawStatus !== "blocked") return null;

  return { loopId, status: rawStatus, iteration };
}

/**
 * Normalize a raw frontmatter value:
 *   - trim whitespace
 *   - strip a trailing inline `# comment`
 *   - strip surrounding single or double quotes
 *
 * A `#` inside a fully quoted value is part of the value, not a comment.
 * Parsed by hand because only a handful of control fields matter here.
 */
function normalizeYamlValue(raw: string): string {
  const trimmed = raw.trim();

  // Quoted value: take everything up to the closing quote, provided what
  // follows is nothing or a comment. This keeps a `#` inside the quotes.
  const quote = trimmed[0];
  if (quote === '"' || quote === "'") {
    const close = trimmed.indexOf(quote, 1);
    if (close !== -1) {
      const rest = trimmed.slice(close + 1).trim();
      if (rest === "" || rest.startsWith("#")) return trimmed.slice(1, close);
    }
  }

  // Unquoted (or oddly quoted) value: drop an inline comment, then strip a
  // matching pair of surrounding quotes if one remains.
  const hashIdx = trimmed.indexOf("#");
  let value = (hashIdx === -1 ? trimmed : trimmed.slice(0, hashIdx)).trim();
  if (value.length >= 2) {
    const first = value[0];
    const last = value[value.length - 1];
    if ((first === '"' && last === '"') || (first === "'" && last === "'")) {
      value = value.slice(1, -1);
    }
  }
  return value;
}
null, + input.channelId, + input.conversationId, + input.triggerMessageTs, + ], + ); + const created = this.findById(input.id); + if (!created) throw new Error(`Failed to insert loop: ${input.id}`); + return created; + } + + findById(id: string): Loop | null { + const row = this.db.query("SELECT * FROM loops WHERE id = ?").get(id) as LoopRow | null; + return row ? rowToLoop(row) : null; + } + + listByStatus(status: LoopStatus): Loop[] { + const rows = this.db.query("SELECT * FROM loops WHERE status = ? ORDER BY started_at ASC").all(status) as LoopRow[]; + return rows.map(rowToLoop); + } + + listAll(includeFinished: boolean): Loop[] { + const sql = includeFinished + ? "SELECT * FROM loops ORDER BY started_at DESC LIMIT 50" + : "SELECT * FROM loops WHERE status = 'running' ORDER BY started_at ASC"; + const rows = this.db.query(sql).all() as LoopRow[]; + return rows.map(rowToLoop); + } + + recordTick(id: string, iterationCount: number, addedCostUsd: number): void { + this.db.run( + `UPDATE loops SET iteration_count = ?, total_cost_usd = total_cost_usd + ?, last_tick_at = datetime('now') WHERE id = ?`, + [iterationCount, addedCostUsd, id], + ); + } + + requestStop(id: string): boolean { + const result = this.db.run("UPDATE loops SET interrupt_requested = 1 WHERE id = ? AND status = 'running'", [id]); + return result.changes > 0; + } + + setStatusMessageTs(id: string, ts: string): void { + this.db.run("UPDATE loops SET status_message_ts = ? 
export const LOOP_TOOL_NAME = "phantom_loop";

/**
 * Build a successful tool result: `data` rendered as pretty-printed JSON
 * (2-space indent) inside a single text content item.
 */
function ok(data: Record<string, unknown>): { content: Array<{ type: "text"; text: string }> } {
	return { content: [{ type: "text" as const, text: JSON.stringify(data, null, 2) }] };
}

/**
 * Build a failed tool result: `{ "error": message }` as compact JSON inside a
 * single text content item, with `isError` set.
 */
function err(message: string): { content: Array<{ type: "text"; text: string }>; isError: true } {
	return { content: [{ type: "text" as const, text: JSON.stringify({ error: message }) }], isError: true };
}

/**
 * Project a loop onto the snake_case fields returned to the tool caller.
 * Only identity, budget, progress, timing and error fields are included; the
 * success command, checkpoint interval, Slack fields and interrupt flag are
 * left out.
 */
function serializeLoop(loop: Loop): Record<string, unknown> {
	return {
		id: loop.id,
		goal: loop.goal,
		workspace_dir: loop.workspaceDir,
		state_file: loop.stateFile,
		status: loop.status,
		iteration_count: loop.iterationCount,
		max_iterations: loop.maxIterations,
		total_cost_usd: loop.totalCostUsd,
		max_cost_usd: loop.maxCostUsd,
		started_at: loop.startedAt,
		last_tick_at: loop.lastTickAt,
		finished_at: loop.finishedAt,
		last_error: loop.lastError,
	};
}
/**
 * Build the in-process MCP server that exposes the single `phantom_loop`
 * tool, through which the agent starts, inspects, stops and lists loops.
 *
 * Every action is delegated to `runner`. Anything thrown while handling an
 * action is caught and returned as an error result rather than propagated.
 *
 * @param runner - Loop runner that performs the requested actions.
 * @returns An SDK MCP server config named `phantom-loop` with one tool.
 */
export function createLoopToolServer(runner: LoopRunner): McpSdkServerConfigWithInstance {
	const loopTool = tool(
		LOOP_TOOL_NAME,
		`Start, check, stop, or list "ralph loops" - autonomous iteration primitives.

A ralph loop runs the agent against a goal repeatedly, each iteration in a fresh
session, with state persisted in a markdown file the agent reads and rewrites.
The loop terminates when: (1) the agent sets status: done in the state file,
(2) iteration or cost budget is exhausted, (3) an optional success_command
returns exit 0, or (4) the operator interrupts it.

ACTIONS:
- start: Begin a new loop. Requires "goal". Returns the loop_id. Optional:
  workspace (defaults to data/loops/<loop_id>/),
  max_iterations (default 20, hard ceiling 200),
  max_cost_usd (default 5, hard ceiling 50),
  checkpoint_interval (run a Sonnet critique every N ticks, 0 or omitted = off),
  success_command (shell command run after each tick; exit 0 = goal
    achieved. Runs under bash -c with a 5 minute timeout in a sanitized env
    containing only PATH, HOME, LANG, TERM, loop_id, and workspace),
  channel_id + conversation_id (Slack channel/thread for status updates).
- status: Inspect a loop. Returns row data + parsed state file frontmatter.
- stop: Request graceful stop of a running loop (takes effect before next tick).
- list: List active loops, or include_finished: true for recent history.

Use start for long-horizon tasks the agent should grind on in the background:
"keep refactoring until tests pass", "iterate on this design doc", "bisect this
regression". Each iteration is fresh - all context must live in the state file.`,
		{
			action: z.enum(["start", "status", "stop", "list"]),
			goal: z.string().optional().describe("The goal (required for start)"),
			workspace: z.string().optional(),
			max_iterations: z.number().int().positive().max(200).optional(),
			max_cost_usd: z.number().positive().max(50).optional(),
			checkpoint_interval: z
				.number()
				.int()
				.min(0)
				.max(200)
				.optional()
				.describe("Run a Sonnet review every N ticks. 0 or omitted = no critique."),
			success_command: z.string().optional(),
			channel_id: z.string().optional(),
			conversation_id: z.string().optional(),
			trigger_message_ts: z.string().optional(),
			loop_id: z.string().optional().describe("Loop ID (required for status and stop)"),
			include_finished: z.boolean().optional().describe("For list: include terminated loops"),
		},
		async (input) => {
			try {
				switch (input.action) {
					case "start": {
						// The schema keeps `goal` optional because the other
						// actions do not use it, so it is enforced here.
						if (!input.goal) return err("goal is required for start");
						// Explicit tool arguments always win. When the agent omits
						// channel/thread plumbing, fall back to the Slack context
						// captured by the router for the current turn.
						const ctx = slackContextStore.getStore();
						const loop = runner.start({
							goal: input.goal,
							workspace: input.workspace,
							maxIterations: input.max_iterations,
							maxCostUsd: input.max_cost_usd,
							checkpointInterval: input.checkpoint_interval,
							successCommand: input.success_command,
							channelId: input.channel_id ?? ctx?.slackChannelId,
							conversationId: input.conversation_id ?? ctx?.slackThreadTs,
							triggerMessageTs: input.trigger_message_ts ?? ctx?.slackMessageTs,
						});
						return ok({ started: true, loop: serializeLoop(loop) });
					}

					case "status": {
						if (!input.loop_id) return err("loop_id is required for status");
						const loop = runner.getLoop(input.loop_id);
						if (!loop) return err(`Loop not found: ${input.loop_id}`);
						let frontmatter: ReturnType<typeof parseFrontmatter> = null;
						let stateExcerpt = "";
						try {
							const contents = readStateFile(loop.stateFile);
							frontmatter = parseFrontmatter(contents);
							// Only the first 40 lines of the state file are returned.
							stateExcerpt = contents.split("\n").slice(0, 40).join("\n");
						} catch {
							// state file may have been removed manually; report what we have
						}
						return ok({ loop: serializeLoop(loop), frontmatter, state_excerpt: stateExcerpt });
					}

					case "stop": {
						if (!input.loop_id) return err("loop_id is required for stop");
						// `stopped` is whatever the runner reports; it is passed
						// through as-is rather than turned into an error.
						const stopped = runner.requestStop(input.loop_id);
						return ok({ stop_requested: stopped, loop_id: input.loop_id });
					}

					case "list": {
						const loops = runner.list(input.include_finished ?? false);
						return ok({ count: loops.length, loops: loops.map(serializeLoop) });
					}

					default:
						return err(`Unknown action: ${input.action}`);
				}
			} catch (error: unknown) {
				const msg = error instanceof Error ? error.message : String(error);
				return err(msg);
			}
		},
	);

	return createSdkMcpServer({
		name: "phantom-loop",
		tools: [loopTool],
	});
}
/**
 * Convert a snake_case database row into the camelCase `Loop` shape.
 *
 * Column values are copied through unchanged except for two: `status` is
 * asserted (not validated) to be a `LoopStatus`, and the integer
 * `interrupt_requested` becomes a boolean that is true only for exactly 1.
 */
export function rowToLoop(row: LoopRow): Loop {
	const {
		id,
		goal,
		workspace_dir: workspaceDir,
		state_file: stateFile,
		success_command: successCommand,
		max_iterations: maxIterations,
		max_cost_usd: maxCostUsd,
		checkpoint_interval: checkpointInterval,
		status: rawStatus,
		iteration_count: iterationCount,
		total_cost_usd: totalCostUsd,
		channel_id: channelId,
		conversation_id: conversationId,
		status_message_ts: statusMessageTs,
		trigger_message_ts: triggerMessageTs,
		interrupt_requested: interruptFlag,
		last_error: lastError,
		started_at: startedAt,
		last_tick_at: lastTickAt,
		finished_at: finishedAt,
	} = row;

	const status = rawStatus as LoopStatus;
	const interruptRequested = interruptFlag === 1;

	return {
		id,
		goal,
		workspaceDir,
		stateFile,
		successCommand,
		maxIterations,
		maxCostUsd,
		checkpointInterval,
		status,
		iterationCount,
		totalCostUsd,
		channelId,
		conversationId,
		statusMessageTs,
		triggerMessageTs,
		interruptRequested,
		lastError,
		startedAt,
		lastTickAt,
		finishedAt,
	};
}
"../evolution/judges/consolidation-judge.ts"; import type { JudgeCostEntry } from "../evolution/judges/types.ts"; import type { SessionSummary } from "../evolution/types.ts"; @@ -66,11 +67,21 @@ export async function consolidateSessionWithLLM( const msg = error instanceof Error ? error.message : String(error); console.warn(`[memory] Consolidation judge failed, falling back to heuristic: ${msg}`); const result = await consolidateSession(memory, sessionData); - return { result, judgeCost: null }; + // Track cost from successful API calls that failed parsing (tokens were consumed) + const judgeCost = + error instanceof JudgeParseError + ? { + calls: 1, + totalUsd: error.costUsd, + totalInputTokens: error.inputTokens, + totalOutputTokens: error.outputTokens, + } + : null; + return { result, judgeCost }; } } -function sessionDataToSummary(data: SessionData): SessionSummary { +export function sessionDataToSummary(data: SessionData): SessionSummary { return { session_id: data.sessionId, session_key: data.sessionKey,