From 7e35bc5ac040454c3d899ebf5b07d8023e81ad75 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 4 May 2026 16:21:14 +0200 Subject: [PATCH 1/2] feat(agents): split coder into thin entity + coding-session resource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The coder entity used to own three collections (sessionMeta, cursorState, events) so the session history was bound to one entity instance. Pull the durable, portable parts (event history + session-info facts) onto a shared-state resource — the entity becomes a thin wrapper over the SDK runner that reads/writes the resource and tracks only its own run lifecycle (runStatus, inboxCursor). This is the prerequisite for forking sessions, sharing a session across devices/users, and surfacing the same history through multiple entities — the resource sits at a stable shared-state id (`coder-session/`) and survives independently of any particular entity. Why two collections, not one: - `sessionInfo` carries the static facts about which session this is (agent, cwd, electricSessionId, nativeSessionId, createdAt). - `transcript` carries the normalized event stream. Originally named `events`, but ObservationHandle reserves that field for ChangeEvent[] and Object.assign clobbers any like-named collection on the SharedStateHandle the runtime returns from `observe(db(...))`. `transcript` sidesteps the collision. The wrapper entity tags itself with `coderResource` pointing at its resource id; the UI fetches the entity, reads the tag, and attaches to the shared-state stream alongside the entity stream. On a freshly spawned coder the resource stream isn't registered on disk until the entity's first wake commits its mkdb manifest entry, so `connectSharedStateStream` retries on 404 with bounded backoff to cover the spawn → first-wake race. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/coding-session-resource.ts | 95 +++++ .../agents-runtime/src/context-factory.ts | 64 ++- packages/agents-runtime/src/index.ts | 17 +- .../agents-runtime/src/observation-sources.ts | 12 - .../src/hooks/useCodingSession.ts | 230 +++++++---- .../src/lib/entity-connection.ts | 72 +++- packages/agents/src/agents/coding-session.ts | 381 ++++++++---------- .../test/coding-session-resource.test.ts | 243 +++++++++++ packages/agents/test/coding-session.test.ts | 306 ++++++++------ 9 files changed, 988 insertions(+), 432 deletions(-) create mode 100644 packages/agents-runtime/src/coding-session-resource.ts create mode 100644 packages/agents/test/coding-session-resource.test.ts diff --git a/packages/agents-runtime/src/coding-session-resource.ts b/packages/agents-runtime/src/coding-session-resource.ts new file mode 100644 index 0000000000..ba8ffd4631 --- /dev/null +++ b/packages/agents-runtime/src/coding-session-resource.ts @@ -0,0 +1,95 @@ +/** + * Schema and helpers for the coding-session **resource** — the durable, + * shareable, forkable representation of a coder's state. + * + * Background. Originally the coder entity owned three of its own + * collections (`sessionMeta`, `cursorState`, `events`) and the entity + * was the canonical home for the session's history. That couples the + * history to one entity instance — fine for "open a coder, send some + * prompts" but awkward when you want to fork the session, attach a + * second entity to it, share a session URL, or surface it in a + * specialised viewer outside any one entity's lifecycle. + * + * The resource pattern fixes that. The history (events + the static + * facts about *which* session this is) lives in a shared-state DB at + * a stable id (`coder-session/`). The wrapper coder entity + * just observes/appends to it. Because shared-state DBs are + * server-side first-class streams, multiple entities can attach, the + * stream survives the entity, and the server already knows how to + * fork-rewrite shared-state ids when entities are forked. + */ +import { z } from 'zod' +import type { SharedStateSchemaMap } from './types' + +/** Collection event-type strings (mirror of the entity-collection naming convention). */ +export const CODING_SESSION_RESOURCE_INFO_TYPE = `coding_session_info` +export const CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE = `coding_session_transcript` + +/** + * Static facts about a coding session that don't change as it runs. + * `nativeSessionId` becomes set once the CLI assigns one (after the + * first turn for a fresh session, or up front for an attached/imported + * one). `electricSessionId` matches the slug of the wrapper entity + * that originally created the resource. + */ +export const codingSessionInfoRowSchema = z.object({ + key: z.literal(`current`), + agent: z.enum([`claude`, `codex`]), + cwd: z.string(), + electricSessionId: z.string(), + nativeSessionId: z.string().optional(), + createdAt: z.number(), +}) +export type CodingSessionInfoRow = z.infer + +/** + * One normalized event from the agent-session-protocol stream. Same + * shape the entity used to write into its events collection. Lives + * under the resource's `transcript` collection — *not* `events`, + * because the runtime's `ObservationHandle` reserves the field name + * `events` (for raw `ChangeEvent`s) and would silently shadow a + * collection with that name when we attach via `observe(db(...))`. + */ +export const codingSessionTranscriptRowSchema = z.object({ + key: z.string(), + ts: z.number(), + type: z.string(), + callId: z.string().optional(), + payload: z.looseObject({}), +}) +export type CodingSessionTranscriptRow = z.infer< + typeof codingSessionTranscriptRowSchema +> + +/** + * The shape of a coding-session resource. Both collections live on a + * single shared-state DB — there's no reason to split them, and + * keeping them together lets observers attach with one `db(...)` call. + */ +export const codingSessionResourceSchema = { + sessionInfo: { + schema: codingSessionInfoRowSchema, + type: CODING_SESSION_RESOURCE_INFO_TYPE, + primaryKey: `key`, + }, + transcript: { + schema: codingSessionTranscriptRowSchema, + type: CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE, + primaryKey: `key`, + }, +} as const satisfies SharedStateSchemaMap + +export type CodingSessionResourceSchema = typeof codingSessionResourceSchema + +/** + * Default resource id for a coder entity. The wrapper entity stores + * this on its tags as `coderResource` so observers (e.g. the UI) can + * look up the entity, read the tag, and connect to the resource + * stream without needing a separate registry. + */ +export function codingSessionResourceId(entityId: string): string { + return `coder-session/${entityId}` +} + +/** Tag key used by the coder entity to point at its resource. */ +export const CODER_RESOURCE_TAG = `coderResource` diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index 713316f653..9088ac1bb2 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -16,7 +16,16 @@ import { CACHE_TIERS } from './types' import { CODING_SESSION_ENTITY_TYPE, codingSessionEntityUrl, + db, } from './observation-sources' +import { + codingSessionResourceId, + codingSessionResourceSchema, +} from './coding-session-resource' +import type { + CodingSessionInfoRow, + CodingSessionResourceSchema, +} from './coding-session-resource' import type { ChangeEvent } from '@durable-streams/state' import type { AgentConfig, @@ -588,19 +597,62 @@ export function createHandlerContext( ) const entityUrl = codingSessionEntityUrl(sessionId) + // The session's history (events) and static facts (agent, cwd, + // nativeSessionId) live on a coding-session resource, not on + // the entity. Attach to it so the handle can surface the same + // `events` / `meta` shape the caller used to get from the + // entity's collections directly. + const resourceId = codingSessionResourceId(sessionId) + const resourceHandle = (await config.doObserve( + db(resourceId, codingSessionResourceSchema) + )) as unknown as SharedStateHandle + const readEvents = (): Array => { - const collection = entityHandle.db?.collections.events - if (!collection) return [] - const rows = (collection as { toArray?: unknown }).toArray + const rows = resourceHandle.transcript.toArray return (Array.isArray(rows) ? rows : []) as Array } - const readMeta = (): CodingSessionMeta | undefined => { - const collection = entityHandle.db?.collections.sessionMeta + const readSessionInfo = (): CodingSessionInfoRow | undefined => { + return resourceHandle.sessionInfo.get(`current`) as + | CodingSessionInfoRow + | undefined + } + const readRunStatus = (): + | { + status: CodingSessionStatus + error?: string + currentPromptInboxKey?: string + } + | undefined => { + const collection = entityHandle.db?.collections.runStatus if (!collection) return undefined const row = (collection as { get?: (k: string) => unknown }).get?.( `current` ) - return row as CodingSessionMeta | undefined + return row as + | { + status: CodingSessionStatus + error?: string + currentPromptInboxKey?: string + } + | undefined + } + const readMeta = (): CodingSessionMeta | undefined => { + const info = readSessionInfo() + if (!info) return undefined + const runStatus = readRunStatus() + return { + electricSessionId: info.electricSessionId, + ...(info.nativeSessionId !== undefined + ? { nativeSessionId: info.nativeSessionId } + : {}), + agent: info.agent, + cwd: info.cwd, + status: runStatus?.status ?? `initializing`, + ...(runStatus?.error !== undefined ? { error: runStatus.error } : {}), + ...(runStatus?.currentPromptInboxKey !== undefined + ? { currentPromptInboxKey: runStatus.currentPromptInboxKey } + : {}), + } } const MESSAGE_TYPES = new Set([`user_message`, `assistant_message`]) diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 3bb3fabbf3..37b7303e73 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -197,9 +197,6 @@ export { export { CODING_SESSION_ENTITY_TYPE, - CODING_SESSION_META_COLLECTION_TYPE, - CODING_SESSION_CURSOR_COLLECTION_TYPE, - CODING_SESSION_EVENT_COLLECTION_TYPE, codingSession, codingSessionEntityUrl, entity, @@ -208,6 +205,20 @@ export { tagged, db, } from './observation-sources' +export { + CODER_RESOURCE_TAG, + CODING_SESSION_RESOURCE_INFO_TYPE, + CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE, + codingSessionInfoRowSchema, + codingSessionResourceId, + codingSessionResourceSchema, + codingSessionTranscriptRowSchema, +} from './coding-session-resource' +export type { + CodingSessionInfoRow, + CodingSessionResourceSchema, + CodingSessionTranscriptRow, +} from './coding-session-resource' export type { EntityObservationSource, CronObservationSource, diff --git a/packages/agents-runtime/src/observation-sources.ts b/packages/agents-runtime/src/observation-sources.ts index 51959a3c03..4e0193b5ed 100644 --- a/packages/agents-runtime/src/observation-sources.ts +++ b/packages/agents-runtime/src/observation-sources.ts @@ -78,18 +78,6 @@ export function entity(entityUrl: string): EntityObservationSource { /** Entity type name for the built-in coder entity. */ export const CODING_SESSION_ENTITY_TYPE = `coder` -/** - * Collection event-type strings used by the coder entity's state. The - * coder entity (in `@electric-ax/agents`) declares its three custom - * collections under these names, and any consumer that needs to read - * the same collections back out (the agents-server-ui hook is the - * primary one) imports these constants instead of hard-coding the - * strings — so the entity's contract has a single source of truth. - */ -export const CODING_SESSION_META_COLLECTION_TYPE = `coding_session_meta` -export const CODING_SESSION_CURSOR_COLLECTION_TYPE = `coding_session_cursor` -export const CODING_SESSION_EVENT_COLLECTION_TYPE = `coding_session_event` - export function codingSessionEntityUrl(sessionId: string): string { return `/${CODING_SESSION_ENTITY_TYPE}/${sessionId}` } diff --git a/packages/agents-server-ui/src/hooks/useCodingSession.ts b/packages/agents-server-ui/src/hooks/useCodingSession.ts index 87906f2f32..6fe6dd3084 100644 --- a/packages/agents-server-ui/src/hooks/useCodingSession.ts +++ b/packages/agents-server-ui/src/hooks/useCodingSession.ts @@ -1,11 +1,15 @@ import { useEffect, useMemo, useRef, useState } from 'react' import { useLiveQuery } from '@tanstack/react-db' import { - CODING_SESSION_CURSOR_COLLECTION_TYPE, - CODING_SESSION_EVENT_COLLECTION_TYPE, - CODING_SESSION_META_COLLECTION_TYPE, + CODER_RESOURCE_TAG, + CODING_SESSION_RESOURCE_INFO_TYPE, + CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE, + codingSessionResourceId, } from '@electric-ax/agents-runtime' -import { connectEntityStream } from '../lib/entity-connection' +import { + connectEntityStream, + connectSharedStateStream, +} from '../lib/entity-connection' import type { CodingSessionEventRow, CodingSessionMetaRow, @@ -18,22 +22,38 @@ import type { export type { CodingSessionEventRow, CodingSessionMetaRow, CodingSessionStatus } /** - * Mirror of the state-collection shape declared by the coder entity - * in `@electric-ax/agents/src/agents/coding-session.ts`. The - * collection-type strings are imported from agents-runtime so the - * entity contract has a single source of truth. + * The coder entity is now a thin wrapper over a coding-session + * **resource** (a shared-state DB). The entity owns only the run + * lifecycle bookkeeping — `runStatus` (idle/running/error) and + * `inboxCursor` (last processed prompt) — while the durable history + * (`events`) and the static session facts (`sessionInfo`) live on + * the resource. The UI follows a `coderResource` tag the entity + * publishes on first wake to find the resource id, then connects to + * both streams in parallel and recombines the data into the legacy + * `CodingSessionMetaRow` shape that timeline/components consume. */ -const CODING_SESSION_STATE = { - sessionMeta: { - type: CODING_SESSION_META_COLLECTION_TYPE, + +const RUN_STATUS_COLLECTION_TYPE = `coder_run_status` +const INBOX_CURSOR_COLLECTION_TYPE = `coder_inbox_cursor` + +const ENTITY_CUSTOM_STATE = { + runStatus: { + type: RUN_STATUS_COLLECTION_TYPE, + primaryKey: `key`, + }, + inboxCursor: { + type: INBOX_CURSOR_COLLECTION_TYPE, primaryKey: `key`, }, - cursorState: { - type: CODING_SESSION_CURSOR_COLLECTION_TYPE, +} as const + +const RESOURCE_CUSTOM_STATE = { + sessionInfo: { + type: CODING_SESSION_RESOURCE_INFO_TYPE, primaryKey: `key`, }, - events: { - type: CODING_SESSION_EVENT_COLLECTION_TYPE, + transcript: { + type: CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE, primaryKey: `key`, }, } as const @@ -54,13 +74,14 @@ export type CodingSessionEventType = | `session_end` export interface UseCodingSessionResult { + /** The entity stream (runStatus, inboxCursor, inbox). */ db: EntityStreamDBWithActions | null /** - * Normalized session events from the CLI's JSONL transcript, plus - * synthetic user_message rows for any prompt that's been posted to - * the inbox but not yet reflected in the transcript. Synthetic rows - * carry `payload._pending: true` so the timeline can render them - * with a subtle "queued" affordance. + * Normalized session events from the resource, plus synthetic + * user_message rows for any prompt that's been posted to the inbox + * but not yet reflected in the transcript. Synthetic rows carry + * `payload._pending: true` so the timeline can render them with a + * subtle "queued" affordance. */ events: Array meta: CodingSessionMetaRow | undefined @@ -76,23 +97,43 @@ interface InboxRowShape { message_type?: string } -interface CursorStateRowShape { +interface InboxCursorRowShape { key: string - cursor?: string lastProcessedInboxKey?: string } +interface RunStatusRowShape { + key: string + status: CodingSessionStatus + error?: string + currentPromptInboxKey?: string +} + +interface SessionInfoRowShape { + key: string + agent: `claude` | `codex` + cwd: string + electricSessionId: string + nativeSessionId?: string + createdAt: number +} + export function useCodingSession( baseUrl: string | null, entityUrl: string | null ): UseCodingSessionResult { - const [db, setDb] = useState(null) + const [entityDb, setEntityDb] = useState( + null + ) + const [resourceDb, setResourceDb] = + useState(null) const [loading, setLoading] = useState(false) const [error, setError] = useState(null) - const closeRef = useRef<(() => void) | null>(null) + const closersRef = useRef void>>([]) useEffect(() => { - setDb(null) + setEntityDb(null) + setResourceDb(null) setError(null) if (!baseUrl || !entityUrl) { @@ -103,58 +144,88 @@ export function useCodingSession( let cancelled = false setLoading(true) - connectEntityStream({ - baseUrl, - entityUrl, - customState: CODING_SESSION_STATE, - }) - .then((result) => { + void (async (): Promise => { + try { + const entityResult = await connectEntityStream({ + baseUrl, + entityUrl, + customState: ENTITY_CUSTOM_STATE, + }) if (cancelled) { - result.close() + entityResult.close() return } - closeRef.current = result.close - setDb(result.db) - setLoading(false) - }) - .catch((err) => { - if (!cancelled) { - console.error(`Failed to connect coding-session stream`, { - baseUrl, - entityUrl, - error: err, - }) - setError(err instanceof Error ? err.message : String(err)) - setLoading(false) + closersRef.current.push(entityResult.close) + setEntityDb(entityResult.db) + + // The entity tags itself with `coderResource` on first wake. + // For a freshly spawned coder the tag may not be set yet; fall + // back to the deterministic id in that case so the resource + // stream still loads. + const fallbackId = codingSessionResourceId( + entityUrl.split(`/`).pop() ?? entityUrl + ) + const resourceId = + entityResult.entity.tags[CODER_RESOURCE_TAG] ?? fallbackId + + const resourceResult = await connectSharedStateStream({ + baseUrl, + resourceId, + customState: RESOURCE_CUSTOM_STATE, + }) + if (cancelled) { + resourceResult.close() + return } - }) + closersRef.current.push(resourceResult.close) + setResourceDb(resourceResult.db) + setLoading(false) + } catch (err) { + if (cancelled) return + console.error(`Failed to connect coding-session streams`, { + baseUrl, + entityUrl, + error: err, + }) + setError(err instanceof Error ? err.message : String(err)) + setLoading(false) + } + })() return () => { cancelled = true - closeRef.current?.() - closeRef.current = null + for (const close of closersRef.current) close() + closersRef.current = [] } }, [baseUrl, entityUrl]) - const eventsCollection = db?.collections.events - const metaCollection = db?.collections.sessionMeta - const cursorCollection = db?.collections.cursorState - const inboxCollection = db?.collections.inbox + const transcriptCollection = resourceDb?.collections.transcript + const sessionInfoCollection = resourceDb?.collections.sessionInfo + const runStatusCollection = entityDb?.collections.runStatus + const inboxCursorCollection = entityDb?.collections.inboxCursor + const inboxCollection = entityDb?.collections.inbox const { data: eventRows = [] } = useLiveQuery( (q) => - eventsCollection - ? q.from({ e: eventsCollection }).orderBy(({ e }) => e.$key, `asc`) + transcriptCollection + ? q.from({ e: transcriptCollection }).orderBy(({ e }) => e.$key, `asc`) : undefined, - [eventsCollection] + [transcriptCollection] ) - const { data: metaRows = [] } = useLiveQuery( - (q) => (metaCollection ? q.from({ m: metaCollection }) : undefined), - [metaCollection] + const { data: sessionInfoRows = [] } = useLiveQuery( + (q) => + sessionInfoCollection ? q.from({ s: sessionInfoCollection }) : undefined, + [sessionInfoCollection] ) - const { data: cursorRows = [] } = useLiveQuery( - (q) => (cursorCollection ? q.from({ c: cursorCollection }) : undefined), - [cursorCollection] + const { data: runStatusRows = [] } = useLiveQuery( + (q) => + runStatusCollection ? q.from({ r: runStatusCollection }) : undefined, + [runStatusCollection] + ) + const { data: inboxCursorRows = [] } = useLiveQuery( + (q) => + inboxCursorCollection ? q.from({ c: inboxCursorCollection }) : undefined, + [inboxCursorCollection] ) const { data: inboxRows = [] } = useLiveQuery( (q) => @@ -164,18 +235,32 @@ export function useCodingSession( [inboxCollection] ) - const meta = useMemo( - () => (metaRows as unknown as Array)[0], - [metaRows] - ) + const meta = useMemo(() => { + const info = (sessionInfoRows as unknown as Array)[0] + if (!info) return undefined + const status = (runStatusRows as unknown as Array)[0] + return { + key: info.key, + agent: info.agent, + cwd: info.cwd, + electricSessionId: info.electricSessionId, + ...(info.nativeSessionId !== undefined + ? { nativeSessionId: info.nativeSessionId } + : {}), + status: status?.status ?? `initializing`, + ...(status?.error !== undefined ? { error: status.error } : {}), + ...(status?.currentPromptInboxKey !== undefined + ? { currentPromptInboxKey: status.currentPromptInboxKey } + : {}), + } as CodingSessionMetaRow + }, [sessionInfoRows, runStatusRows]) const events = useMemo(() => { const real = eventRows as unknown as Array - const cursor = (cursorRows as unknown as Array)[0] + const cursor = (inboxCursorRows as unknown as Array)[0] const lastProcessed = cursor?.lastProcessedInboxKey ?? `` - // Once a prompt's text shows up as a real user_message (mirrored - // from the CLI's JSONL), there's nothing for the pending bubble - // to add — drop it immediately to avoid a duplicate below the + // Once a prompt's text shows up as a real user_message, drop the + // pending bubble so the user doesn't see a duplicate below the // assistant's reply. Track remaining capacity per text so two // identical prompts in a row each get matched at most once. const realUserTextRemaining = new Map() @@ -185,9 +270,6 @@ export function useCodingSession( if (typeof t !== `string` || t.length === 0) continue realUserTextRemaining.set(t, (realUserTextRemaining.get(t) ?? 0) + 1) } - // Show inbox prompts that haven't been processed yet AND whose - // text hasn't already shown up as a real user_message in events. - // Inbox keys are durable-stream offsets that sort lexicographically. const pending: Array = [] for (const row of inboxRows as unknown as Array) { if (row.key <= lastProcessed) continue @@ -212,7 +294,7 @@ export function useCodingSession( } if (pending.length === 0) return real return [...real, ...pending] - }, [eventRows, inboxRows, cursorRows]) + }, [eventRows, inboxRows, inboxCursorRows]) - return { db, events, meta, loading, error } + return { db: entityDb, events, meta, loading, error } } diff --git a/packages/agents-server-ui/src/lib/entity-connection.ts b/packages/agents-server-ui/src/lib/entity-connection.ts index ca9becce71..ba7a55f2b0 100644 --- a/packages/agents-server-ui/src/lib/entity-connection.ts +++ b/packages/agents-server-ui/src/lib/entity-connection.ts @@ -1,5 +1,9 @@ -import { createEntityStreamDB } from '@electric-ax/agents-runtime' +import { + createEntityStreamDB, + getSharedStateStreamPath, +} from '@electric-ax/agents-runtime' import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime' +import type { PublicEntity } from './types' function getMainStreamPath(entityUrl: string): string { return `${entityUrl}/main` @@ -17,7 +21,11 @@ export async function connectEntityStream(opts: { baseUrl: string entityUrl: string customState?: UICustomState -}): Promise<{ db: EntityStreamDBWithActions; close: () => void }> { +}): Promise<{ + db: EntityStreamDBWithActions + entity: PublicEntity + close: () => void +}> { const { baseUrl, entityUrl, customState } = opts const res = await fetch(`${baseUrl}${entityUrl}`, { @@ -26,7 +34,7 @@ export async function connectEntityStream(opts: { if (!res.ok) { throw new Error(`Failed to fetch entity at ${entityUrl}: ${res.statusText}`) } - await res.body?.cancel() + const entity = (await res.json()) as PublicEntity const streamUrl = `${baseUrl}${getMainStreamPath(entityUrl)}` const db = createEntityStreamDB( streamUrl, @@ -34,5 +42,61 @@ export async function connectEntityStream(opts: { ) await db.preload() - return { db, close: () => db.close() } + return { db, entity, close: () => db.close() } +} + +/** + * Connect to a shared-state (resource) stream. Used by views that + * follow a resource pointer published as a tag on a wrapper entity — + * e.g. the coder UI hook reads its `coderResource` tag and attaches + * here for the session history. The schema mapping follows the same + * `UICustomState` shape as the entity-side connection. + * + * Retries on 404 with bounded backoff. Reason: the shared-state + * stream is only registered on the server *after* the wrapper + * entity's first wake calls `mkdb`. The UI can race ahead of that + * — user clicks the new coder in the sidebar before the builtin + * server has finished its first-wake handler — and would otherwise + * see "Stream not found" until they manually reload. The retry + * window covers that race; if the resource genuinely doesn't exist + * the final error still surfaces. + */ +export async function connectSharedStateStream(opts: { + baseUrl: string + resourceId: string + customState: UICustomState + retryMs?: number + maxAttempts?: number +}): Promise<{ db: EntityStreamDBWithActions; close: () => void }> { + const { + baseUrl, + resourceId, + customState, + retryMs = 250, + maxAttempts = 20, + } = opts + const streamUrl = `${baseUrl}${getSharedStateStreamPath(resourceId)}` + let lastErr: unknown = undefined + for (let attempt = 0; attempt < maxAttempts; attempt++) { + const db = createEntityStreamDB( + streamUrl, + customState as unknown as Parameters[1] + ) + try { + await db.preload() + return { db, close: () => db.close() } + } catch (err) { + db.close() + lastErr = err + const code = (err as { code?: string } | null)?.code + const status = (err as { status?: number } | null)?.status + const isNotFound = code === `NOT_FOUND` || status === 404 + if (!isNotFound) throw err + // Linear backoff — the race window is short (the entity's + // first-wake handler runs within a few hundred ms of spawn), + // and exponential backoff would over-wait the common case. + await new Promise((r) => setTimeout(r, retryMs)) + } + } + throw lastErr ?? new Error(`Failed to connect shared-state stream`) } diff --git a/packages/agents/src/agents/coding-session.ts b/packages/agents/src/agents/coding-session.ts index 9249c0b9b5..d06c223c42 100644 --- a/packages/agents/src/agents/coding-session.ts +++ b/packages/agents/src/agents/coding-session.ts @@ -1,20 +1,20 @@ import { z } from 'zod' -import { - importLocalSession, - loadSession, - serializeCursor, -} from 'agent-session-protocol' +import { importLocalSession, loadSession } from 'agent-session-protocol' import type { NormalizedEvent } from 'agent-session-protocol' import { - CODING_SESSION_CURSOR_COLLECTION_TYPE, - CODING_SESSION_EVENT_COLLECTION_TYPE, - CODING_SESSION_META_COLLECTION_TYPE, + CODER_RESOURCE_TAG, + codingSessionResourceId, + codingSessionResourceSchema, + db, } from '@electric-ax/agents-runtime' import type { CodingAgentType, - CodingSessionEventRow, + CodingSessionInfoRow, + CodingSessionTranscriptRow, + CodingSessionResourceSchema, EntityRegistry, HandlerContext, + SharedStateHandle, WakeEvent, } from '@electric-ax/agents-runtime' @@ -22,18 +22,10 @@ import { claudeSdkRunner } from './runners/claude-sdk.js' import { codexSdkRunner } from './runners/codex-sdk.js' /** - * Abstraction over a coding-agent runner. The default implementations - * drive `@anthropic-ai/claude-agent-sdk` and `@openai/codex-sdk` - * directly; tests can inject a fake. - * - * Runners stream `NormalizedEvent`s via `onEvent` as the agent makes - * progress, and call `onSessionId` once with the new (or resumed) - * session id so the orchestrator can persist it on the entity. - * - * `sessionId` is undefined for the first prompt on a fresh session — - * the runner should then let the SDK generate its own id and emit it - * via `onSessionId`. For every subsequent prompt, pass the id so the - * SDK resumes that conversation. + * Abstraction over a coding-agent runner. Defaults dispatch to the + * Claude / Codex SDKs; tests can inject a fake. Runners stream + * `NormalizedEvent`s via `onEvent` and call `onSessionId` once with + * the new (or resumed) native session id. */ export interface CodingSessionCliRunner { run(opts: { @@ -53,42 +45,30 @@ const defaultCliRunner: CodingSessionCliRunner = { }, } -const sessionMetaRowSchema = z.object({ +// ── Entity-local state schemas ───────────────────────────────────── +// +// The coder entity is now a thin wrapper around the coding-session +// resource (see coding-session-resource.ts in agents-runtime). It +// owns only the bookkeeping that's tied to *this* entity instance: +// the run lifecycle status and the inbox processing cursor. Anything +// portable (event history, session metadata) lives on the resource. + +const RUN_STATUS_COLLECTION_TYPE = `coder_run_status` +const INBOX_CURSOR_COLLECTION_TYPE = `coder_inbox_cursor` + +const runStatusRowSchema = z.object({ key: z.literal(`current`), - electricSessionId: z.string(), - nativeSessionId: z.string().optional(), - agent: z.enum([`claude`, `codex`]), - cwd: z.string(), status: z.enum([`initializing`, `idle`, `running`, `error`]), error: z.string().optional(), + /** Inbox key of the prompt currently running, when status === `running`. */ currentPromptInboxKey: z.string().optional(), }) -const cursorStateRowSchema = z.object({ +const inboxCursorRowSchema = z.object({ key: z.literal(`current`), - /** - * JSON-serialized SerializedSessionCursor or empty string. Used as a - * "have I seeded the events collection from the JSONL yet?" marker for - * imported / attached sessions — once non-empty, we don't reseed. - * The SDK runners stream events live, so this is no longer used for - * tail/cursor state past first wake. - */ - cursor: z.string(), lastProcessedInboxKey: z.string().optional(), }) -const eventRowSchema = z.object({ - key: z.string(), - ts: z.number(), - type: z.string(), - callId: z.string().optional(), - // `z.record(z.string(), z.unknown())` would emit JSON-Schema `propertyNames`, - // which the agents-server schema validator rejects. `looseObject` emits a - // plain `{ type: "object", additionalProperties: {} }` that's allowed and - // still captures "any JSON object". - payload: z.looseObject({}), -}) - const creationArgsSchema = z.object({ agent: z.enum([`claude`, `codex`]), cwd: z.string().optional(), @@ -105,8 +85,8 @@ const promptMessageSchema = z.object({ text: z.string(), }) -type SessionMetaRow = z.infer -type CursorStateRow = z.infer +type RunStatusRow = z.infer +type InboxCursorRow = z.infer interface InboxRow { key: string @@ -124,10 +104,8 @@ export interface RegisterCodingSessionOptions { } /** - * Stable key for an events-collection row, derived from the event's content. - * Lets us re-insert the same event without producing duplicates — the caller - * (or the collection's uniqueness guard) uses this to de-dup across retries, - * replays, and crash recovery. Sorts chronologically by ts, then by type. + * Stable key for an event row, derived from content. Lets the same + * event re-arrive (e.g. on retry) without producing duplicates. */ function eventKey(event: NormalizedEvent): string { const tsPart = String(event.ts).padStart(16, `0`) @@ -144,7 +122,9 @@ function contentHashHex(event: NormalizedEvent): string { return h.toString(16).padStart(8, `0`) } -function buildEventRow(event: NormalizedEvent): CodingSessionEventRow { +function buildTranscriptRow( + event: NormalizedEvent +): CodingSessionTranscriptRow { const callId = `callId` in event && typeof event.callId === `string` ? event.callId @@ -158,19 +138,15 @@ function buildEventRow(event: NormalizedEvent): CodingSessionEventRow { } } -interface LiveMirrorCtx { - events: { - get: (k: string) => unknown - } - actions: { - events_insert: (arg: { row: CodingSessionEventRow }) => unknown - } -} +type CodingSessionResource = SharedStateHandle -function appendIfNew(ctx: LiveMirrorCtx, event: NormalizedEvent): void { - const row = buildEventRow(event) - if (ctx.events.get(row.key) !== undefined) return - ctx.actions.events_insert({ row }) +function appendIfNew( + resource: CodingSessionResource, + event: NormalizedEvent +): void { + const row = buildTranscriptRow(event) + if (resource.transcript.get(row.key) !== undefined) return + resource.transcript.insert(row) } export function registerCodingSession( @@ -181,40 +157,47 @@ export function registerCodingSession( const defaultCwd = options.defaultWorkingDirectory ?? process.cwd() registry.define(`coder`, { - description: `Runs a Claude Code / Codex SDK session and mirrors its normalized event stream into a durable store. Prompts arrive via message_received (type: "prompt") and are executed serially.`, + description: `Wraps a Claude Code / Codex SDK session. The session's history (events + sessionInfo) lives on a coding-session resource (shared-state DB) the entity creates on first wake; the entity itself just queues prompts and drives the SDK runner. Prompts arrive via message_received (type: "prompt") and run serially.`, creationSchema: creationArgsSchema, inboxSchemas: { prompt: promptMessageSchema, }, state: { - sessionMeta: { - schema: sessionMetaRowSchema, - type: CODING_SESSION_META_COLLECTION_TYPE, + runStatus: { + schema: runStatusRowSchema, + type: RUN_STATUS_COLLECTION_TYPE, primaryKey: `key`, }, - cursorState: { - schema: cursorStateRowSchema, - type: CODING_SESSION_CURSOR_COLLECTION_TYPE, - primaryKey: `key`, - }, - events: { - schema: eventRowSchema, - type: CODING_SESSION_EVENT_COLLECTION_TYPE, + inboxCursor: { + schema: inboxCursorRowSchema, + type: INBOX_CURSOR_COLLECTION_TYPE, primaryKey: `key`, }, }, async handler(ctx: HandlerContext, _wake: WakeEvent) { - // Seed sessionMeta / cursorState on the very first wake, once and - // only once. `ctx.firstWake` is derived from "manifest is empty" — - // this entity never writes a manifest entry (no mkdb/observe/spawn/ - // effect), so firstWake stays true on every wake. Guard by reading - // state instead, per the define-entity review checklist. - const existingMeta = ctx.db.collections.sessionMeta.get(`current`) - if (!existingMeta) { + const entityId = ctx.entityUrl.split(`/`).pop() ?? ctx.entityUrl + const resourceId = codingSessionResourceId(entityId) + + // First wake: register the resource via mkdb so subsequent + // wakes can observe it (and any third party can attach by id). + // mkdb throws if called more than once for the same id, so the + // firstWake guard is mandatory. After this wake commits the + // manifest entry, ctx.firstWake will correctly be false next + // time round. + if (ctx.firstWake) { + ctx.mkdb(resourceId, codingSessionResourceSchema) + } + + const resource = (await ctx.observe( + db(resourceId, codingSessionResourceSchema) + )) as unknown as CodingSessionResource + + // First-wake initialisation: parse args, run the optional + // cross-agent import, seed sessionInfo + entity-local state, + // and tag the entity so the UI can find the resource. + if (ctx.firstWake) { const args = creationArgsSchema.parse(ctx.args) const cwd = args.cwd ?? defaultCwd - const electricSessionId = - ctx.entityUrl.split(`/`).pop() ?? ctx.entityUrl let resolvedNativeId = args.nativeSessionId if (args.importFrom) { @@ -228,95 +211,91 @@ export function registerCodingSession( resolvedNativeId = result.sessionId } - const hasNative = resolvedNativeId !== undefined - ctx.db.actions.sessionMeta_insert({ - row: { - key: `current`, - electricSessionId, - ...(hasNative ? { nativeSessionId: resolvedNativeId } : {}), - agent: args.agent, - cwd, - status: hasNative ? `idle` : `initializing`, - } satisfies SessionMetaRow, + resource.sessionInfo.insert({ + key: `current`, + agent: args.agent, + cwd, + electricSessionId: entityId, + ...(resolvedNativeId ? { nativeSessionId: resolvedNativeId } : {}), + createdAt: Date.now(), }) - } - if (!ctx.db.collections.cursorState.get(`current`)) { - ctx.db.actions.cursorState_insert({ + + ctx.db.actions.runStatus_insert({ row: { key: `current`, - cursor: ``, - } satisfies CursorStateRow, + status: resolvedNativeId ? `idle` : `initializing`, + } satisfies RunStatusRow, }) + ctx.db.actions.inboxCursor_insert({ + row: { key: `current` } satisfies InboxCursorRow, + }) + void ctx.setTag(CODER_RESOURCE_TAG, resourceId) } - const metaRow = ctx.db.collections.sessionMeta.get(`current`) as - | SessionMetaRow - | undefined - const cursorRow = ctx.db.collections.cursorState.get(`current`) as - | CursorStateRow + const sessionInfo = resource.sessionInfo.get(`current`) as + | CodingSessionInfoRow | undefined - if (!metaRow || !cursorRow) { + if (!sessionInfo) { throw new Error( - `[coding-session] expected sessionMeta and cursorState rows to exist after init` + `[coding-session] sessionInfo missing on resource ${resourceId}` ) } - // Initial mirror. When the session already exists on disk (imported - // or attached) but the cursor is still empty, pull every existing - // event into the durable stream so the viewer shows the full history - // without waiting for a first prompt. - if (metaRow.nativeSessionId && !cursorRow.cursor) { - const mirrorCtx: LiveMirrorCtx = { - events: { - get: (k) => ctx.db.collections.events.get(k), - }, - actions: { - events_insert: ctx.db.actions.events_insert, - }, - } + // Initial event mirror. When the session already exists on + // disk (imported or attached) but the resource's events + // collection is still empty, pull every existing event from + // the JSONL into the resource so the viewer has the full + // history without waiting for a first prompt. + if ( + sessionInfo.nativeSessionId !== undefined && + resource.transcript.toArray.length === 0 + ) { try { const initial = await loadSession({ - sessionId: metaRow.nativeSessionId, - agent: metaRow.agent, - }) - for (const ev of initial.events) appendIfNew(mirrorCtx, ev) - const serialized = serializeCursor(initial.cursor) - ctx.db.actions.cursorState_update({ - key: `current`, - updater: (d: CursorStateRow) => { - d.cursor = JSON.stringify(serialized) - }, + sessionId: sessionInfo.nativeSessionId, + agent: sessionInfo.agent, }) + for (const ev of initial.events) appendIfNew(resource, ev) } catch (e) { - // Non-fatal: the session will still work on the next prompt, - // we just won't have the pre-prompt history mirrored. const message = e instanceof Error ? e.message : String(e) - ctx.db.actions.sessionMeta_update({ + ctx.db.actions.runStatus_update({ key: `current`, - updater: (d: SessionMetaRow) => { + updater: (d: RunStatusRow) => { d.error = `initial mirror failed: ${message}` }, }) } } + const cursorRow = ctx.db.collections.inboxCursor.get(`current`) as + | InboxCursorRow + | undefined + if (!cursorRow) { + throw new Error( + `[coding-session] inboxCursor missing — first-wake init never completed` + ) + } + // Every inbox entry is treated as a prompt. `message_type === "prompt"` - // is the preferred tag (see inboxSchemas) but is not required — a bare - // `/send` with `{ payload: { text } }` from the generic UI MessageInput - // arrives with no message_type and should still be processed. - // Entries whose payload is not a `{ text }` object are ignored - // (tracked via lastProcessedInboxKey so they don't re-trigger). + // is preferred but not required — bare `/send { payload: { text } }` + // from the generic UI MessageInput arrives without a type. Entries + // whose payload isn't a `{ text }` object are skipped (and tracked + // via lastProcessedInboxKey so they don't re-trigger). const inboxRows = (ctx.db.collections.inbox.toArray as Array) .slice() .sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0)) const lastKey = cursorRow.lastProcessedInboxKey ?? `` const pending = inboxRows.filter((m) => m.key > lastKey) + const runStatus = ctx.db.collections.runStatus.get(`current`) as + | RunStatusRow + | undefined + if (pending.length === 0) { - if (metaRow.status === `running` || metaRow.status === `error`) { - ctx.db.actions.sessionMeta_update({ + if (runStatus?.status === `running` || runStatus?.status === `error`) { + ctx.db.actions.runStatus_update({ key: `current`, - updater: (d: SessionMetaRow) => { + updater: (d: RunStatusRow) => { d.status = `idle` delete d.currentPromptInboxKey delete d.error @@ -326,123 +305,91 @@ export function registerCodingSession( return } - let runningMeta = metaRow - let runningCursor = cursorRow + let currentInfo = sessionInfo for (const inboxMsg of pending) { const parsed = promptMessageSchema.safeParse(inboxMsg.payload) if (!parsed.success) { - ctx.db.actions.cursorState_update({ + ctx.db.actions.inboxCursor_update({ key: `current`, - updater: (d: CursorStateRow) => { + updater: (d: InboxCursorRow) => { d.lastProcessedInboxKey = inboxMsg.key }, }) - runningCursor = { - ...runningCursor, - lastProcessedInboxKey: inboxMsg.key, - } continue } const prompt = parsed.data.text - // Adopt the first prompt as the entity's display title (truncated) - // so the sidebar surfaces something meaningful for coders that - // would otherwise fall back to a random slug. Only set it if no - // title is already present — preserves explicit titles supplied - // by spawners (e.g. a future deep-survey-style use of `tags.title`). + // Adopt the first prompt as the entity's display title so the + // sidebar shows something meaningful instead of a random slug. + // Only set if no title is already present. const existingTitle = ctx.tags.title if (typeof existingTitle !== `string` || existingTitle.length === 0) { void ctx.setTag(`title`, prompt.slice(0, 80)) } - ctx.db.actions.sessionMeta_update({ + ctx.db.actions.runStatus_update({ key: `current`, - updater: (d: SessionMetaRow) => { + updater: (d: RunStatusRow) => { d.status = `running` d.currentPromptInboxKey = inboxMsg.key delete d.error }, }) - // Record the run as a `runs` collection event so observers - // waking on `runFinished` are notified when the turn ends. - // Without this the parent (e.g. Horton via spawn_coder) would - // never be woken because the coder bypasses useAgent. + // Record the run so observers waking on `runFinished` are + // notified. Without this the parent (e.g. Horton via + // spawn_coder) would never be woken because the coder bypasses + // useAgent. const recordedRun = ctx.recordRun() - // Snapshot the existing event keys so we can identify which - // events are appended during this run and surface their - // assistant text as the run's response payload. + // Snapshot existing event keys so we can later pick out the + // assistant_message rows produced by *this* run for + // attachResponse. const eventKeysBefore = new Set( ( - ctx.db.collections.events.toArray as unknown as Array<{ - key: string - }> + resource.transcript + .toArray as unknown as Array ).map((e) => e.key) ) try { - const mirrorCtx: LiveMirrorCtx = { - events: { - get: (k) => ctx.db.collections.events.get(k), - }, - actions: { - events_insert: ctx.db.actions.events_insert, - }, - } - const cliResult = await runner.run({ - agent: runningMeta.agent, - ...(runningMeta.nativeSessionId - ? { sessionId: runningMeta.nativeSessionId } + agent: currentInfo.agent, + ...(currentInfo.nativeSessionId + ? { sessionId: currentInfo.nativeSessionId } : {}), - cwd: runningMeta.cwd, + cwd: currentInfo.cwd, prompt, - onEvent: (ev) => appendIfNew(mirrorCtx, ev), + onEvent: (ev) => appendIfNew(resource, ev), onSessionId: (id) => { - if (runningMeta.nativeSessionId === id) return - ctx.db.actions.sessionMeta_update({ - key: `current`, - updater: (d: SessionMetaRow) => { - d.nativeSessionId = id - }, + if (currentInfo.nativeSessionId === id) return + resource.sessionInfo.update(`current`, (d) => { + d.nativeSessionId = id }) - runningMeta = { ...runningMeta, nativeSessionId: id } + currentInfo = { ...currentInfo, nativeSessionId: id } }, }) if (cliResult.exitCode !== 0) { throw new Error( - `[coding-session] ${runningMeta.agent} runner exited ${cliResult.exitCode}. stderr=${cliResult.stderr.slice(0, 800) || ``} stdout=${cliResult.stdout.slice(0, 800) || ``}` + `[coding-session] ${currentInfo.agent} runner exited ${cliResult.exitCode}. stderr=${cliResult.stderr.slice(0, 800) || ``} stdout=${cliResult.stdout.slice(0, 800) || ``}` ) } - ctx.db.actions.cursorState_update({ + ctx.db.actions.inboxCursor_update({ key: `current`, - updater: (d: CursorStateRow) => { - // Cursor is now just a "have we seeded?" marker — set to - // any non-empty string after the first successful run. - if (!d.cursor) d.cursor = `sdk-stream` + updater: (d: InboxCursorRow) => { d.lastProcessedInboxKey = inboxMsg.key }, }) - runningCursor = { - ...runningCursor, - cursor: runningCursor.cursor || `sdk-stream`, - lastProcessedInboxKey: inboxMsg.key, - } // Pipe assistant_message text from this run into recordedRun // so the runFinished wake's `includeResponse` payload carries // the coder's reply. - for (const row of ctx.db.collections.events - .toArray as unknown as Array<{ - key: string - type: string - payload: { text?: unknown } - }>) { + for (const row of resource.transcript + .toArray as unknown as Array) { if (eventKeysBefore.has(row.key)) continue if (row.type !== `assistant_message`) continue - const text = row.payload?.text + const text = (row.payload as { text?: unknown }).text if (typeof text === `string` && text.length > 0) { recordedRun.attachResponse(text) } @@ -451,33 +398,29 @@ export function registerCodingSession( } catch (e) { const message = e instanceof Error ? e.message : String(e) recordedRun.end({ status: `failed`, finishReason: `error` }) - ctx.db.actions.sessionMeta_update({ + ctx.db.actions.runStatus_update({ key: `current`, - updater: (d: SessionMetaRow) => { + updater: (d: RunStatusRow) => { d.status = `error` d.error = message }, }) - ctx.db.actions.cursorState_update({ + ctx.db.actions.inboxCursor_update({ key: `current`, - updater: (d: CursorStateRow) => { + updater: (d: InboxCursorRow) => { d.lastProcessedInboxKey = inboxMsg.key }, }) // Re-throw so the agent-runtime entity bridge surfaces the // failure to observers (Horton wakes on `runFinished` with - // status=failed, the UI flips the badge to error). The - // failed prompt's inbox key was advanced above, so on the - // next wake the for-loop resumes from the *next* queued - // prompt — remaining inbox messages aren't dropped, just - // deferred until the framework re-wakes us. + // status=failed, the UI flips the badge to error). throw e } } - ctx.db.actions.sessionMeta_update({ + ctx.db.actions.runStatus_update({ key: `current`, - updater: (d: SessionMetaRow) => { + updater: (d: RunStatusRow) => { d.status = `idle` delete d.currentPromptInboxKey delete d.error diff --git a/packages/agents/test/coding-session-resource.test.ts b/packages/agents/test/coding-session-resource.test.ts new file mode 100644 index 0000000000..154679f704 --- /dev/null +++ b/packages/agents/test/coding-session-resource.test.ts @@ -0,0 +1,243 @@ +import { describe, expect, it, vi } from 'vitest' +import { + codingSessionResourceId, + codingSessionResourceSchema, + CODER_RESOURCE_TAG, + CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE, + CODING_SESSION_RESOURCE_INFO_TYPE, + createEntityRegistry, +} from '@electric-ax/agents-runtime' +import { registerCodingSession } from '../src/agents/coding-session' +import type { NormalizedEvent } from 'agent-session-protocol' + +describe(`codingSessionResourceId`, () => { + it(`returns a stable, namespaced id`, () => { + expect(codingSessionResourceId(`abc`)).toBe(`coder-session/abc`) + expect(codingSessionResourceId(`zzz123`)).toBe(`coder-session/zzz123`) + }) +}) + +describe(`codingSessionResourceSchema`, () => { + it(`declares sessionInfo + transcript with primaryKey "key"`, () => { + expect(codingSessionResourceSchema.sessionInfo.primaryKey).toBe(`key`) + expect(codingSessionResourceSchema.sessionInfo.type).toBe( + CODING_SESSION_RESOURCE_INFO_TYPE + ) + expect(codingSessionResourceSchema.transcript.primaryKey).toBe(`key`) + expect(codingSessionResourceSchema.transcript.type).toBe( + CODING_SESSION_RESOURCE_TRANSCRIPT_TYPE + ) + }) + + it(`exposes the tag key consumers should follow`, () => { + expect(CODER_RESOURCE_TAG).toBe(`coderResource`) + }) +}) + +/** + * Integration check: two coder entities pointed at the same resource + * id see each other's appends. Models the forking / attach scenario: + * the durable history is on the resource, not on either entity, so a + * second wrapper can join an existing session and read its history. + * + * The runtime mocked here is the same fake used in the main + * coding-session test — the key trick is sharing a single + * `resourceStores` map between the two ctxs so an `insert` on one + * shows up via `get` on the other. + */ +describe(`shared resource attach`, () => { + it(`appends from entity A are visible to entity B sharing the same resource id`, async () => { + const sharedResourceStores: Record< + string, + { + sessionInfo: Map> + transcript: Map> + } + > = {} + + const buildCtx = ( + entityUrl: string, + runner: { run: ReturnType } + ) => { + const entityState = { + runStatus: new Map>(), + inboxCursor: new Map>(), + } + entityState.runStatus.set(`current`, { + key: `current`, + status: `idle`, + }) + entityState.inboxCursor.set(`current`, { key: `current` }) + + const ensureResource = (id: string) => { + if (!sharedResourceStores[id]) { + sharedResourceStores[id] = { + sessionInfo: new Map(), + transcript: new Map(), + } + } + return sharedResourceStores[id]! + } + + const buildHandle = (id: string) => { + const store = ensureResource(id) + const proxy = (m: Map>) => ({ + insert: (row: Record) => { + m.set(String(row.key), { ...row }) + }, + update: ( + key: string, + updater: (d: Record) => void + ) => { + const existing = m.get(key) + if (existing) updater(existing) + }, + get: (k: string) => m.get(k), + delete: (k: string) => { + m.delete(k) + }, + get toArray() { + return Array.from(m.values()) + }, + }) + return { + id, + sessionInfo: proxy(store.sessionInfo), + transcript: proxy(store.transcript), + } + } + + const tags: Record = {} + const ctx = { + firstWake: false, + args: { agent: `claude` as const, nativeSessionId: `existing-uuid` }, + entityUrl, + tags, + setTag: (k: string, v: string) => { + tags[k] = v + return Promise.resolve() + }, + db: { + actions: { + runStatus_insert: ({ row }: { row: Record }) => { + entityState.runStatus.set(String(row.key), { ...row }) + }, + runStatus_update: ({ + key, + updater, + }: { + key: string + updater: (d: Record) => void + }) => { + const e = entityState.runStatus.get(key) + if (e) updater(e) + }, + inboxCursor_insert: ({ row }: { row: Record }) => { + entityState.inboxCursor.set(String(row.key), { ...row }) + }, + inboxCursor_update: ({ + key, + updater, + }: { + key: string + updater: (d: Record) => void + }) => { + const e = entityState.inboxCursor.get(key) + if (e) updater(e) + }, + }, + collections: { + runStatus: { get: (k: string) => entityState.runStatus.get(k) }, + inboxCursor: { get: (k: string) => entityState.inboxCursor.get(k) }, + inbox: { + toArray: [ + { + key: `m-001`, + from: `u`, + timestamp: `2026-04-23T00:00:00Z`, + payload: { text: `hi` }, + }, + ] as Array, + }, + runs: { toArray: [] as Array<{ key: string }> }, + }, + }, + mkdb: (id: string) => buildHandle(id), + observe: vi.fn(async (source: { sourceRef: string }) => + buildHandle(source.sourceRef) + ), + recordRun: () => ({ + key: `run-0`, + end: () => {}, + attachResponse: () => {}, + }), + } + // Pre-seed sessionInfo on the shared resource so the + // initial-mirror branch is a no-op (events.length === 0 only + // triggers loadSession when the resource is genuinely empty). + // We tee up sessionInfo in advance for both entities to share. + const id = `coder-session/shared-1` + ensureResource(id).sessionInfo.set(`current`, { + key: `current`, + agent: `claude`, + cwd: `/tmp/x`, + electricSessionId: `shared-1`, + nativeSessionId: `existing-uuid`, + createdAt: 0, + }) + // Pre-seed an event so initial mirror is skipped. + ensureResource(id).transcript.set(`seed`, { + key: `seed`, + ts: 0, + type: `seed`, + payload: {}, + }) + return { ctx, runner } + } + + // Entity A's runner emits an event tagged "from-a" + const runnerA = { + run: vi.fn( + async (callArgs: { onEvent?: (ev: NormalizedEvent) => void }) => { + callArgs.onEvent?.({ + v: 1, + ts: 1, + type: `assistant_message`, + text: `from-a`, + }) + return { exitCode: 0, stdout: ``, stderr: `` } + } + ), + } + // Both wrappers point at the same entityUrl slug so they resolve + // the same resource id (`coder-session/shared-1`). + const a = buildCtx(`/coder/shared-1`, runnerA) + + const registry = createEntityRegistry() + registerCodingSession(registry, { + defaultWorkingDirectory: `/tmp/x`, + cliRunner: runnerA, + }) + const def = registry.get(`coder`)! + + await def.definition.handler( + a.ctx as unknown as Parameters[0], + { type: `message_received` } as unknown as Parameters< + typeof def.definition.handler + >[1] + ) + + // Entity A appended `from-a` into the resource. Entity B (running + // a different runner) attached to the same resource id and reads + // it back via observe(). We don't run the second handler — we + // just check the resource state directly to verify the share. + const transcript = + sharedResourceStores[`coder-session/shared-1`]!.transcript + const types = Array.from(transcript.values()).map((e) => e.type) + expect(types).toContain(`assistant_message`) + const assistant = Array.from(transcript.values()).find( + (e) => e.type === `assistant_message` + )! + expect((assistant.payload as { text?: string }).text).toBe(`from-a`) + }) +}) diff --git a/packages/agents/test/coding-session.test.ts b/packages/agents/test/coding-session.test.ts index b479acb45a..359053cdf0 100644 --- a/packages/agents/test/coding-session.test.ts +++ b/packages/agents/test/coding-session.test.ts @@ -1,8 +1,22 @@ import { describe, expect, it, vi } from 'vitest' -import { createEntityRegistry } from '@electric-ax/agents-runtime' +import { + createEntityRegistry, + codingSessionResourceId, +} from '@electric-ax/agents-runtime' import { registerCodingSession } from '../src/agents/coding-session' import type { NormalizedEvent } from 'agent-session-protocol' +/** + * Build a fake HandlerContext rich enough to drive the coder entity + * end-to-end without a real runtime. The fake provides: + * - entity-local state collections (`runStatus`, `inboxCursor`) + * - a fake `mkdb` + `observe` pair that returns a SharedStateHandle + * wired to in-memory Maps (one per resource id) + * - a `setTag` capture so tests can assert on the resource pointer + * + * Tests exercise this via `state.entity.runStatus` / + * `state.resource.events` / `state.tags.coderResource` etc. + */ function makeFakeCtx(opts: { firstWake: boolean args: Record @@ -15,31 +29,44 @@ function makeFakeCtx(opts: { message_type?: string }> existing?: { - sessionMeta?: Record - cursorState?: Record + runStatus?: Record + inboxCursor?: Record + sessionInfo?: Record + transcript?: Array> } }) { - const state: Record>> = { - sessionMeta: new Map(), - cursorState: new Map(), - events: new Map(), + const entityState: Record>> = { + runStatus: new Map(), + inboxCursor: new Map(), } - if (opts.existing?.sessionMeta) { - state.sessionMeta!.set(`current`, { ...opts.existing.sessionMeta }) + if (opts.existing?.runStatus) { + entityState.runStatus!.set(`current`, { ...opts.existing.runStatus }) } - if (opts.existing?.cursorState) { - state.cursorState!.set(`current`, { ...opts.existing.cursorState }) + if (opts.existing?.inboxCursor) { + entityState.inboxCursor!.set(`current`, { ...opts.existing.inboxCursor }) + } + + const resourceStores: Record< + string, + Record>> + > = {} + const ensureResource = (id: string) => { + if (!resourceStores[id]) { + resourceStores[id] = { + sessionInfo: new Map(), + transcript: new Map(), + } + } + return resourceStores[id]! } const inbox = opts.inbox ?? [] - const calls: Array<{ action: string; args: unknown }> = [] + const tags: Record = {} - const makeActions = () => { + const makeEntityActions = () => { const mk = (name: string) => ({ insert: ({ row }: { row: Record }) => { - calls.push({ action: `${name}_insert`, args: { row } }) - const key = String(row.key) - state[name]!.set(key, { ...row }) + entityState[name]!.set(String(row.key), { ...row }) }, update: ({ key, @@ -48,54 +75,93 @@ function makeFakeCtx(opts: { key: string updater: (d: Record) => void }) => { - const existing = state[name]!.get(key) + const existing = entityState[name]!.get(key) if (!existing) return updater(existing) - calls.push({ action: `${name}_update`, args: { key } }) }, }) - const sm = mk(`sessionMeta`) - const cs = mk(`cursorState`) - const ev = mk(`events`) + const rs = mk(`runStatus`) + const ic = mk(`inboxCursor`) return { - sessionMeta_insert: sm.insert, - sessionMeta_update: sm.update, - cursorState_insert: cs.insert, - cursorState_update: cs.update, - events_insert: ev.insert, + runStatus_insert: rs.insert, + runStatus_update: rs.update, + inboxCursor_insert: ic.insert, + inboxCursor_update: ic.update, + } + } + + const buildCollectionProxy = (map: Map>) => ({ + insert: (row: Record) => { + map.set(String(row.key), { ...row }) + return undefined + }, + update: (key: string, updater: (d: Record) => void) => { + const existing = map.get(key) + if (!existing) return undefined + updater(existing) + return undefined + }, + get: (key: string) => map.get(key), + delete: (key: string) => { + map.delete(key) + return undefined + }, + get toArray() { + return Array.from(map.values()) + }, + }) + + const buildResourceHandle = (id: string) => { + const store = ensureResource(id) + return { + id, + sessionInfo: buildCollectionProxy(store.sessionInfo!), + transcript: buildCollectionProxy(store.transcript!), + } + } + + if (opts.existing?.sessionInfo) { + const id = codingSessionResourceId( + (opts.entityUrl ?? `/coder/test-1`).split(`/`).pop() ?? `` + ) + ensureResource(id).sessionInfo.set(`current`, { + ...opts.existing.sessionInfo, + }) + } + if (opts.existing?.transcript) { + const id = codingSessionResourceId( + (opts.entityUrl ?? `/coder/test-1`).split(`/`).pop() ?? `` + ) + const map = ensureResource(id).transcript + for (const ev of opts.existing.transcript) { + map.set(String(ev.key), { ...ev }) } } const ctx = { firstWake: opts.firstWake, args: opts.args, - entityUrl: opts.entityUrl ?? `/coding-session/test-1`, - // The handler reads `ctx.tags.title` and calls `ctx.setTag(...)` - // when adopting the first prompt as the entity's display title. - // Provide an empty tags map and a no-op setTag so neither call - // throws before the CLI runner is exercised. - tags: {} as Record, - setTag: () => Promise.resolve(), + entityUrl: opts.entityUrl ?? `/coder/test-1`, + tags, + setTag: (key: string, value: string) => { + tags[key] = value + return Promise.resolve() + }, db: { - actions: makeActions(), + actions: makeEntityActions(), collections: { - sessionMeta: { get: (k: string) => state.sessionMeta!.get(k) }, - cursorState: { get: (k: string) => state.cursorState!.get(k) }, - events: { - get: (k: string) => state.events!.get(k), - get toArray() { - return Array.from(state.events!.values()) - }, - }, + runStatus: { get: (k: string) => entityState.runStatus!.get(k) }, + inboxCursor: { get: (k: string) => entityState.inboxCursor!.get(k) }, inbox: { toArray: inbox }, // recordRun() reads `runs.toArray` to seed its counter; an // empty array is fine for tests that don't otherwise care. runs: { toArray: [] as Array<{ key: string }> }, }, }, - // The handler calls ctx.recordRun() around each CLI invocation; - // give the mock a no-op handle so it doesn't blow up before the - // CLI runner is exercised. + mkdb: (id: string) => buildResourceHandle(id), + observe: vi.fn(async (source: { sourceRef: string }) => { + return buildResourceHandle(source.sourceRef) + }), recordRun: () => ({ key: `run-0`, end: () => {}, @@ -103,30 +169,32 @@ function makeFakeCtx(opts: { }), } - return { ctx, state, calls } + return { ctx, entityState, resourceStores, tags } } describe(`registerCodingSession`, () => { - it(`registers the coding-session entity type`, () => { + it(`registers the coder entity type with runStatus + inboxCursor state`, () => { const registry = createEntityRegistry() registerCodingSession(registry) const def = registry.get(`coder`) expect(def).toBeDefined() expect(def!.definition.state).toBeDefined() - expect(def!.definition.state!.sessionMeta).toBeDefined() - expect(def!.definition.state!.cursorState).toBeDefined() - expect(def!.definition.state!.events).toBeDefined() + expect(def!.definition.state!.runStatus).toBeDefined() + expect(def!.definition.state!.inboxCursor).toBeDefined() + // sessionMeta / cursorState / events are gone — they live on the resource + expect(def!.definition.state!.sessionMeta).toBeUndefined() + expect(def!.definition.state!.transcript).toBeUndefined() }) - it(`seeds sessionMeta and cursorState on firstWake with no prompts`, async () => { + it(`creates a resource and tags the entity on first wake`, async () => { const registry = createEntityRegistry() registerCodingSession(registry, { defaultWorkingDirectory: `/tmp/x` }) const def = registry.get(`coder`)! - const { ctx, state } = makeFakeCtx({ + const { ctx, entityState, resourceStores, tags } = makeFakeCtx({ firstWake: true, args: { agent: `claude` }, - entityUrl: `/coding-session/my-task`, + entityUrl: `/coder/my-task`, }) await def.definition.handler( @@ -136,16 +204,21 @@ describe(`registerCodingSession`, () => { >[1] ) - const meta = state.sessionMeta!.get(`current`) - expect(meta).toMatchObject({ - electricSessionId: `my-task`, + const expectedResourceId = `coder-session/my-task` + expect(tags.coderResource).toBe(expectedResourceId) + + const resource = resourceStores[expectedResourceId]! + const sessionInfo = resource.sessionInfo!.get(`current`)! + expect(sessionInfo).toMatchObject({ agent: `claude`, cwd: `/tmp/x`, - status: `initializing`, + electricSessionId: `my-task`, }) - expect(meta!.nativeSessionId).toBeUndefined() - const cursor = state.cursorState!.get(`current`) - expect(cursor).toMatchObject({ cursor: `` }) + expect(sessionInfo.nativeSessionId).toBeUndefined() + + const runStatus = entityState.runStatus!.get(`current`)! + expect(runStatus.status).toBe(`initializing`) + expect(entityState.inboxCursor!.get(`current`)).toBeDefined() }) it(`starts as idle when attaching to an existing nativeSessionId`, async () => { @@ -153,38 +226,32 @@ describe(`registerCodingSession`, () => { registerCodingSession(registry, { defaultWorkingDirectory: `/tmp/x` }) const def = registry.get(`coder`)! - const { state } = makeFakeCtx({ - firstWake: true, - args: { agent: `codex`, nativeSessionId: `pre-existing-uuid` }, - }) - const { ctx } = makeFakeCtx({ + const { ctx, entityState, resourceStores } = makeFakeCtx({ firstWake: true, args: { agent: `codex`, nativeSessionId: `pre-existing-uuid` }, + entityUrl: `/coder/attached-1`, }) + // The attach path calls loadSession internally, which would touch + // the filesystem. The resource starts empty so `events.length === 0` + // would trigger the initial mirror — but since we're not actually + // configuring loadSession to succeed here, the catch handler logs + // an error on runStatus and continues. We just verify the seed. await def.definition.handler( ctx as unknown as Parameters[0], { type: `entity_created` } as unknown as Parameters< typeof def.definition.handler >[1] ) - void state // silence unused binding - // Re-read from the ctx's own state via its collection - const meta = (ctx.db.collections.sessionMeta.get(`current`) as - | Record - | undefined)! - expect(meta.agent).toBe(`codex`) - expect(meta.nativeSessionId).toBe(`pre-existing-uuid`) - expect(meta.status).toBe(`idle`) + const sessionInfo = + resourceStores[`coder-session/attached-1`]!.sessionInfo!.get(`current`)! + expect(sessionInfo.agent).toBe(`codex`) + expect(sessionInfo.nativeSessionId).toBe(`pre-existing-uuid`) + expect(entityState.runStatus!.get(`current`)!.status).toBe(`idle`) }) - it(`invokes the injected cliRunner for a queued prompt and mirrors normalized events`, async () => { - // Pre-populate the cursorState with a non-empty seeded marker so - // the initial-mirror path is skipped (no filesystem touch). The - // injected runner streams events and the orchestrator should - // append them to the events collection and complete cleanly. - + it(`runs a queued prompt and writes events into the resource`, async () => { const runner = { run: vi.fn( async (callArgs: { @@ -208,9 +275,10 @@ describe(`registerCodingSession`, () => { }) const def = registry.get(`coder`)! - const { ctx, state } = makeFakeCtx({ + const { ctx, entityState, resourceStores } = makeFakeCtx({ firstWake: false, args: { agent: `claude`, nativeSessionId: `existing-uuid` }, + entityUrl: `/coder/run-1`, inbox: [ { key: `m-001`, @@ -221,19 +289,27 @@ describe(`registerCodingSession`, () => { }, ], existing: { - sessionMeta: { + runStatus: { key: `current`, status: `idle` }, + inboxCursor: { key: `current` }, + sessionInfo: { key: `current`, - electricSessionId: `test-1`, - nativeSessionId: `existing-uuid`, agent: `claude`, cwd: `/tmp/x`, - status: `idle`, - }, - cursorState: { - key: `current`, - cursor: `sdk-stream`, - eventCounter: 0, + electricSessionId: `run-1`, + nativeSessionId: `existing-uuid`, + createdAt: 1714000000000, }, + // Pre-seed at least one transcript row so the initial-mirror + // branch is skipped (otherwise loadSession would be invoked + // against the filesystem). + transcript: [ + { + key: `0000000000000000_seed_aaaa`, + ts: 0, + type: `seed`, + payload: {}, + }, + ], }, }) @@ -244,29 +320,26 @@ describe(`registerCodingSession`, () => { >[1] ) - // Runner was invoked with the prompt expect(runner.run).toHaveBeenCalledTimes(1) const call = ( runner.run.mock.calls as unknown as Array> - )[0]![0] as { - agent: string - prompt: string - sessionId?: string - } + )[0]![0] as { agent: string; prompt: string; sessionId?: string } expect(call.agent).toBe(`claude`) expect(call.prompt).toBe(`say hi`) expect(call.sessionId).toBe(`existing-uuid`) - // Streamed event made it into the events collection - expect(state.events!.size).toBe(1) - const event = Array.from(state.events!.values())[0]! - expect(event.type).toBe(`assistant_message`) + // Streamed event landed in the resource transcript (not the entity) + const transcript = resourceStores[`coder-session/run-1`]!.transcript! + expect(transcript.size).toBe(2) // seed + assistant_message + const types = Array.from(transcript.values()).map((e) => e.type) + expect(types).toContain(`assistant_message`) - // Meta is back to idle and the inbox key is marked processed - const meta = state.sessionMeta!.get(`current`)! - expect(meta.status).toBe(`idle`) - const cursor = state.cursorState!.get(`current`)! - expect(cursor.lastProcessedInboxKey).toBe(`m-001`) + // Run state cleaned up + const runStatus = entityState.runStatus!.get(`current`)! + expect(runStatus.status).toBe(`idle`) + expect(entityState.inboxCursor!.get(`current`)!.lastProcessedInboxKey).toBe( + `m-001` + ) }) it(`accepts inbox messages without message_type (bare /send from generic UI)`, async () => { @@ -283,29 +356,34 @@ describe(`registerCodingSession`, () => { const { ctx } = makeFakeCtx({ firstWake: false, args: { agent: `claude`, nativeSessionId: `existing-uuid` }, + entityUrl: `/coder/bare-1`, inbox: [ { key: `m-001`, from: `user`, timestamp: `2026-04-23T00:00:00Z`, - // No message_type — mimics the existing UI MessageInput payload: { text: `hello` }, }, ], existing: { - sessionMeta: { + runStatus: { key: `current`, status: `idle` }, + inboxCursor: { key: `current` }, + sessionInfo: { key: `current`, - electricSessionId: `test-1`, - nativeSessionId: `existing-uuid`, agent: `claude`, cwd: `/tmp/x`, - status: `idle`, - }, - cursorState: { - key: `current`, - cursor: `sdk-stream`, - eventCounter: 0, + electricSessionId: `bare-1`, + nativeSessionId: `existing-uuid`, + createdAt: 1714000000000, }, + transcript: [ + { + key: `0000000000000000_seed_aaaa`, + ts: 0, + type: `seed`, + payload: {}, + }, + ], }, }) From 681eb6d58918f39f518e58bc69044fb3c7df54ef Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 4 May 2026 16:21:37 +0200 Subject: [PATCH 2/2] changeset: split coder into thin entity + coding-session resource Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/coder-session-resource.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 .changeset/coder-session-resource.md diff --git a/.changeset/coder-session-resource.md b/.changeset/coder-session-resource.md new file mode 100644 index 0000000000..8bb3b1ce0e --- /dev/null +++ b/.changeset/coder-session-resource.md @@ -0,0 +1,18 @@ +--- +'@electric-ax/agents': minor +'@electric-ax/agents-runtime': minor +'@electric-ax/agents-server-ui': minor +--- + +feat(coder): split the coder entity into a thin wrapper + a coding-session resource + +The coder entity used to own the session's full history on its own collections (`sessionMeta`, `cursorState`, `events`), which coupled the durable session state to a single entity instance. With this change the history (`transcript` + `sessionInfo`) lives on a standalone shared-state resource at a stable id (`coder-session/`), and the wrapper entity tracks only its own run lifecycle (`runStatus`, `inboxCursor`). + +Why: this is the prerequisite for forking a session, attaching multiple wrappers to the same history, sharing a coder URL across devices/users, and surfacing the same session through specialised viewers — all without entangling those use cases with the SDK runner that produces events. + +Visible API additions on `@electric-ax/agents-runtime`: + +- `codingSessionResourceSchema`, `codingSessionResourceId(entityId)`, `CODER_RESOURCE_TAG` — the resource schema + id helpers. +- `CodingSessionInfoRow`, `CodingSessionTranscriptRow`, `CodingSessionResourceSchema` — row + schema types. + +Removed (clean break, pre-1.0): `CODING_SESSION_META_COLLECTION_TYPE`, `CODING_SESSION_CURSOR_COLLECTION_TYPE`, `CODING_SESSION_EVENT_COLLECTION_TYPE`. Coders created by older versions are not migrated; new coders use the new layout.