From f8705b5dceba85d83ae7729f520461e6901c3478 Mon Sep 17 00:00:00 2001 From: Evgeny Shurakov Date: Wed, 10 Jun 2026 10:44:08 +0200 Subject: [PATCH] fix(cloud-agent-next): recover starved sessions safely --- services/cloud-agent-next/Dockerfile | 9 +- services/cloud-agent-next/Dockerfile.dev | 9 +- services/cloud-agent-next/Dockerfile.dind | 10 +- .../cloudflare-agent-sandbox.test.ts | 109 ++++--- .../cloudflare/cloudflare-agent-sandbox.ts | 8 +- .../src/agent-sandbox/protocol.ts | 2 + .../src/kilo/devcontainer.test.ts | 19 +- .../cloud-agent-next/src/kilo/devcontainer.ts | 6 +- .../src/kilo/wrapper-client.test.ts | 14 +- .../src/kilo/wrapper-client.ts | 39 ++- .../src/kilo/wrapper-manager.test.ts | 191 +++++++++++- .../src/kilo/wrapper-manager.ts | 187 +++++++++++- .../src/session/agent-runtime.test.ts | 7 + .../src/session/agent-runtime.ts | 9 +- .../src/session/pending-messages.test.ts | 52 ++++ .../src/session/pending-messages.ts | 4 + .../src/session/session-message-queue.test.ts | 63 ++++ .../src/session/session-message-queue.ts | 64 ++-- .../session/pending-messages.test.ts | 110 +++++++ .../wrapper/src/connection.ts | 11 +- .../src/running-bash-event-coalescer.test.ts | 284 ++++++++++++++++++ .../src/running-bash-event-coalescer.ts | 156 ++++++++++ 22 files changed, 1241 insertions(+), 122 deletions(-) create mode 100644 services/cloud-agent-next/wrapper/src/running-bash-event-coalescer.test.ts create mode 100644 services/cloud-agent-next/wrapper/src/running-bash-event-coalescer.ts diff --git a/services/cloud-agent-next/Dockerfile b/services/cloud-agent-next/Dockerfile index de88c0c63c..b6a207fa10 100644 --- a/services/cloud-agent-next/Dockerfile +++ b/services/cloud-agent-next/Dockerfile @@ -54,8 +54,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends locales && \ ENV LC_ALL=en_US.UTF-8 ENV LANG=en_US.UTF-8 -# Install dependencies globally (accessible to all users) -RUN npm install -g pnpm @kilocode/cli@${KILOCODE_CLI_VERSION} +# Install 
dependencies globally (accessible to all users). Keep the wrapper at +# normal priority while Kilo and its descendants inherit nice 10. +RUN npm install -g pnpm @kilocode/cli@${KILOCODE_CLI_VERSION} && \ + kilo_path="$(command -v kilo)" && \ + mv "$kilo_path" "${kilo_path}-real" && \ + printf '#!/bin/sh\nexec nice -n 10 "%s" "$@"\n' "${kilo_path}-real" > "$kilo_path" && \ + chmod +x "$kilo_path" # === Build wrapper bundle inside container === # This ensures the wrapper is built with the same Bun version that will run it, diff --git a/services/cloud-agent-next/Dockerfile.dev b/services/cloud-agent-next/Dockerfile.dev index 667a1fad0a..1503e77c1f 100644 --- a/services/cloud-agent-next/Dockerfile.dev +++ b/services/cloud-agent-next/Dockerfile.dev @@ -47,8 +47,13 @@ RUN GLAB_VERSION="1.93.0" \ && dpkg -i /tmp/glab.deb \ && rm /tmp/glab.deb -# Install pnpm and kilocode -RUN npm install -g pnpm @kilocode/cli@${KILOCODE_CLI_VERSION} +# Install pnpm and Kilo. Keep the wrapper at normal priority while Kilo and +# its descendants inherit nice 10. +RUN npm install -g pnpm @kilocode/cli@${KILOCODE_CLI_VERSION} && \ + kilo_path="$(command -v kilo)" && \ + mv "$kilo_path" "${kilo_path}-real" && \ + printf '#!/bin/sh\nexec nice -n 10 "%s" "$@"\n' "${kilo_path}-real" > "$kilo_path" && \ + chmod +x "$kilo_path" # Copy to install pre-built kilo binary (built via ./cloud-agent-build.sh) #COPY kilo /usr/local/bin/kilo diff --git a/services/cloud-agent-next/Dockerfile.dind b/services/cloud-agent-next/Dockerfile.dind index 2ac53953cc..fefe5142b3 100644 --- a/services/cloud-agent-next/Dockerfile.dind +++ b/services/cloud-agent-next/Dockerfile.dind @@ -55,7 +55,11 @@ RUN GLAB_VERSION="1.93.0" \ # Tools used by the outer sandbox. Kilo itself is still installed globally for # the existing wrapper path; the platform package bundle under /opt/kilo-agent # is intended for mounting or copying into inner dev containers. 
-RUN npm install -g bun pnpm @devcontainers/cli @kilocode/cli@${KILOCODE_CLI_VERSION} +RUN npm install -g bun pnpm @devcontainers/cli @kilocode/cli@${KILOCODE_CLI_VERSION} \ + && kilo_path="$(command -v kilo)" \ + && mv "$kilo_path" "${kilo_path}-real" \ + && printf '#!/bin/sh\nexec nice -n 10 "%s" "$@"\n' "${kilo_path}-real" > "$kilo_path" \ + && chmod +x "$kilo_path" RUN mkdir -p /opt/kilo-agent/bin \ /opt/kilo-agent/cli-linux-x64 \ @@ -90,8 +94,8 @@ else fi case "$arch:$libc" in - x86_64:glibc) exec "$root/cli-linux-x64/bin/kilo" "$@" ;; - x86_64:musl) exec "$root/cli-linux-x64-musl/bin/kilo" "$@" ;; + x86_64:glibc) exec nice -n 10 "$root/cli-linux-x64/bin/kilo" "$@" ;; + x86_64:musl) exec nice -n 10 "$root/cli-linux-x64-musl/bin/kilo" "$@" ;; *) echo "Unsupported devcontainer platform: $arch/$libc" >&2; exit 1 ;; esac EOF diff --git a/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.test.ts b/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.test.ts index b456484588..6db96dee0f 100644 --- a/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.test.ts +++ b/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.test.ts @@ -438,71 +438,73 @@ describe('CloudflareAgentSandbox', () => { }); }); - it('does not require Docker discovery for standard sandboxes', async () => { - const exec = vi.fn().mockRejectedValue(new Error('docker unavailable')); + it('inspects owned runtimes without requiring Docker for standard sandboxes', async () => { + const exec = vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }); const sandbox = new CloudflareAgentSandbox({} as Env, metadata(), { resolveSandbox: () => ({ listProcesses: vi.fn().mockResolvedValue([]), exec }) as unknown as SandboxInstance, }); await expect(sandbox.discoverSessionWrappers()).resolves.toEqual({ status: 'absent' }); - expect(exec).not.toHaveBeenCalled(); - }); - - it('requires container discovery 
for a DIND sandbox even before resolved devcontainer metadata exists', async () => { - const unresolvedDindMetadata = { - ...metadata(), - workspace: { sandboxId: 'dind-unresolved' }, - } satisfies SessionMetadata; - const sandbox = new CloudflareAgentSandbox({} as Env, unresolvedDindMetadata, { - resolveSandbox: () => - ({ - listProcesses: vi.fn().mockResolvedValue([]), - exec: vi.fn().mockRejectedValue(new Error('docker inspection unavailable')), - }) as unknown as SandboxInstance, - }); - - await expect(sandbox.discoverSessionWrappers()).resolves.toMatchObject({ - status: 'inspection-failed', - error: expect.stringContaining('docker inspection unavailable'), - }); + expect(exec).toHaveBeenCalledOnce(); + expect(exec).toHaveBeenCalledWith(expect.stringContaining('/proc/[0-9]*/environ'), undefined); }); - it('stops remaining session wrappers before confirming an instance target is absent', async () => { - const stopObservedWrappers = vi.fn().mockResolvedValue(undefined); + it('force stops a targeted wrapper that remains after graceful termination and confirms absence', async () => { const listProcesses = vi .fn() .mockResolvedValueOnce([ { - id: 'wrapper-legacy', - command: 'WRAPPER_PORT=5001 kilocode-wrapper --agent-session agent_cloudflare', + id: 'wrapper-target', + command: + 'WRAPPER_PORT=5000 kilocode-wrapper --agent-session agent_cloudflare --wrapper-instance-id instance_1 --wrapper-instance-generation 2', + status: 'running', + }, + ]) + .mockResolvedValueOnce([ + { + id: 'wrapper-target', + command: + 'WRAPPER_PORT=5000 kilocode-wrapper --agent-session agent_cloudflare --wrapper-instance-id instance_1 --wrapper-instance-generation 2', status: 'running', }, ]) .mockResolvedValueOnce([]); + let runtimeInspectionCount = 0; + const exec = vi.fn().mockImplementation((command: string) => { + if (command.includes('/proc/[0-9]*/environ')) { + runtimeInspectionCount += 1; + return Promise.resolve({ + exitCode: 0, + stdout: + runtimeInspectionCount === 1 + ? 
'812\tagent_cloudflare:instance_1:2\t1\tinstance_1\t2\t/tmp/kilocode-wrapper-agent_cloudflare-1.log\t.kilo serve\n' + : '', + }); + } + return Promise.resolve({ exitCode: 0, stdout: '' }); + }); const sandbox = new CloudflareAgentSandbox({} as Env, metadata(), { - resolveSandbox: () => ({ listProcesses }) as unknown as SandboxInstance, - stopObservedWrappers, - stopObservationDelaysMs: [0], + resolveSandbox: () => ({ listProcesses, exec }) as unknown as SandboxInstance, sleep: vi.fn().mockResolvedValue(undefined), + stopObservationDelaysMs: [0], }); await expect( sandbox.stopWrappers({ - target: { - kind: 'instance', - instance: { instanceId: 'instance_gone', instanceGeneration: 1 }, - }, - attemptId: 'attempt_residual', - reason: 'session-delete', + target: { kind: 'instance', instance: { instanceId: 'instance_1', instanceGeneration: 2 } }, + attemptId: 'attempt_1', + reason: 'readiness-failed', }) - ).resolves.toEqual({ status: 'absent' }); - expect(stopObservedWrappers).toHaveBeenCalledWith(expect.anything(), 'agent_cloudflare', [ - { representation: 'process', id: 'wrapper-legacy', port: 5001 }, - ]); + ).resolves.toEqual({ status: 'absent', stoppedInstanceIds: ['instance_1'] }); + expect(exec).toHaveBeenCalledWith(expect.stringContaining('pkill -f --')); + expect(exec).toHaveBeenCalledWith(expect.stringContaining('pkill -9 -f --')); + expect(exec).toHaveBeenCalledWith(expect.stringContaining('--agent-session agent_cloudflare')); }); - it('force stops a targeted wrapper that remains after graceful termination and confirms absence', async () => { + it('does not confirm absence while an owned runtime survives wrapper cleanup', async () => { + const ownedRuntimeOutput = + '812\tagent_cloudflare:instance_1:2\t1\tinstance_1\t2\t/tmp/kilocode-wrapper-agent_cloudflare-1.log\t.kilo serve --hostname 127.0.0.1\n'; const listProcesses = vi .fn() .mockResolvedValueOnce([ @@ -513,16 +515,13 @@ describe('CloudflareAgentSandbox', () => { status: 'running', }, ]) - 
.mockResolvedValueOnce([ - { - id: 'wrapper-target', - command: - 'WRAPPER_PORT=5000 kilocode-wrapper --agent-session agent_cloudflare --wrapper-instance-id instance_1 --wrapper-instance-generation 2', - status: 'running', - }, - ]) - .mockResolvedValueOnce([]); - const exec = vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }); + .mockResolvedValue([]); + const exec = vi.fn().mockImplementation((command: string) => { + if (command.startsWith('sh -c')) { + return Promise.resolve({ exitCode: 0, stdout: ownedRuntimeOutput }); + } + return Promise.resolve({ exitCode: 0, stdout: '' }); + }); const sandbox = new CloudflareAgentSandbox({} as Env, metadata(), { resolveSandbox: () => ({ listProcesses, exec }) as unknown as SandboxInstance, sleep: vi.fn().mockResolvedValue(undefined), @@ -532,13 +531,13 @@ describe('CloudflareAgentSandbox', () => { await expect( sandbox.stopWrappers({ target: { kind: 'instance', instance: { instanceId: 'instance_1', instanceGeneration: 2 } }, - attemptId: 'attempt_1', + attemptId: 'attempt_runtime_survives', reason: 'readiness-failed', }) - ).resolves.toEqual({ status: 'absent', stoppedInstanceIds: ['instance_1'] }); - expect(exec).toHaveBeenCalledWith(expect.stringContaining('pkill -f --')); - expect(exec).toHaveBeenCalledWith(expect.stringContaining('pkill -9 -f --')); - expect(exec).toHaveBeenCalledWith(expect.stringContaining('--agent-session agent_cloudflare')); + ).resolves.toMatchObject({ + status: 'still-present', + observed: [expect.objectContaining({ id: '812', processKind: 'runtime' })], + }); }); it('returns still-present when targeted forceful cleanup remains observable', async () => { diff --git a/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.ts b/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.ts index 80ade47f2b..8bacf38218 100644 --- a/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.ts +++ 
b/services/cloud-agent-next/src/agent-sandbox/cloudflare/cloudflare-agent-sandbox.ts @@ -294,9 +294,11 @@ export class CloudflareAgentSandbox implements AgentSandbox { const final = await this.observeTarget(request.target); if (final.status === 'inspection-failed') return final; if (final.status === 'present') return { status: 'still-present', observed: final.observed }; - const stoppedInstanceIds = initial.observed.flatMap(observed => - observed.instanceId ? [observed.instanceId] : [] - ); + const stoppedInstanceIds = [ + ...new Set( + initial.observed.flatMap(observed => (observed.instanceId ? [observed.instanceId] : [])) + ), + ]; return stoppedInstanceIds.length > 0 ? { status: 'absent', stoppedInstanceIds } : final; } diff --git a/services/cloud-agent-next/src/agent-sandbox/protocol.ts b/services/cloud-agent-next/src/agent-sandbox/protocol.ts index e8f874c221..f1ff481afc 100644 --- a/services/cloud-agent-next/src/agent-sandbox/protocol.ts +++ b/services/cloud-agent-next/src/agent-sandbox/protocol.ts @@ -16,6 +16,8 @@ export type WrapperInstanceLease = { export type ObservedWrapper = { representation: 'process' | 'container'; id: string; + containerId?: string; + processKind?: 'wrapper' | 'runtime'; port?: number; instanceId?: string; instanceGeneration?: number; diff --git a/services/cloud-agent-next/src/kilo/devcontainer.test.ts b/services/cloud-agent-next/src/kilo/devcontainer.test.ts index ce08fa00b4..921026e04e 100644 --- a/services/cloud-agent-next/src/kilo/devcontainer.test.ts +++ b/services/cloud-agent-next/src/kilo/devcontainer.test.ts @@ -90,6 +90,18 @@ describe('sandbox image versions', () => { expect(wranglerConfig.split(imageVar)).toHaveLength(7); expect(DEFAULT_SLASH_COMMANDS_SOURCE).toBe(`kilo@${KILO_CLI_VERSION}`); }); + + it('runs image-installed Kilo through a nice 10 shim without lowering wrapper priority', () => { + for (const dockerfileName of ['Dockerfile', 'Dockerfile.dev', 'Dockerfile.dind']) { + const dockerfile = readFileSync( + 
fileURLToPath(new URL(`../../${dockerfileName}`, import.meta.url).href), + 'utf8' + ); + expect(dockerfile).toContain('exec nice -n 10'); + expect(dockerfile).not.toContain('nice -n 10 bun'); + expect(dockerfile).not.toContain('nice -n 10 kilocode-wrapper'); + } + }); }); describe('detectDevContainer', () => { @@ -243,7 +255,12 @@ describe('bringUpDevContainer', () => { expect(commands.some(cmd => cmd.includes('@kilocode/cli@7.2.52'))).toBe(true); expect(commands.some(cmd => cmd.includes('set -euo pipefail'))).toBe(true); expect(commands.some(cmd => cmd.includes('/usr/local/bin/bun'))).toBe(true); - expect(commands.some(cmd => cmd.includes('/usr/local/bin/kilo'))).toBe(true); + expect(bootstrapCall?.[0]).toContain('chmod +x "$kilo_path"'); + expect(bootstrapCall?.[0]).toContain('kilo_path="$(command -v kilo)"'); + expect(bootstrapCall?.[0]).toContain('rm -f "${kilo_path}-real"'); + expect(bootstrapCall?.[0]).toContain('mv "$kilo_path" "${kilo_path}-real"'); + expect(bootstrapCall?.[0]).toContain('exec nice -n 10 "%s-real" "$@"'); + expect(bootstrapCall?.[0]).not.toContain('ln -sf "$(command -v kilo)"'); expect(bootstrapCall?.[1]).toEqual({ env: { DOCKER_HOST: 'unix:///var/run/docker.sock' }, timeout: 10 * 60 * 1000, diff --git a/services/cloud-agent-next/src/kilo/devcontainer.ts b/services/cloud-agent-next/src/kilo/devcontainer.ts index 52b4cef7bd..9894c1b7de 100644 --- a/services/cloud-agent-next/src/kilo/devcontainer.ts +++ b/services/cloud-agent-next/src/kilo/devcontainer.ts @@ -280,7 +280,11 @@ async function bootstrapDevContainerRuntimeTools( `curl -fsSL https://bun.sh/install | bash -s "bun-v${DEVCONTAINER_RUNTIME_BUN_VERSION}"`, 'ln -sf "$HOME/.bun/bin/bun" /usr/local/bin/bun', `npm install -g ${shellQuote(`@kilocode/cli@${opts.kiloCliVersion}`)}`, - 'ln -sf "$(command -v kilo)" /usr/local/bin/kilo', + 'kilo_path="$(command -v kilo)"', + 'rm -f "${kilo_path}-real"', + 'mv "$kilo_path" "${kilo_path}-real"', + `printf ${shellQuote('#!/bin/sh\nexec nice -n 10 
"%s-real" "$@"\n')} "$kilo_path" > "$kilo_path"`, + 'chmod +x "$kilo_path"', ].join(' && '); const command = buildDevContainerRuntimeExecCommand(opts, installCommand, 'bash -lc'); const result = await session.exec(command, { diff --git a/services/cloud-agent-next/src/kilo/wrapper-client.test.ts b/services/cloud-agent-next/src/kilo/wrapper-client.test.ts index d809dd46a9..9980ad6230 100644 --- a/services/cloud-agent-next/src/kilo/wrapper-client.test.ts +++ b/services/cloud-agent-next/src/kilo/wrapper-client.test.ts @@ -1053,6 +1053,13 @@ describe('WrapperClient', () => { const command = (session.startProcess as ReturnType).mock.calls[0][0] as string; expect(command).toContain("WRAPPER_INSTANCE_ID='instance_test'"); expect(command).toContain('WRAPPER_INSTANCE_GENERATION=6'); + expect(command).toContain("KILO_RUNTIME_OWNER='test-session:instance_test:6'"); + const options = (session.startProcess as ReturnType).mock.calls[0][1] as { + env?: Record; + }; + expect(options.env).toEqual( + expect.objectContaining({ KILO_RUNTIME_OWNER: 'test-session:instance_test:6' }) + ); expect(command).not.toContain('--wrapper-instance-id'); expect(command).not.toContain('--wrapper-instance-generation'); }); @@ -1501,7 +1508,7 @@ describe('WrapperClient', () => { expect(session.startProcess).not.toHaveBeenCalled(); }); - it('reuses an env-tagged legacy wrapper whose health does not report its lease', async () => { + it('reuses an env-tagged legacy wrapper with a matching owned runtime', async () => { const session = createMockSession(createSuccessResponse(healthResponseData)); const sandbox = createMockSandbox({ port: 5555, healthy: true }); (sandbox.listProcesses as ReturnType).mockResolvedValue([ @@ -1512,6 +1519,11 @@ describe('WrapperClient', () => { status: 'running', }, ]); + (sandbox.exec as ReturnType).mockResolvedValue({ + exitCode: 0, + stdout: + '812\ttest-session:instance_current:2\t1\tinstance_current\t2\t/tmp/kilocode-wrapper-test-session-1.log\t.kilo serve --hostname 
127.0.0.1\n', + }); await expect( WrapperClient.ensureWrapper(sandbox, session, { diff --git a/services/cloud-agent-next/src/kilo/wrapper-client.ts b/services/cloud-agent-next/src/kilo/wrapper-client.ts index 2b9208528b..835678d3ae 100644 --- a/services/cloud-agent-next/src/kilo/wrapper-client.ts +++ b/services/cloud-agent-next/src/kilo/wrapper-client.ts @@ -14,6 +14,7 @@ import { findWrapperForSession, findWrapperForSessionInProcesses, getWrapperSessionMarker, + serializeRuntimeOwner, } from './wrapper-manager.js'; import { randomPort } from './ports.js'; import { @@ -246,17 +247,19 @@ async function observationMatchesLease( const observation = await discoverSessionWrappers(sandbox, agentSessionId, { inspectContainers: options.inspectContainers, }); - if (observation.status !== 'present' || observation.observed.length !== 1) return false; - const wrapper = observation.observed[0]; - if (!wrapper) return false; + if (observation.status !== 'present') return false; + const wrappers = observation.observed.filter(observed => observed.processKind !== 'runtime'); + const wrapper = wrappers[0]; + if (wrappers.length !== 1 || !wrapper) return false; if (options.expectedContainerId !== undefined) { if (wrapper.representation !== 'container' || wrapper.id !== options.expectedContainerId) { return false; } } - return ( - wrapper.instanceId === leasedInstance.instanceId && - wrapper.instanceGeneration === leasedInstance.instanceGeneration + return observation.observed.every( + observed => + observed.instanceId === leasedInstance.instanceId && + observed.instanceGeneration === leasedInstance.instanceGeneration ); } @@ -577,16 +580,27 @@ export class WrapperClient { // When running inside a dev container, the wrapper sees the *inner* // workspace path (set by `devcontainer up`'s remoteWorkspaceFolder). const innerWorkspacePath = devcontainer?.innerWorkspaceFolder ?? workspacePath; + const runtimeLease = leasedInstance + ? 
{ + instance: leasedInstance, + owner: serializeRuntimeOwner( + agentSessionId, + leasedInstance.instanceId, + leasedInstance.instanceGeneration + ), + } + : undefined; const wrapperEnv: Record = { WRAPPER_PORT: String(this.port), WORKSPACE_PATH: innerWorkspacePath, WRAPPER_LOG_PATH: wrapperLogPath, KILO_SESSION_RETRY_LIMIT: '5', KILO_CLOUD_AGENT: '1', - ...(leasedInstance + ...(runtimeLease ? { - WRAPPER_INSTANCE_ID: leasedInstance.instanceId, - WRAPPER_INSTANCE_GENERATION: String(leasedInstance.instanceGeneration), + WRAPPER_INSTANCE_ID: runtimeLease.instance.instanceId, + WRAPPER_INSTANCE_GENERATION: String(runtimeLease.instance.instanceGeneration), + KILO_RUNTIME_OWNER: runtimeLease.owner, } : {}), }; @@ -597,10 +611,11 @@ export class WrapperClient { `KILO_SESSION_RETRY_LIMIT=5`, `KILO_CLOUD_AGENT=1`, // Environment markers let pre-lease wrapper bundles launch during a rolling deploy. - ...(leasedInstance + ...(runtimeLease ? [ - `WRAPPER_INSTANCE_ID=${shellQuote(leasedInstance.instanceId)}`, - `WRAPPER_INSTANCE_GENERATION=${leasedInstance.instanceGeneration}`, + `WRAPPER_INSTANCE_ID=${shellQuote(runtimeLease.instance.instanceId)}`, + `WRAPPER_INSTANCE_GENERATION=${runtimeLease.instance.instanceGeneration}`, + `KILO_RUNTIME_OWNER=${shellQuote(runtimeLease.owner)}`, ] : []), ...dockerEnvParts, diff --git a/services/cloud-agent-next/src/kilo/wrapper-manager.test.ts b/services/cloud-agent-next/src/kilo/wrapper-manager.test.ts index 65c39ee619..552972994b 100644 --- a/services/cloud-agent-next/src/kilo/wrapper-manager.test.ts +++ b/services/cloud-agent-next/src/kilo/wrapper-manager.test.ts @@ -265,7 +265,7 @@ describe('discoverSessionWrappers', () => { status: 'running', }, ]), - exec: vi.fn(), + exec: vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }), }; await expect( @@ -284,6 +284,86 @@ describe('discoverSessionWrappers', () => { }); }); + it('discovers an explicitly owned orphan runtime after its wrapper exits', async () => { + const sandbox = { + 
listProcesses: vi.fn().mockResolvedValue([]), + exec: vi.fn().mockResolvedValue({ + exitCode: 0, + stdout: + '811\tagent_xyz:instance_orphan:9\t1\tinstance_orphan\t9\t/tmp/kilocode-wrapper-agent_xyz-1.log\tkilocode-wrapper --agent-session agent_xyz\n' + + '812\tagent_xyz:instance_orphan:9\t1\tinstance_orphan\t9\t/tmp/kilocode-wrapper-agent_xyz-1.log\t.kilo serve --hostname 127.0.0.1\n', + }), + }; + + await expect( + discoverSessionWrappers(sandbox as never, 'agent_xyz', { inspectContainers: false }) + ).resolves.toEqual({ + status: 'present', + observed: [ + { + representation: 'process', + id: '812', + processKind: 'runtime', + instanceId: 'instance_orphan', + instanceGeneration: 9, + }, + ], + }); + }); + + it('discovers and safely signals a pre-owner-marker orphan runtime', async () => { + const sandbox = { + listProcesses: vi.fn().mockResolvedValue([]), + exec: vi.fn().mockResolvedValue({ + exitCode: 0, + stdout: + '812\t\t1\tinstance_legacy\t4\t/tmp/kilocode-wrapper-agent_xyz-123.log\t.kilo serve\n' + + '813\t\t1\tinstance_other\t7\t/tmp/kilocode-wrapper-agent_other-123.log\t.kilo serve\n', + }), + }; + + const discovery = await discoverSessionWrappers(sandbox as never, 'agent_xyz', { + inspectContainers: false, + }); + + expect(discovery).toEqual({ + status: 'present', + observed: [ + { + representation: 'process', + id: '812', + processKind: 'runtime', + instanceId: 'instance_legacy', + instanceGeneration: 4, + }, + ], + }); + if (discovery.status !== 'present') expect.fail('Expected inherited runtime discovery'); + + await stopObservedWrappers(sandbox as never, 'agent_xyz', discovery.observed); + + const signalCommand = sandbox.exec.mock.calls.find(([command]) => + String(command).includes('kill -TERM') + )?.[0] as string; + expect(signalCommand).toContain('/proc/$pid/environ'); + expect(signalCommand).toContain('KILO_CLOUD_AGENT'); + expect(signalCommand).toContain('WRAPPER_INSTANCE_ID'); + expect(signalCommand).toContain('WRAPPER_INSTANCE_GENERATION'); + 
expect(signalCommand).toContain('/tmp/kilocode-wrapper-agent_xyz-'); + expect(signalCommand).toContain('kill -TERM "$pid"'); + }); + + it('fails inspection rather than reporting absence when owned-runtime inspection fails', async () => { + const sandbox = { + listProcesses: vi.fn().mockResolvedValue([]), + exec: vi.fn().mockResolvedValue({ exitCode: 1, stderr: 'proc unavailable' }), + }; + + await expect( + discoverSessionWrappers(sandbox as never, 'agent_xyz', { inspectContainers: false }) + ).resolves.toEqual({ status: 'inspection-failed', error: 'proc unavailable' }); + }); + it('observes backward-compatible environment physical instance markers', async () => { const sandbox = { listProcesses: vi.fn().mockResolvedValue([ @@ -294,7 +374,7 @@ describe('discoverSessionWrappers', () => { status: 'running', }, ]), - exec: vi.fn(), + exec: vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }), }; await expect( @@ -313,6 +393,43 @@ describe('discoverSessionWrappers', () => { }); }); + it('discovers an owned runtime inside a persistent devcontainer without another session runtime', async () => { + const sandbox = { + listProcesses: vi.fn().mockResolvedValue([]), + exec: vi.fn().mockImplementation((command: string) => { + if (command.startsWith('sh -c')) return Promise.resolve({ exitCode: 0, stdout: '' }); + if (command.includes('docker ps')) { + return Promise.resolve({ + exitCode: 0, + stdout: 'cont-id\t0.0.0.0:5050->5050/tcp\tkilo.agentSession=agent_xyz\n', + }); + } + if (command.includes('ps -eo')) return Promise.resolve({ exitCode: 0, stdout: '' }); + return Promise.resolve({ + exitCode: 0, + stdout: + '51\tagent_xyz:instance_target:3\t1\tinstance_target\t3\t/tmp/kilocode-wrapper-agent_xyz-1.log\t.kilo serve\n52\tagent_other:instance_other:7\t1\tinstance_other\t7\t/tmp/kilocode-wrapper-agent_other-1.log\t.kilo serve\n', + }); + }), + }; + + await expect( + discoverSessionWrappers(sandbox as never, 'agent_xyz', { dockerEnv: {} }) + ).resolves.toEqual({ + status: 
'present', + observed: [ + { + representation: 'container', + id: '51', + containerId: 'cont-id', + processKind: 'runtime', + instanceId: 'instance_target', + instanceGeneration: 3, + }, + ], + }); + }); + it('reports inspection failure instead of absence when requested Docker inspection fails', async () => { const sandbox = { listProcesses: vi.fn().mockResolvedValue([]), @@ -327,16 +444,20 @@ describe('discoverSessionWrappers', () => { }); }); - it('does not require Docker inspection for a standard-sandbox lifecycle query', async () => { + it('inspects owned runtimes without requiring Docker for a standard sandbox', async () => { const sandbox = { listProcesses: vi.fn().mockResolvedValue([]), - exec: vi.fn().mockRejectedValue(new Error('docker unavailable')), + exec: vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }), }; await expect( discoverSessionWrappers(sandbox as never, 'agent_xyz', { inspectContainers: false }) ).resolves.toEqual({ status: 'absent' }); - expect(sandbox.exec).not.toHaveBeenCalled(); + expect(sandbox.exec).toHaveBeenCalledOnce(); + expect(sandbox.exec).toHaveBeenCalledWith( + expect.stringContaining('/proc/[0-9]*/environ'), + undefined + ); }); it('observes a direct physical wrapper even when it has no HTTP port yet', async () => { @@ -349,7 +470,7 @@ describe('discoverSessionWrappers', () => { status: 'starting', }, ]), - exec: vi.fn(), + exec: vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }), }; await expect( @@ -384,6 +505,64 @@ describe('discoverSessionWrappers', () => { expect(sandbox.exec).not.toHaveBeenCalledWith(expect.stringContaining('WRAPPER_INSTANCE_ID')); }); + it('terminates exact owned runtime PIDs before the direct wrapper marker', async () => { + const sandbox = { exec: vi.fn().mockResolvedValue({ exitCode: 0 }) }; + + await stopObservedWrappers(sandbox as never, 'agent_xyz', [ + { + representation: 'process', + id: '812', + processKind: 'runtime', + instanceId: 'instance_direct', + instanceGeneration: 4, + }, + { + 
representation: 'process', + id: '811', + processKind: 'wrapper', + instanceId: 'instance_direct', + instanceGeneration: 4, + }, + ]); + + const runtimeSignalCommand = sandbox.exec.mock.calls[0]?.[0] as string; + expect(runtimeSignalCommand).toContain('/proc/$pid/environ'); + expect(runtimeSignalCommand).toContain('agent_xyz:instance_direct:4'); + expect(runtimeSignalCommand).toContain('kill -TERM "$pid"'); + expect(runtimeSignalCommand).not.toContain('kill -TERM --'); + expect(sandbox.exec).toHaveBeenNthCalledWith( + 2, + expect.stringContaining('--agent-session agent_xyz') + ); + }); + + it('revalidates devcontainer runtime ownership before signaling its exact PID', async () => { + const sandbox = { + exec: vi + .fn() + .mockResolvedValueOnce({ exitCode: 0, stdout: '/run/user/1000/docker.sock' }) + .mockResolvedValueOnce({ exitCode: 0, stdout: '' }), + }; + + await stopObservedWrappers(sandbox as never, 'agent_xyz', [ + { + representation: 'container', + containerId: 'persistent-container', + id: '52', + processKind: 'runtime', + instanceId: 'instance_container', + instanceGeneration: 7, + }, + ]); + + const command = sandbox.exec.mock.calls[1]?.[0] as string; + expect(command).toContain('docker exec'); + expect(command).toContain('/proc/$pid/environ'); + expect(command).toContain('agent_xyz:instance_container:7'); + expect(command).toContain('kill -TERM "$pid"'); + expect(command).not.toContain('kill -TERM --'); + }); + it('force stops a leased devcontainer wrapper process without destroying its persistent container', async () => { const sandbox = { exec: vi diff --git a/services/cloud-agent-next/src/kilo/wrapper-manager.ts b/services/cloud-agent-next/src/kilo/wrapper-manager.ts index 9f88be9190..812de65b9b 100644 --- a/services/cloud-agent-next/src/kilo/wrapper-manager.ts +++ b/services/cloud-agent-next/src/kilo/wrapper-manager.ts @@ -29,6 +29,9 @@ const KILO_WRAPPER_INSTANCE_FLAG = '--wrapper-instance-id'; const KILO_WRAPPER_INSTANCE_GENERATION_FLAG = 
'--wrapper-instance-generation'; const KILO_WRAPPER_INSTANCE_ENV = 'WRAPPER_INSTANCE_ID='; const KILO_WRAPPER_INSTANCE_GENERATION_ENV = 'WRAPPER_INSTANCE_GENERATION='; +const KILO_RUNTIME_OWNER_ENV = 'KILO_RUNTIME_OWNER'; +const KILO_CLOUD_AGENT_ENV = 'KILO_CLOUD_AGENT'; +const KILO_WRAPPER_LOG_PATH_ENV = 'WRAPPER_LOG_PATH'; /** * Information about a running wrapper. @@ -107,6 +110,111 @@ export function extractWrapperInstanceGenerationFromCommand(command: string): nu return Number.isInteger(generation) && generation >= 0 ? generation : null; } +export function serializeRuntimeOwner( + sessionId: string, + instanceId: string, + instanceGeneration: number +): string { + if (sessionId.includes(':') || instanceId.includes(':')) { + throw new Error('Runtime owner identity cannot contain a colon'); + } + if (!Number.isInteger(instanceGeneration) || instanceGeneration < 0) { + throw new Error('Runtime owner generation must be a non-negative integer'); + } + return `${sessionId}:${instanceId}:${instanceGeneration}`; +} + +function parseRuntimeOwner( + value: string +): { sessionId: string; instanceId: string; instanceGeneration: number } | null { + const firstSeparator = value.indexOf(':'); + const lastSeparator = value.lastIndexOf(':'); + if (firstSeparator <= 0 || lastSeparator <= firstSeparator) return null; + const sessionId = value.slice(0, firstSeparator); + const instanceId = value.slice(firstSeparator + 1, lastSeparator); + const instanceGeneration = Number.parseInt(value.slice(lastSeparator + 1), 10); + if (!instanceId || !Number.isInteger(instanceGeneration) || instanceGeneration < 0) return null; + return { sessionId, instanceId, instanceGeneration }; +} + +function ownedRuntimeInspectionCommand(): string { + const parseEnvironment = [ + 'BEGIN { OFS="\\t" }', + `$1 == "${KILO_RUNTIME_OWNER_ENV}" { owner = substr($0, index($0, "=") + 1) }`, + `$1 == "${KILO_CLOUD_AGENT_ENV}" { cloudAgent = substr($0, index($0, "=") + 1) }`, + `$1 == 
"${KILO_WRAPPER_INSTANCE_ENV.slice(0, -1)}" { instanceId = substr($0, index($0, "=") + 1) }`, + `$1 == "${KILO_WRAPPER_INSTANCE_GENERATION_ENV.slice(0, -1)}" { generation = substr($0, index($0, "=") + 1) }`, + `$1 == "${KILO_WRAPPER_LOG_PATH_ENV}" { logPath = substr($0, index($0, "=") + 1) }`, + 'END { if (owner != "" || cloudAgent == "1") print owner, cloudAgent, instanceId, generation, logPath }', + ].join(' '); + return `for e in /proc/[0-9]*/environ; do [ -r "$e" ] || continue; values=$(tr '\\000' '\\n' < "$e" | awk -F= ${shellQuote(parseEnvironment)}); [ -n "$values" ] || continue; pid=\${e#/proc/}; pid=\${pid%/environ}; printf '%s\\t%s\\t' "$pid" "$values"; tr '\\000' ' ' < "/proc/$pid/cmdline"; printf '\\n'; done`; +} + +async function inspectOwnedRuntimes( + executor: DockerExecutor, + sessionId: string, + options?: { containerId?: string; dockerEnv?: Record } +): Promise { + const script = ownedRuntimeInspectionCommand(); + const command = options?.containerId + ? `docker exec ${shellQuote(options.containerId)} sh -c ${shellQuote(script)}` + : `sh -c ${shellQuote(script)}`; + let result: { exitCode: number; stdout?: string; stderr?: string }; + try { + result = await executor.exec( + command, + options?.containerId ? { env: options.dockerEnv ?? {} } : undefined + ); + } catch (error) { + return { + status: 'inspection-failed', + error: error instanceof Error ? error.message : String(error), + }; + } + if (result.exitCode !== 0) { + return { + status: 'inspection-failed', + error: result.stderr?.trim() || `runtime inspection exited with code ${result.exitCode}`, + }; + } + const observed: ObservedWrapper[] = []; + const legacyLogPrefix = `/tmp/kilocode-wrapper-${sessionId}-`; + for (const line of (result.stdout ?? '').split('\n')) { + const [pid, owner, cloudAgent, legacyInstanceId, legacyGeneration, logPath, command] = + line.split('\t'); + if (!pid || command?.includes('kilocode-wrapper')) continue; + const explicitOwner = owner ? 
parseRuntimeOwner(owner) : null; + const legacyGenerationNumber = Number(legacyGeneration); + const legacyOwner = + !owner && + cloudAgent === '1' && + legacyInstanceId && + legacyGeneration !== undefined && + legacyGeneration !== '' && + Number.isInteger(legacyGenerationNumber) && + legacyGenerationNumber >= 0 && + logPath?.startsWith(legacyLogPrefix) && + logPath.endsWith('.log') + ? { + sessionId, + instanceId: legacyInstanceId, + instanceGeneration: legacyGenerationNumber, + } + : null; + const parsed = explicitOwner ?? legacyOwner; + if (!parsed || parsed.sessionId !== sessionId) continue; + observed.push({ + representation: options?.containerId ? 'container' : 'process', + id: pid, + ...(options?.containerId ? { containerId: options.containerId } : {}), + processKind: 'runtime', + instanceId: parsed.instanceId, + instanceGeneration: parsed.instanceGeneration, + }); + } + return observed.length === 0 ? { status: 'absent' } : { status: 'present', observed }; +} + /** * Find a wrapper for the given session in a pre-fetched process list. * Useful when the caller already has the process list (e.g. to avoid @@ -334,6 +442,11 @@ export async function discoverSessionWrappers( }); } + const directRuntimeInspection = await inspectOwnedRuntimes(sandbox, sessionId); + if (directRuntimeInspection.status === 'inspection-failed') return directRuntimeInspection; + if (directRuntimeInspection.status === 'present') + observed.push(...directRuntimeInspection.observed); + if (options?.inspectContainers === false) { return observed.length === 0 ? { status: 'absent' } : { status: 'present', observed }; } @@ -406,6 +519,12 @@ export async function discoverSessionWrappers( ...(instanceGeneration !== undefined ? 
{ instanceGeneration } : {}), }); } + const runtimeInspection = await inspectOwnedRuntimes(sandbox, sessionId, { + containerId: container.containerId, + dockerEnv, + }); + if (runtimeInspection.status === 'inspection-failed') return runtimeInspection; + if (runtimeInspection.status === 'present') observed.push(...runtimeInspection.observed); } return observed.length === 0 ? { status: 'absent' } : { status: 'present', observed }; } @@ -463,23 +582,81 @@ export function getWrapperSessionMarker(sessionId: string): string { return `${KILO_WRAPPER_SESSION_FLAG} ${sessionId}`; } +function ownedRuntimeSignalScript( + runtime: ObservedWrapper, + sessionId: string, + signal: 'TERM' | 'KILL' +): string { + if (!runtime.instanceId || runtime.instanceGeneration === undefined) { + throw new Error('Owned runtime is missing its physical instance identity'); + } + const expectedOwner = serializeRuntimeOwner( + sessionId, + runtime.instanceId, + runtime.instanceGeneration + ); + const legacyLogPrefix = `/tmp/kilocode-wrapper-${sessionId}-`; + return `pid=${shellQuote(runtime.id)} +expected=${shellQuote(expectedOwner)} +expected_instance=${shellQuote(runtime.instanceId)} +expected_generation=${shellQuote(String(runtime.instanceGeneration))} +expected_log_prefix=${shellQuote(legacyLogPrefix)} +environ="/proc/$pid/environ" +[ -r "$environ" ] || exit 0 +owner=$(tr '\\000' '\\n' < "$environ" | sed -n 's/^${KILO_RUNTIME_OWNER_ENV}=//p') +if [ "$owner" != "$expected" ]; then + cloud_agent=$(tr '\\000' '\\n' < "$environ" | sed -n 's/^${KILO_CLOUD_AGENT_ENV}=//p') + instance_id=$(tr '\\000' '\\n' < "$environ" | sed -n 's/^${KILO_WRAPPER_INSTANCE_ENV}//p') + generation=$(tr '\\000' '\\n' < "$environ" | sed -n 's/^${KILO_WRAPPER_INSTANCE_GENERATION_ENV}//p') + log_path=$(tr '\\000' '\\n' < "$environ" | sed -n 's/^${KILO_WRAPPER_LOG_PATH_ENV}=//p') + [ "$cloud_agent" = "1" ] || exit 0 + [ "$instance_id" = "$expected_instance" ] || exit 0 + [ "$generation" = "$expected_generation" ] || exit 0 + 
case "$log_path" in "$expected_log_prefix"*.log) ;; *) exit 0 ;; esac +fi +kill -${signal} "$pid"`; +} + export async function stopObservedWrappers( sandbox: SandboxInstance, sessionId: string, observed: ObservedWrapper[], options?: { force?: boolean; devcontainer?: { workspacePath: string; configPath?: string } } ): Promise { - const dockerRows = observed.filter(wrapper => wrapper.representation === 'container'); - const processRows = observed.filter(wrapper => wrapper.representation === 'process'); + const directRuntimes = observed.filter( + wrapper => wrapper.representation === 'process' && wrapper.processKind === 'runtime' + ); + const directWrappers = observed.filter( + wrapper => wrapper.representation === 'process' && wrapper.processKind !== 'runtime' + ); + const containerRuntimes = observed.filter( + wrapper => wrapper.representation === 'container' && wrapper.processKind === 'runtime' + ); + const containerWrappers = observed.filter( + wrapper => wrapper.representation === 'container' && wrapper.processKind !== 'runtime' + ); + const signal = options?.force ? 'KILL' : 'TERM'; + for (const runtime of directRuntimes) { + const script = ownedRuntimeSignalScript(runtime, sessionId, signal); + await sandbox.exec(`sh -c ${shellQuote(script)}`); + } const sessionMarker = getWrapperSessionMarker(sessionId); - if (processRows.length > 0) { + if (directWrappers.length > 0) { await sandbox.exec( `${options?.force ? 
'pkill -9' : 'pkill'} -f -- ${shellQuote(sessionMarker)}` ); } - if (dockerRows.length > 0) { + if (containerRuntimes.length > 0 || containerWrappers.length > 0) { const dockerEnv = dockerSocketEnv(await resolveDockerSocketPath(sandbox)); - for (const wrapper of dockerRows) { + for (const runtime of containerRuntimes) { + const containerId = runtime.containerId; + if (!containerId) throw new Error('Owned container runtime is missing its container id'); + const script = ownedRuntimeSignalScript(runtime, sessionId, signal); + await sandbox.exec(`docker exec ${shellQuote(containerId)} sh -c ${shellQuote(script)}`, { + env: dockerEnv, + }); + } + for (const wrapper of containerWrappers) { const pkill = options?.force ? 'pkill -9' : 'pkill'; await sandbox.exec( `docker exec ${shellQuote(wrapper.id)} sh -c ${shellQuote( diff --git a/services/cloud-agent-next/src/session/agent-runtime.test.ts b/services/cloud-agent-next/src/session/agent-runtime.test.ts index c4bff219b3..0cd5d46892 100644 --- a/services/cloud-agent-next/src/session/agent-runtime.test.ts +++ b/services/cloud-agent-next/src/session/agent-runtime.test.ts @@ -546,6 +546,13 @@ describe('AgentRuntime', () => { instanceId: 'instance_warm', instanceGeneration: 1, }, + { + representation: 'process', + id: 'kilo-warm', + processKind: 'runtime', + instanceId: 'instance_warm', + instanceGeneration: 1, + }, ], }), } as unknown as AgentSandbox; diff --git a/services/cloud-agent-next/src/session/agent-runtime.ts b/services/cloud-agent-next/src/session/agent-runtime.ts index 06e45bfa5d..7f0291e3a4 100644 --- a/services/cloud-agent-next/src/session/agent-runtime.ts +++ b/services/cloud-agent-next/src/session/agent-runtime.ts @@ -175,9 +175,12 @@ export function createAgentRuntime(dependencies: AgentRuntimeDependencies): Agen } const matchingObserved = observation.status === 'present' && - observation.observed.length === 1 && - observation.observed[0].instanceId === current.instance.instanceId && - 
observation.observed[0].instanceGeneration === current.instance.instanceGeneration; + observation.observed.filter(observed => observed.processKind !== 'runtime').length === 1 && + observation.observed.every( + observed => + observed.instanceId === current.instance.instanceId && + observed.instanceGeneration === current.instance.instanceGeneration + ); if (matchingObserved) { if (current.keepWarmUntil !== undefined) { const startupDeadlineAt = Date.now() + WRAPPER_STARTUP_TIMEOUT_MS; diff --git a/services/cloud-agent-next/src/session/pending-messages.test.ts b/services/cloud-agent-next/src/session/pending-messages.test.ts index 6f698d3e79..e6d144d77c 100644 --- a/services/cloud-agent-next/src/session/pending-messages.test.ts +++ b/services/cloud-agent-next/src/session/pending-messages.test.ts @@ -407,6 +407,58 @@ describe('recordPendingFlushFailure', () => { expect(result.nextFlushAttemptAt).toBeUndefined(); }); + it('schedules terminalization repair before persisting an exhausted disposition', async () => { + const storage = createMemoryStorage(); + const message = makeMessage(); + await storePendingSessionMessage(storage, message); + let repairScheduled = false; + const originalPut = storage.put.bind(storage); + storage.put = async (key, value) => { + if ( + typeof value === 'object' && + value !== null && + 'deliveryDisposition' in value && + value.deliveryDisposition === 'terminalization-pending' + ) { + expect(repairScheduled).toBe(true); + } + await originalPut(key, value); + }; + + await recordPendingFlushFailure(storage, message, 'bad', 100_000, { + policy: 'warm-followup', + code: 'BAD_REQUEST', + scheduleTerminalizationRepair: async () => { + repairScheduled = true; + }, + }); + + expect(repairScheduled).toBe(true); + }); + + it('does not persist an exhausted disposition when repair scheduling fails', async () => { + const storage = createMemoryStorage(); + const message = makeMessage(); + await storePendingSessionMessage(storage, message); + + await 
expect( + recordPendingFlushFailure(storage, message, 'bad', 100_000, { + policy: 'warm-followup', + code: 'BAD_REQUEST', + scheduleTerminalizationRepair: async () => { + throw new Error('alarm unavailable'); + }, + }) + ).rejects.toThrow('alarm unavailable'); + + const [stored] = await listPendingSessionMessages(storage); + expect(stored).toMatchObject({ + messageId: message.messageId, + flushAttempts: undefined, + deliveryDisposition: undefined, + }); + }); + it('keeps exhausted messages in storage for caller terminalization', async () => { const storage = createMemoryStorage(); const message = makeMessage(); diff --git a/services/cloud-agent-next/src/session/pending-messages.ts b/services/cloud-agent-next/src/session/pending-messages.ts index f166908972..d2cda9583c 100644 --- a/services/cloud-agent-next/src/session/pending-messages.ts +++ b/services/cloud-agent-next/src/session/pending-messages.ts @@ -451,6 +451,7 @@ export async function recordPendingFlushFailure( | 'PENDING_QUEUE_FULL' | 'MODEL_MISSING' | 'UNKNOWN'; + scheduleTerminalizationRepair?: () => Promise; } ): Promise { if (options.code === undefined || options.code === 'UNKNOWN') { @@ -480,6 +481,9 @@ export async function recordPendingFlushFailure( lastFlushFailureCode: flushFailureCode, deliveryDisposition: exhausted ? 
'terminalization-pending' : undefined, }; + if (exhausted) { + await options.scheduleTerminalizationRepair?.(); + } const entries = (await listPendingMessageEntries(storage)).filter( candidate => candidate.message.messageId === message.messageId ); diff --git a/services/cloud-agent-next/src/session/session-message-queue.test.ts b/services/cloud-agent-next/src/session/session-message-queue.test.ts index fe25127069..4da25b85f0 100644 --- a/services/cloud-agent-next/src/session/session-message-queue.test.ts +++ b/services/cloud-agent-next/src/session/session-message-queue.test.ts @@ -121,6 +121,7 @@ function createQueueHarness(options?: { storage?: SessionMessageQueueStorage; failQueuedEventOnce?: boolean; failAlarmOnce?: boolean; + failTerminalizationRepairAlarmOnce?: boolean; failTerminalizationOnce?: boolean; ensureAcceptedMessageEffects?: (messageId: string) => Promise; getDeliveryBlock?: () => Promise<{ retryAt: number } | null>; @@ -132,6 +133,7 @@ function createQueueHarness(options?: { const finalizedTerminalCallbacks: Array<{ allowWithoutObservedIdle?: boolean } | undefined> = []; let failQueuedEvent = options?.failQueuedEventOnce ?? false; let failAlarm = options?.failAlarmOnce ?? false; + let failTerminalizationRepairAlarm = options?.failTerminalizationRepairAlarmOnce ?? false; let failTerminalization = options?.failTerminalizationOnce ?? false; const terminalizations: Terminalization[] = []; const metadata = options?.metadata === undefined ? 
createMetadata() : options.metadata; @@ -190,6 +192,10 @@ function createQueueHarness(options?: { failAlarm = false; throw new Error('failed to schedule prompt drain'); } + if (failTerminalizationRepairAlarm && deliver.mock.calls.length > 0) { + failTerminalizationRepairAlarm = false; + throw new Error('failed to schedule terminalization repair'); + } alarmDeadlines.push(deadline); }, getSessionIdForLogs: () => metadata?.identity.sessionId, @@ -1224,6 +1230,63 @@ describe('SessionMessageQueue', () => { expect(drain.remainingPendingCount).toBe(0); }); + it('does not retry a terminal delivery failure when repair scheduling fails', async () => { + const harness = createQueueHarness({ + failTerminalizationRepairAlarmOnce: true, + deliver: async () => ({ success: false, code: 'BAD_REQUEST', error: 'invalid queued turn' }), + }); + await harness.queue.admitSubmittedMessage({ + userId: 'user_test' as UserId, + turn: { type: 'prompt', id: FIRST_MESSAGE_ID, prompt: 'do not retry invalid delivery' }, + }); + + await expect(harness.queue.drainNextPendingMessage()).rejects.toThrow( + 'failed to schedule terminalization repair' + ); + + const [pending] = await listPendingSessionMessages(harness.storage); + expect(pending).toMatchObject({ + messageId: FIRST_MESSAGE_ID, + flushAttempts: undefined, + nextFlushAttemptAt: undefined, + deliveryDisposition: undefined, + }); + expect(harness.deliver).toHaveBeenCalledOnce(); + expect(harness.terminalizations).toHaveLength(0); + }); + + it('repairs interrupted exhausted terminalization without redispatch', async () => { + const harness = createQueueHarness({ + failTerminalizationOnce: true, + deliver: async () => ({ success: false, code: 'BAD_REQUEST', error: 'invalid queued turn' }), + }); + await harness.queue.admitSubmittedMessage({ + userId: 'user_test' as UserId, + turn: { type: 'prompt', id: FIRST_MESSAGE_ID, prompt: 'repair terminalization' }, + }); + const alarmCountBeforeDrain = harness.alarmDeadlines.length; + + await 
expect(harness.queue.drainNextPendingMessage()).rejects.toThrow( + 'terminal transition failed' + ); + const [interruptedPending] = await listPendingSessionMessages(harness.storage); + + expect(interruptedPending).toMatchObject({ + messageId: FIRST_MESSAGE_ID, + deliveryDisposition: 'terminalization-pending', + nextFlushAttemptAt: undefined, + }); + expect(harness.alarmDeadlines.length).toBeGreaterThan(alarmCountBeforeDrain); + expect(harness.deliver).toHaveBeenCalledOnce(); + + const repaired = await harness.queue.drainNextPendingMessage(); + + expect(repaired).toEqual({ retryAt: undefined, remainingPendingCount: 0 }); + expect(harness.deliver).toHaveBeenCalledOnce(); + expect(harness.terminalizations).toHaveLength(1); + expect(await listPendingSessionMessages(harness.storage)).toHaveLength(0); + }); + it('hands exhausted queued delivery to settlement terminalization', async () => { const harness = createQueueHarness({ deliver: async () => ({ success: false, code: 'BAD_REQUEST', error: 'invalid queued turn' }), diff --git a/services/cloud-agent-next/src/session/session-message-queue.ts b/services/cloud-agent-next/src/session/session-message-queue.ts index 48d652fce1..62c3b4195a 100644 --- a/services/cloud-agent-next/src/session/session-message-queue.ts +++ b/services/cloud-agent-next/src/session/session-message-queue.ts @@ -289,6 +289,7 @@ export async function flushNextPendingSessionMessage(params: { repairQueuedMessageEffects?: (intent: SessionMessageIntent) => Promise; prepareQueuedMessageDelivery?: (intent: SessionMessageIntent) => Promise; ensureAcceptedMessageEffects?: (messageId: string) => Promise; + scheduleTerminalizationRepair?: () => Promise; }): Promise { const context = await params.getDeliveryContext(); const messages = await listPendingSessionMessages( @@ -353,7 +354,11 @@ export async function flushNextPendingSessionMessage(params: { message, 'Session metadata is not available', params.now, - { policy, code: 'NOT_FOUND' } + { + policy, + code: 
'NOT_FOUND', + scheduleTerminalizationRepair: params.scheduleTerminalizationRepair, + } ); return toFailureResult(failure, totalCount); } @@ -372,7 +377,11 @@ export async function flushNextPendingSessionMessage(params: { message, 'Session is missing a valid model', params.now, - { policy, code: 'MODEL_MISSING' } + { + policy, + code: 'MODEL_MISSING', + scheduleTerminalizationRepair: params.scheduleTerminalizationRepair, + } ); return toFailureResult(failure, totalCount); } @@ -402,32 +411,15 @@ export async function flushNextPendingSessionMessage(params: { await params.prepareQueuedMessageDelivery?.(intent); + let startResult: MessageDeliveryResult; try { const plan = buildMessageDeliveryRequest( intent, context, params.validateModeAgainstRuntimeAgents ); - const startResult = await params.deliver(plan); - if (!startResult.success && startResult.code === 'WRAPPER_FINALIZING') { - return { type: 'held', remainingCount: totalCount }; - } - if (!startResult.success) { - const failure = await recordPendingFlushFailure( - params.storage, - message, - startResult.error, - Date.now(), - { policy, code: startResult.code } - ); - throw new PendingFlushRecordedError(failure); - } - await deletePendingSessionMessageByMessageId(params.storage, message.messageId); - return { type: 'delivered', remainingCount: totalCount - 1 }; + startResult = await params.deliver(plan); } catch (error) { - if (error instanceof PendingFlushRecordedError) { - return toFailureResult(error.failure, totalCount); - } if (error instanceof WrapperCleanupBlockedError) { return { type: 'skipped', @@ -446,17 +438,34 @@ export async function flushNextPendingSessionMessage(params: { message, error instanceof Error ? error.message : String(error), Date.now(), - { policy, code: code ?? 'UNKNOWN' } + { + policy, + code: code ?? 
'UNKNOWN', + scheduleTerminalizationRepair: params.scheduleTerminalizationRepair, + } ); return toFailureResult(failure, totalCount); } -} -class PendingFlushRecordedError extends Error { - constructor(readonly failure: PendingFlushFailureResult) { - super(failure.message.lastFlushError ?? 'Pending flush failure recorded'); - this.name = 'PendingFlushRecordedError'; + if (!startResult.success && startResult.code === 'WRAPPER_FINALIZING') { + return { type: 'held', remainingCount: totalCount }; + } + if (!startResult.success) { + const failure = await recordPendingFlushFailure( + params.storage, + message, + startResult.error, + Date.now(), + { + policy, + code: startResult.code, + scheduleTerminalizationRepair: params.scheduleTerminalizationRepair, + } + ); + return toFailureResult(failure, totalCount); } + await deletePendingSessionMessageByMessageId(params.storage, message.messageId); + return { type: 'delivered', remainingCount: totalCount - 1 }; } function buildAdmissionAck( @@ -877,6 +886,7 @@ export function createSessionMessageQueue( repairQueuedMessageEffects: repairQueuedAdmissionEffects, prepareQueuedMessageDelivery, ensureAcceptedMessageEffects, + scheduleTerminalizationRepair: requestPendingDrain, }); if (flushResult.type === 'skipped') { diff --git a/services/cloud-agent-next/test/integration/session/pending-messages.test.ts b/services/cloud-agent-next/test/integration/session/pending-messages.test.ts index b244796887..57c03b1dc7 100644 --- a/services/cloud-agent-next/test/integration/session/pending-messages.test.ts +++ b/services/cloud-agent-next/test/integration/session/pending-messages.test.ts @@ -593,6 +593,116 @@ describe('pending session messages', () => { }); }); + it('repairs terminalization after interruption immediately following exhausted disposition persistence', async () => { + const userId = 'user_pending_terminalization_repair'; + const sessionId = 'agent_pending_terminalization_repair'; + const messageId = 
'msg_018f1e2d3c4bTermRepairABCD'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async instance => { + let deliveryAttempts = 0; + instance['executeDirectly'] = async () => { + deliveryAttempts += 1; + return { success: false, code: 'BAD_REQUEST', error: 'invalid queued turn' }; + }; + await registerReadySession(instance, { + sessionId, + userId, + kiloSessionId: '45454545-4545-4545-8545-454545454546', + prompt: 'prepared prompt', + mode: 'code', + model: 'test-model', + kilocodeToken: 'token-terminalization-repair', + }); + await storePendingSessionMessage( + instance.ctx.storage, + createMessage({ + messageId, + content: 'repair exhausted terminalization', + createdAt: 1, + flushAttempts: 1, + nextFlushAttemptAt: Date.now() - 1, + }) + ); + await putSessionMessageState(instance.ctx.storage, { + messageId, + status: 'queued', + prompt: 'repair exhausted terminalization', + createdAt: 1, + queuedAt: 1, + }); + const previousAlarm = Date.now() + 60_000; + await instance.ctx.storage.setAlarm(previousAlarm); + + const originalPut = instance.ctx.storage.put.bind(instance.ctx.storage); + let repairAlarmAtDisposition: number | null = null; + let interruptBeforeTerminalState = false; + instance.ctx.storage.put = async (key, value) => { + if ( + interruptBeforeTerminalState && + key === `session_message:${messageId}` && + typeof value === 'object' && + value !== null && + 'status' in value && + value.status === 'failed' + ) { + interruptBeforeTerminalState = false; + throw new Error('interrupted before terminal state'); + } + await originalPut(key, value); + if ( + typeof value === 'object' && + value !== null && + 'deliveryDisposition' in value && + value.deliveryDisposition === 'terminalization-pending' + ) { + repairAlarmAtDisposition = await instance.ctx.storage.getAlarm(); + interruptBeforeTerminalState = true; + } + }; + + await instance.alarm(); + const 
interruptedPending = await listPendingSessionMessages(instance.ctx.storage); + instance.ctx.storage.put = originalPut; + await instance.alarm(); + + const db = drizzle(instance.ctx.storage, { logger: false }); + const eventQueries = createEventQueries(db, instance.ctx.storage.sql); + const failedEvents = eventQueries.findByFilters({ eventTypes: ['cloud.message.failed'] }); + return { + deliveryAttempts, + previousAlarm, + repairAlarmAtDisposition, + interruptedPending, + finalPending: await listPendingSessionMessages(instance.ctx.storage), + message: await getSessionMessageState(instance.ctx.storage, messageId), + failedEventCount: failedEvents.filter( + event => JSON.parse(event.payload).messageId === messageId + ).length, + }; + }); + + expect(result.repairAlarmAtDisposition).not.toBeNull(); + expect(result.repairAlarmAtDisposition).toBeLessThan(result.previousAlarm); + expect(result.interruptedPending).toMatchObject([ + { + messageId, + deliveryDisposition: 'terminalization-pending', + nextFlushAttemptAt: undefined, + }, + ]); + expect(result.deliveryAttempts).toBe(1); + expect(result.finalPending).toHaveLength(0); + expect(result.message).toMatchObject({ + status: 'failed', + completionSource: 'delivery_failure', + failureReason: 'exhausted', + }); + expect(result.failedEventCount).toBe(1); + }); + it('exhausts failed flush retries, emits cloud.message.failed, and removes the pending message', async () => { const userId = 'user_pending_flush_exhaust'; const sessionId = 'agent_pending_flush_exhaust'; diff --git a/services/cloud-agent-next/wrapper/src/connection.ts b/services/cloud-agent-next/wrapper/src/connection.ts index 3686c2e9e8..8374edde1a 100644 --- a/services/cloud-agent-next/wrapper/src/connection.ts +++ b/services/cloud-agent-next/wrapper/src/connection.ts @@ -14,6 +14,7 @@ import type { IngestEvent, WrapperCommand } from '../../src/shared/protocol.js'; import { trimPayload } from '../../src/shared/trim-payload.js'; import { logToFile } from 
'./utils.js'; import type { KiloEvent, WrapperKiloClient } from './kilo-api.js'; +import { createRunningBashEventCoalescer } from './running-bash-event-coalescer.js'; function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null; @@ -409,7 +410,7 @@ export function createConnectionManager( * Send an event to the ingest WebSocket. * Buffers events if disconnected. */ - function sendToIngest(event: IngestEvent): void { + function sendToIngestImmediately(event: IngestEvent): void { if (ingestWs && ingestWs.readyState === WebSocket.OPEN) { ingestWs.send(JSON.stringify(event)); } else { @@ -422,6 +423,12 @@ export function createConnectionManager( } } + const runningBashEventCoalescer = createRunningBashEventCoalescer(sendToIngestImmediately); + + function sendToIngest(event: IngestEvent): void { + runningBashEventCoalescer.forward(event); + } + /** * Flush buffered events after reconnection. */ @@ -1133,6 +1140,7 @@ export function createConnectionManager( return { open: async () => { logToFile('opening connections'); + runningBashEventCoalescer.reopen(); // Open ingest WS first await openIngestWs(); @@ -1182,6 +1190,7 @@ export function createConnectionManager( logToFile('closing connections'); generation++; cancelReconnect(); + runningBashEventCoalescer.close(); clearBuffer(); // Stop event subscription — abort the HTTP stream so the for-await diff --git a/services/cloud-agent-next/wrapper/src/running-bash-event-coalescer.test.ts b/services/cloud-agent-next/wrapper/src/running-bash-event-coalescer.test.ts new file mode 100644 index 0000000000..7951a42ed5 --- /dev/null +++ b/services/cloud-agent-next/wrapper/src/running-bash-event-coalescer.test.ts @@ -0,0 +1,284 @@ +import { describe, expect, it } from 'bun:test'; +import type { IngestEvent } from '../../src/shared/protocol.js'; +import { createRunningBashEventCoalescer } from './running-bash-event-coalescer.js'; + +type ScheduledTask = { id: number; callback: () => void; 
cancelled: boolean }; + +function createScheduler() { + const tasks: ScheduledTask[] = []; + let nextId = 1; + return { + schedule(callback: () => void) { + const task = { id: nextId++, callback, cancelled: false }; + tasks.push(task); + return task.id; + }, + cancel(id: number) { + const task = tasks.find(candidate => candidate.id === id); + if (task) task.cancelled = true; + }, + runNext() { + const task = tasks.shift(); + if (task && !task.cancelled) task.callback(); + }, + }; +} + +function bashEvent( + partId: string, + status: string, + output: string, + sessionID = 'session-1', + messageID = 'message-1' +): IngestEvent { + const part = { + id: partId, + sessionID, + messageID, + type: 'tool', + tool: 'bash', + state: { status, metadata: { output } }, + }; + return { + streamEventType: 'kilocode', + timestamp: output, + data: { event: 'message.part.updated', type: 'message.part.updated', properties: { part } }, + }; +} + +function event(type: string): IngestEvent { + return { + streamEventType: 'kilocode', + timestamp: type, + data: { event: type, type, properties: {} }, + }; +} + +describe('running bash event coalescer', () => { + it('forwards the first update immediately and later sends only the latest cumulative snapshot', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'a')); + coalescer.forward(bashEvent('part-1', 'running', 'ab')); + coalescer.forward(bashEvent('part-1', 'running', 'abc')); + + expect(sent.map(item => item.timestamp)).toEqual(['a']); + scheduler.runNext(); + expect(sent.map(item => item.timestamp)).toEqual(['a', 'abc']); + }); + + it('does not repeat an immediate update when no newer snapshot arrives', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = 
[]; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'only')); + scheduler.runNext(); + + expect(sent.map(item => item.timestamp)).toEqual(['only']); + }); + + it('coalesces running updates independently per part', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'one')); + coalescer.forward(bashEvent('part-2', 'running', 'two')); + coalescer.forward(bashEvent('part-1', 'running', 'one-latest')); + coalescer.forward(bashEvent('part-2', 'running', 'two-latest')); + scheduler.runNext(); + scheduler.runNext(); + + expect(sent.map(item => item.timestamp)).toEqual(['one', 'two', 'one-latest', 'two-latest']); + }); + + it('does not coalesce matching part ids across messages or sessions', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'session-one')); + coalescer.forward(bashEvent('part-1', 'running', 'message-two', 'session-1', 'message-2')); + coalescer.forward(bashEvent('part-1', 'running', 'session-two', 'session-2', 'message-1')); + + expect(sent.map(item => item.timestamp)).toEqual(['session-one', 'message-two', 'session-two']); + }); + + it('flushes pending snapshots in latest event arrival order', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + 
const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'one')); + coalescer.forward(bashEvent('part-2', 'running', 'two')); + coalescer.forward(bashEvent('part-1', 'running', 'one-pending')); + coalescer.forward(bashEvent('part-2', 'running', 'two-pending')); + coalescer.forward(bashEvent('part-1', 'running', 'one-latest')); + coalescer.forward(event('session.idle')); + + expect(sent.map(item => item.timestamp)).toEqual([ + 'one', + 'two', + 'two-pending', + 'one-latest', + 'session.idle', + ]); + }); + + it('expires inactive part state after one coalescing interval', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'first')); + scheduler.runNext(); + coalescer.forward(bashEvent('part-1', 'running', 'after-idle')); + + expect(sent.map(item => item.timestamp)).toEqual(['first', 'after-idle']); + }); + + it('starts a fresh leading interval after a session boundary', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'first')); + coalescer.forward(event('session.idle')); + coalescer.forward(bashEvent('part-1', 'running', 'next-session')); + + expect(sent.map(item => item.timestamp)).toEqual(['first', 'session.idle', 'next-session']); + }); + + it('lets a terminal update supersede pending running output and prevents a stale timer send', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = 
createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'partial')); + coalescer.forward(bashEvent('part-1', 'running', 'newer-partial')); + coalescer.forward(bashEvent('part-1', 'completed', 'final-full-output')); + scheduler.runNext(); + + expect(sent.map(item => item.timestamp)).toEqual(['partial', 'final-full-output']); + }); + + it('flushes pending output before assistant completion', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + const completion: IngestEvent = { + streamEventType: 'kilocode', + timestamp: 'completion', + data: { + event: 'message.updated', + properties: { info: { role: 'assistant', time: { completed: 1 } } }, + }, + }; + + coalescer.forward(bashEvent('part-1', 'running', 'partial')); + coalescer.forward(bashEvent('part-1', 'running', 'latest')); + coalescer.forward(completion); + scheduler.runNext(); + + expect(sent.map(item => item.timestamp)).toEqual(['partial', 'latest', 'completion']); + }); + + it('flushes pending output before an interruption boundary', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + const interrupted: IngestEvent = { + streamEventType: 'interrupted', + timestamp: 'interrupted', + data: { reason: 'user' }, + }; + + coalescer.forward(bashEvent('part-1', 'running', 'partial')); + coalescer.forward(bashEvent('part-1', 'running', 'latest')); + coalescer.forward(interrupted); + scheduler.runNext(); + + expect(sent.map(item => 
item.timestamp)).toEqual(['partial', 'latest', 'interrupted']); + }); + + it('forwards new events after reopening for a warm follow-up', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(event('first-turn')); + coalescer.close(); + coalescer.forward(event('between-turns')); + coalescer.reopen(); + coalescer.forward(event('warm-follow-up')); + + expect(sent.map(item => item.timestamp)).toEqual(['first-turn', 'warm-follow-up']); + }); + + it('flushes pending output before completion and cannot send it after close', () => { + const scheduler = createScheduler(); + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent), callback => { + const task = scheduler.schedule(callback); + return () => scheduler.cancel(task); + }); + + coalescer.forward(bashEvent('part-1', 'running', 'partial')); + coalescer.forward(bashEvent('part-1', 'running', 'latest')); + coalescer.forward(event('session.idle')); + coalescer.close(); + scheduler.runNext(); + + expect(sent.map(item => item.timestamp)).toEqual(['partial', 'latest', 'session.idle']); + }); + + it('preserves non-bash and unrelated events without delay', () => { + const sent: IngestEvent[] = []; + const coalescer = createRunningBashEventCoalescer(sent.push.bind(sent)); + const nonBash = bashEvent('part-1', 'running', 'edit'); + const data = nonBash.data as { properties: { part: { tool: string } } }; + data.properties.part.tool = 'edit'; + + coalescer.forward(nonBash); + coalescer.forward(event('message.updated')); + + expect(sent).toEqual([nonBash, event('message.updated')]); + coalescer.close(); + }); +}); diff --git a/services/cloud-agent-next/wrapper/src/running-bash-event-coalescer.ts 
import type { IngestEvent } from '../../src/shared/protocol.js';

/** Default delay, in milliseconds, handed to the scheduler for each throttle window. */
const DEFAULT_COALESCE_INTERVAL_MS = 150;

/** Runs `callback` after `delayMs` and returns a function that cancels the pending run. */
type Scheduler = (callback: () => void, delayMs: number) => () => void;

/** A withheld running snapshot together with its arrival order. */
type PendingSnapshot = {
  event: IngestEvent;
  sequence: number;
};

/** Throttle state kept for one bash tool part while its window is open. */
type ThrottledPart = {
  /** Latest snapshot that arrived during the open window and has not been sent yet. */
  pending?: PendingSnapshot;
  /** Cancels the timer that ends the current window. */
  cancel: () => void;
};

type BashPartUpdate = {
  /** Identity of the part: JSON of `[sessionID, messageID, id]`. */
  key: string;
  /** True when the part's `state.status` is `'running'`. */
  running: boolean;
};

/** Narrows `value` to a non-null object whose properties can be inspected. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

/**
 * Recognizes `message.part.updated` events that describe a bash tool part.
 *
 * @returns The part key and whether the part is running, or `undefined` when
 *   the event is anything else or the part lacks string session/message/part ids.
 */
function classifyBashPartUpdate(event: IngestEvent): BashPartUpdate | undefined {
  if (event.streamEventType !== 'kilocode' || !isRecord(event.data)) return undefined;
  if (event.data.event !== 'message.part.updated') return undefined;
  const properties = event.data.properties;
  if (!isRecord(properties)) return undefined;
  const part = properties.part;
  if (!isRecord(part) || part.type !== 'tool' || part.tool !== 'bash') return undefined;
  if (
    typeof part.sessionID !== 'string' ||
    typeof part.messageID !== 'string' ||
    typeof part.id !== 'string'
  ) {
    return undefined;
  }
  const state = part.state;
  return {
    // Part ids alone are not treated as unique, so the key spans session and message too.
    key: JSON.stringify([part.sessionID, part.messageID, part.id]),
    running: isRecord(state) && state.status === 'running',
  };
}

/**
 * Reports whether `event` is a boundary: a `complete`/`error`/`interrupted`
 * stream event, a kilocode `session.idle`, `session.error`, `payment_required`
 * or `insufficient_funds` event, or an assistant `message.updated` that carries
 * a numeric completion time or an error.
 */
function isBoundaryEvent(event: IngestEvent): boolean {
  if (
    event.streamEventType === 'complete' ||
    event.streamEventType === 'error' ||
    event.streamEventType === 'interrupted'
  ) {
    return true;
  }
  if (event.streamEventType !== 'kilocode' || !isRecord(event.data)) return false;
  if (
    event.data.event === 'session.idle' ||
    event.data.event === 'session.error' ||
    event.data.event === 'payment_required' ||
    event.data.event === 'insufficient_funds'
  ) {
    return true;
  }
  if (event.data.event !== 'message.updated') return false;
  const properties = event.data.properties;
  if (!isRecord(properties)) return false;
  const info = properties.info;
  if (!isRecord(info) || info.role !== 'assistant') return false;
  const time = info.time;
  return (isRecord(time) && typeof time.completed === 'number') || info.error != null;
}

/** Scheduler backed by `setTimeout` / `clearTimeout`. */
const defaultScheduler: Scheduler = (callback, delayMs) => {
  const timer = setTimeout(callback, delayMs);
  return () => clearTimeout(timer);
};

/** Handle returned by {@link createRunningBashEventCoalescer}. */
export type RunningBashEventCoalescer = {
  /** Forwards `event`, coalescing running bash part updates; ignored while closed. */
  forward: (event: IngestEvent) => void;
  /** Flushes pending snapshots, then drops every event until `reopen()` is called. */
  close: () => void;
  /** Accepts events again after `close()`. */
  reopen: () => void;
};

/**
 * Creates a forwarder that throttles running bash tool updates per part.
 *
 * The first running update of a part is sent immediately and opens a window of
 * `intervalMs`. Further running updates for the same part inside the window
 * only replace the withheld snapshot; when the window ends the latest snapshot
 * (if any) is sent and a new window starts. A non-running update for a bash
 * part discards that part's withheld snapshot and is sent right away. Boundary
 * events flush every withheld snapshot, in arrival order, before being sent.
 * All other events pass straight through.
 *
 * @param send - Receives every event that is actually forwarded.
 * @param schedule - Timer implementation; defaults to `setTimeout`.
 * @param intervalMs - Length of one throttle window in milliseconds.
 * @returns The `forward` / `close` / `reopen` handle.
 */
export function createRunningBashEventCoalescer(
  send: (event: IngestEvent) => void,
  schedule: Scheduler = defaultScheduler,
  intervalMs = DEFAULT_COALESCE_INTERVAL_MS
): RunningBashEventCoalescer {
  const throttledParts = new Map<string, ThrottledPart>();
  // Monotonic arrival counter. It is intentionally never reset: snapshots that
  // are still pending keep their numbers, so a reset could order a newer
  // snapshot ahead of an older one in flushAll().
  let nextSequence = 0;
  let closed = false;

  /** Starts the timer that ends the current window of `partKey`; returns its canceller. */
  function scheduleThrottleExpiry(partKey: string): () => void {
    return schedule(() => {
      const part = throttledParts.get(partKey);
      if (!part) return;
      const pending = part.pending;
      if (pending && !closed) {
        part.pending = undefined;
        send(pending.event);
        // Something was sent, so keep throttling for another window.
        part.cancel = scheduleThrottleExpiry(partKey);
        return;
      }
      // Nothing arrived during the window (or we are closed): forget the part
      // so its next running update is forwarded immediately.
      throttledParts.delete(partKey);
    }, intervalMs);
  }

  /** Cancels the timer of `partKey` and discards its withheld snapshot. */
  function cancelPart(partKey: string): void {
    const part = throttledParts.get(partKey);
    if (!part) return;
    part.cancel();
    throttledParts.delete(partKey);
  }

  /** Cancels every timer and sends all withheld snapshots in arrival order. */
  function flushAll(): void {
    const updates: PendingSnapshot[] = [];
    for (const part of throttledParts.values()) {
      part.cancel();
      if (part.pending) updates.push(part.pending);
    }
    throttledParts.clear();
    if (closed) return;
    updates.sort((left, right) => left.sequence - right.sequence);
    for (const update of updates) send(update.event);
  }

  function forward(event: IngestEvent): void {
    if (closed) return;

    const bashPart = classifyBashPartUpdate(event);
    if (bashPart?.running) {
      const existing = throttledParts.get(bashPart.key);
      if (existing) {
        // Inside an open window: keep only the newest snapshot.
        existing.pending = { event, sequence: nextSequence++ };
        return;
      }
      send(event);
      throttledParts.set(bashPart.key, { cancel: scheduleThrottleExpiry(bashPart.key) });
      return;
    }

    if (bashPart) {
      // A non-running update supersedes whatever running snapshot was withheld.
      cancelPart(bashPart.key);
    } else if (isBoundaryEvent(event)) {
      flushAll();
    }
    send(event);
  }

  function close(): void {
    // Flush first: flushAll() sends nothing once `closed` is set.
    flushAll();
    closed = true;
  }

  function reopen(): void {
    closed = false;
  }

  return { forward, close, reopen };
}