From cdb2200f70e9fc3bbbe802db2d2904340e7e82d5 Mon Sep 17 00:00:00 2001 From: Shoma Nishitateno Date: Fri, 1 May 2026 13:28:10 +0900 Subject: [PATCH 1/3] =?UTF-8?q?refactor(chat):=20ChatRunner=20=E3=82=92=20?= =?UTF-8?q?long-lived=20Query=20=E5=8C=96=20+=20codex=20=E6=8C=87=E6=91=98?= =?UTF-8?q?=204=20=E4=BB=B6=E5=AF=BE=E5=BF=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-C の主機能: 1 ChatRunner = 1 sdk.query() = 1 subprocess に固定して、 MCP HTTP transport の OAuth 状態 (PKCE / token) を turn 跨ぎで保持する。 main 最新 (PR-A の MCP 統合 + PR-B の OAuth 2.1 / auth_request UX + 私の修正多数: XML escape / externalToolUseIds / ephemeral runOAuthCallback / 空 assistant プレースホルダ等) と統合した形で再実装。 ## 新規ファイル - packages/ai-engine/src/async-input.ts: SDK の streaming input mode 用 AsyncIterable 実装。push 可能 + close 即時 teardown (CR Major 反映: close 後 next() 即 done / FIFO waiter キュー) - packages/ai-engine/src/async-input.test.ts: 8 件 (push / waiter / close / FIFO / close 後 即 done) ## agent-runner.ts - SdkLike.query を `prompt: string | AsyncIterable` に拡張 - SdkUserMessageLike + SdkQueryHandle (任意 close()) 型追加 ## chat-runner.ts 新規フィールド: - query / input / outputLoopDone / outputLoopFailed / currentTurn / cachedExternalConfigById - ensureQueryInflight (Promise キャッシュ、再入ガード) 新規 interface: - TurnState: 1 user turn の間だけ生きる mutable state (assistantMsgId / queue / textBuffer / stashedAuthUses / externalConfigById / externalToolUseIds) 新規メソッド: - ensureQuery: 1 度だけ query 立ち上げ + 出力ループ起動。inflight Promise キャッシュで再入ガード (codex Major 3) - startQueryInternal: 内部の起動本体 - runOutputLoop: SDK message を currentTurn の queue に振り分け - dispatchSdkMessage: 1 メッセージ処理 (text/tool_use/tool_result/result) - tearDownQuery / close: リソース cleanup。close は outputLoopDone を退避 してから tearDownQuery を呼ぶ (codex 致命 Minor: close 順序) 挙動変更: - runUserTurn: 入口で currentTurn ガード (codex Major 1: turn 並走防止) - input null チェックを invariant assertion で表明 (codex Major 2) - 空 assistant message 永続化を ensureQuery 前に移動 (long-lived では bg loop が即起動するため、後置きだと race で空 message 残る) - ephemeral runOAuthCallback も long-lived query 経由に統合 (callback URL は input.push 経由で SDK に渡るが chatStore には 永続化されない) main 最新の機能を保持: - auth-detector / handleAuthToolResult / stashedAuthUses - externalToolUseIds (TurnState に統合) - escapeXmlText / escapeXmlAttr (buildChatPrompt) - 空 assistant プレースホルダ「(認証処理を完了しました)」 (dispatchSdkMessage の result 処理に統合) buildMcpServer は 1 引数化、handler 内で this.currentTurn から assistantMsgId / emit を動的解決 (1 度作った MCP サーバを turn 跨ぎ で使い回せる、turn 中は不変)。 ## server.ts - WS close 時に runner.close() を呼ぶ。close 内部の reject は .catch で warn (codex Major: void close() で unhandled rejection 化を回避) ## chat-runner.test.ts - startCapturePromptText helper を追加: prompt が AsyncIterable<...> に変わったので、最初に push された user message の content を 奪取する (string にも互換) - 既存 test の prompt キャプチャ 4 箇所を helper 経由に書き換え ## テスト pnpm typecheck: 4/4 PASS pnpm test: core 93 + storage 88 + ai-engine 213 + frontend 273 = 667 PASS pnpm lint: exit 0 --- packages/ai-engine/src/agent-runner.ts | 22 +- packages/ai-engine/src/async-input.test.ts | 119 ++++ packages/ai-engine/src/async-input.ts | 63 ++ packages/ai-engine/src/chat-runner.test.ts | 49 +- packages/ai-engine/src/chat-runner.ts | 672 ++++++++++++--------- packages/ai-engine/src/server.ts | 15 + 6 files changed, 656 insertions(+), 284 deletions(-) create mode 100644 packages/ai-engine/src/async-input.test.ts create mode 100644 packages/ai-engine/src/async-input.ts diff --git a/packages/ai-engine/src/agent-runner.ts b/packages/ai-engine/src/agent-runner.ts index fd94972..45031c7 100644 --- a/packages/ai-engine/src/agent-runner.ts +++ b/packages/ai-engine/src/agent-runner.ts @@ -14,6 +14,23 @@ export interface StartRequest { input: unknown; } +// Streaming input mode 用の最小 SDKUserMessage 形状 (実 SDK の SDKUserMessage を duck-type 化)。 +// MCP HTTP transport の OAuth 状態を turn 跨ぎで保持したい場合は、 +// 1 query に AsyncIterable を渡し続ける必要がある。 +export interface SdkUserMessageLike { + type: 'user'; + message: { role: 'user'; content: string }; + parent_tool_use_id: null; + session_id?: string; +} + +// SDK Query は AsyncIterable + 任意の close() を持つハンドル。 +// 実 SDK の Query 型 (interrupt / setMcpServers / streamInput / close) のうち、 +// chat-runner が触るのは close のみなので最小化して受ける。 +export interface SdkQueryHandle extends AsyncIterable { + close?(): void; +} + // Agent SDK との結合点だけ抽象化する。query は AsyncIterable を返すこと。 // 実 SDK の厳密な型 (Options, SDKMessage) に合わせず duck typing で受けるのは、 // テスト時に mockSdk を差し込めるようにするため。 @@ -21,7 +38,8 @@ export interface StartRequest { // allowedTools / cwd / settingSources / permissionMode はすべて options 内に入れる必要がある。 export interface SdkLike { query(opts: { - prompt: string; + // 単発 (agent-runner) は文字列、chat (multi-turn) は AsyncIterable で push 流す。 + prompt: string | AsyncIterable; options?: { systemPrompt?: string; mcpServers?: Record; @@ -43,7 +61,7 @@ export interface SdkLike { // 解決に失敗するケースがある。明示的にシステムの claude CLI パスを渡すと回避できる。 pathToClaudeCodeExecutable?: string; }; - }): AsyncIterable; + }): SdkQueryHandle; } export interface RunAgentDeps { diff --git a/packages/ai-engine/src/async-input.test.ts b/packages/ai-engine/src/async-input.test.ts new file mode 100644 index 0000000..ec99b74 --- /dev/null +++ b/packages/ai-engine/src/async-input.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it } from 'vitest'; + +import { AsyncIterableInput } from './async-input'; + +describe('AsyncIterableInput', () => { + it('push 後に next() で順番に取り出せる (close は drain 後に呼ぶ)', async () => { + // close 後はバッファをクリアして即時 done に倒す仕様 (teardown 用) のため、 + // 「push してから close、その後 iterate」では値が取れない。 + // 実運用では push → consumer が next() でドレイン → close の順。 + const input = new AsyncIterableInput(); + input.push(1); + input.push(2); + input.push(3); + const it = input.iterable()[Symbol.asyncIterator](); + const got: number[] = []; + for (let i = 0; i < 3; i++) { + const r = await it.next(); + if (r.done) break; + got.push(r.value); + } + expect(got).toEqual([1, 2, 3]); + input.close(); + const r = await it.next(); + expect(r.done).toBe(true); + }); + + it('iter が空の状態で next() を待ち、後の push で解決される', async () => { + const input = new AsyncIterableInput(); + const it = input.iterable()[Symbol.asyncIterator](); + const p = it.next(); + let resolved = false; + p.then(() => { + resolved = true; + }); + await new Promise((r) => setTimeout(r, 5)); + expect(resolved).toBe(false); + input.push('hi'); + const r = await p; + expect(r).toEqual({ value: 'hi', done: false }); + }); + + it('close() で待機中の next() が done: true で解決される', async () => { + const input = new AsyncIterableInput(); + const it = input.iterable()[Symbol.asyncIterator](); + const p = it.next(); + input.close(); + const r = await p; + expect(r.done).toBe(true); + }); + + it('close 後の push は無視される', async () => { + const input = new AsyncIterableInput(); + input.push(1); + input.close(); + input.push(99); + const got: number[] = []; + for await (const v of input.iterable()) got.push(v); + // close 時点で残バッファもクリアされるため即終了 + expect(got).toEqual([]); + }); + + it('iterator.return() で残りの push が消費されず終了', async () => { + const input = new AsyncIterableInput(); + input.push(1); + input.push(2); + const it = input.iterable()[Symbol.asyncIterator](); + const r1 = await it.next(); + expect(r1.value).toBe(1); + if (it.return) { + const r2 = await it.return(); + expect(r2.done).toBe(true); + } + }); + + // FIFO キュー化: next() を 2 回連続で呼んで push 1 回だけのとき、 + // 1 つ目だけ resolve され 2 つ目は close まで残る。 + it('next() を複数回先に呼んでから push しても、push 順に各 promise が解決される', async () => { + const input = new AsyncIterableInput(); + const it = input.iterable()[Symbol.asyncIterator](); + const p1 = it.next(); + const p2 = it.next(); + input.push(10); + input.push(20); + const [r1, r2] = await Promise.all([p1, p2]); + expect(r1).toEqual({ value: 10, done: false }); + expect(r2).toEqual({ value: 20, done: false }); + }); + + it('next() を 2 回先に呼んで push を 1 回だけしても、未解決の Promise は close で done に倒れる', async () => { + const input = new AsyncIterableInput(); + const it = input.iterable()[Symbol.asyncIterator](); + const p1 = it.next(); + const p2 = it.next(); + input.push(42); + const r1 = await p1; + expect(r1).toEqual({ value: 42, done: false }); + let p2Resolved = false; + p2.then(() => { + p2Resolved = true; + }); + await new Promise((r) => setTimeout(r, 5)); + expect(p2Resolved).toBe(false); + input.close(); + const r2 = await p2; + expect(r2.done).toBe(true); + }); + + // CR Major (PR #18 2nd review): close 後にバッファ済み値を返し続けると + // teardown で残メッセージが消費されてしまう。close 時点で打ち切りたい。 + it('close 後の next() はバッファに値が残っていても即 done', async () => { + const input = new AsyncIterableInput(); + input.push(1); + input.push(2); + input.close(); + const it = input.iterable()[Symbol.asyncIterator](); + const r = await it.next(); + expect(r.done).toBe(true); + }); +}); diff --git a/packages/ai-engine/src/async-input.ts b/packages/ai-engine/src/async-input.ts new file mode 100644 index 0000000..7fb9813 --- /dev/null +++ b/packages/ai-engine/src/async-input.ts @@ -0,0 +1,63 @@ +// SDK の query({ prompt: AsyncIterable }) に流す、 +// 後から push できる AsyncIterable 実装。 +// 1 chat thread = 1 long-lived sdk.query() を実現するため、user message を +// 任意のタイミングで投入し、close で iter を終わらせる。 +// +// 実装方針: バッファ + waiter キュー。consumer が next を複数回連続で呼んでも +// 各 promise が独立に保持される。AsyncIterator 仕様に沿うため waiter は +// 単一スロットではなく FIFO キューで持つ (consumer が並行で next() を呼ぶ +// ケースに耐える)。 +// close() は finished フラグを立てた上で残バッファをクリアし、待機中の +// waiter を done で全て倒す。next() は finished を最優先で確認するため、 +// close 後の再 next() はバッファに値が残っていても即 done を返す。 +export class AsyncIterableInput { + private buf: T[] = []; + private waiters: Array<(r: IteratorResult) => void> = []; + private finished = false; + + push(value: T): void { + if (this.finished) return; + const w = this.waiters.shift(); + if (w) { + w({ value, done: false }); + return; + } + this.buf.push(value); + } + + // close 後は残バッファを捨てて即時に終了させる (teardown 用)。 + close(): void { + if (this.finished) return; + this.finished = true; + this.buf.length = 0; + while (this.waiters.length > 0) { + this.waiters.shift()?.({ value: undefined as never, done: true }); + } + } + + // SDK 等に渡す iter。同一インスタンスから複数回 [Symbol.asyncIterator] を取られる + // ことは想定しない (本パッケージでは 1 query に 1 input)。 + iterable(): AsyncIterable { + return { + [Symbol.asyncIterator]: () => ({ + next: (): Promise> => { + // close 後は即座に done。残バッファに値が乗っていても無視する。 + if (this.finished) { + return Promise.resolve({ value: undefined as never, done: true }); + } + if (this.buf.length > 0) { + const v = this.buf.shift() as T; + return Promise.resolve({ value: v, done: false }); + } + return new Promise>((resolve) => { + this.waiters.push(resolve); + }); + }, + return: (): Promise> => { + this.close(); + return Promise.resolve({ value: undefined as never, done: true }); + }, + }), + }; + } +} diff --git a/packages/ai-engine/src/chat-runner.test.ts b/packages/ai-engine/src/chat-runner.test.ts index 0246e12..bbee2df 100644 --- a/packages/ai-engine/src/chat-runner.test.ts +++ b/packages/ai-engine/src/chat-runner.test.ts @@ -11,6 +11,27 @@ import type { SdkLike } from './agent-runner'; import { buildChatPrompt, ChatRunner, formatNodeForContext } from './chat-runner'; import type { ChatEvent, SdkMessageLike } from './stream'; +// long-lived Query 化に伴い prompt は AsyncIterable 型に変わった。 +// テスト側で「最初に push された user message の content」を読むためのヘルパ。 +// string で渡された場合 (互換) も同じ shape で扱えるようにする。 +function startCapturePromptText(prompt: unknown): { read: () => string } { + const captured = { value: '' }; + if (typeof prompt === 'string') { + captured.value = prompt; + } else if ( + prompt && + typeof (prompt as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === 'function' + ) { + const it = (prompt as AsyncIterable<{ message?: { content?: string } }>)[ + Symbol.asyncIterator + ](); + it.next().then((r) => { + if (!r.done && r.value?.message?.content) captured.value = r.value.message.content; + }); + } + return { read: () => captured.value }; +} + describe('ChatRunner', () => { let root: string; @@ -194,10 +215,10 @@ describe('ChatRunner', () => { priority: 'must', })) as Node; - let capturedPrompt = ''; + let promptCapture: { read: () => string } = { read: () => '' }; const sdk: SdkLike = { - query: ({ prompt }: { prompt: string }) => { - capturedPrompt = prompt; + query: ({ prompt }: { prompt: unknown }) => { + promptCapture = startCapturePromptText(prompt); return (async function* () { yield { type: 'assistant', @@ -221,6 +242,7 @@ describe('ChatRunner', () => { events.push(e); } + const capturedPrompt = promptCapture.read(); expect(capturedPrompt).toContain(''); expect(capturedPrompt).toContain(`id: ${target.id}`); expect(capturedPrompt).toContain('type: requirement'); @@ -273,10 +295,10 @@ describe('ChatRunner', () => { body: '', })) as Node; - let capturedPrompt = ''; + let promptCapture: { read: () => string } = { read: () => '' }; const sdk: SdkLike = { - query: ({ prompt }: { prompt: string }) => { - capturedPrompt = prompt; + query: ({ prompt }: { prompt: unknown }) => { + promptCapture = startCapturePromptText(prompt); return (async function* () { yield { type: 'result', subtype: 'success', result: 'ok' } as unknown as SdkMessageLike; })(); @@ -295,6 +317,7 @@ describe('ChatRunner', () => { // drain } + const capturedPrompt = promptCapture.read(); const histIdx = capturedPrompt.indexOf(''); const histEndIdx = capturedPrompt.indexOf(''); const ctxIdx = capturedPrompt.indexOf(''); @@ -325,10 +348,10 @@ describe('ChatRunner', () => { body: '', })) as Node; - let capturedPrompt = ''; + let promptCapture: { read: () => string } = { read: () => '' }; const sdk: SdkLike = { - query: ({ prompt }: { prompt: string }) => { - capturedPrompt = prompt; + query: ({ prompt }: { prompt: unknown }) => { + promptCapture = startCapturePromptText(prompt); return (async function* () { yield { type: 'result', subtype: 'success', result: 'ok' } as unknown as SdkMessageLike; })(); @@ -344,6 +367,7 @@ describe('ChatRunner', () => { for await (const _e of runner.runUserTurn('q', ['nonexistent', valid.id, 'also-gone'])) { // drain } + const capturedPrompt = promptCapture.read(); expect(capturedPrompt).toContain(''); expect(capturedPrompt).toContain(`id: ${valid.id}`); expect(capturedPrompt).not.toContain('id: nonexistent'); @@ -356,10 +380,10 @@ describe('ChatRunner', () => { const projectStore = new FileSystemProjectStore(root); const thread = await chatStore.createChat({ projectId: 'proj-1', title: 't' }); - let capturedPrompt = ''; + let promptCapture: { read: () => string } = { read: () => '' }; const sdk: SdkLike = { - query: ({ prompt }: { prompt: string }) => { - capturedPrompt = prompt; + query: ({ prompt }: { prompt: unknown }) => { + promptCapture = startCapturePromptText(prompt); return (async function* () { yield { type: 'result', subtype: 'success', result: 'ok' } as unknown as SdkMessageLike; })(); @@ -375,6 +399,7 @@ describe('ChatRunner', () => { for await (const _e of runner.runUserTurn('hello', [])) { // drain } + const capturedPrompt = promptCapture.read(); expect(capturedPrompt).not.toContain(''); // user 文字列自体は (履歴経由で) 必ず prompt に入る expect(capturedPrompt).toContain('hello'); diff --git a/packages/ai-engine/src/chat-runner.ts b/packages/ai-engine/src/chat-runner.ts index 0ec8458..58c701e 100644 --- a/packages/ai-engine/src/chat-runner.ts +++ b/packages/ai-engine/src/chat-runner.ts @@ -8,7 +8,8 @@ import { } from '@tally/core'; import type { ChatStore, ProjectStore } from '@tally/storage'; -import type { SdkLike } from './agent-runner'; +import type { SdkLike, SdkQueryHandle, SdkUserMessageLike } from './agent-runner'; +import { AsyncIterableInput } from './async-input'; import { type AuthToolNameMatch, extractAuthUrl, parseAuthToolName } from './auth-detector'; import { buildMcpServers } from './mcp/build-mcp-servers'; import { redactMcpSecrets } from './mcp/redact'; @@ -86,6 +87,23 @@ export interface ToolEntry { handler: (input: unknown) => Promise<{ ok: boolean; output: string }>; } +// 1 user turn の間だけ生きる mutable state (long-lived Query 化)。 +// SDK ストリームから流れてくる SDK メッセージを「今どの assistant message に紐付けるか」 +// 「どの queue に流すか」を解決するためのコンテキスト。 +// turn と turn の間 (= ユーザーが次の user message を送るまで) は null。 +interface TurnState { + assistantMsgId: string; + queue: EventQueue; + textBuffer: string[]; + // OAuth 認証フロー検出用 stash (tool_use 受信 → tool_result 到達時に auth_request に変換)。 + stashedAuthUses: Map; + // 外部 MCP の id → name の即引き map (label 表示用、turn 中は不変)。 + externalConfigById: Map; + // 同 turn 中に観測した外部 MCP の tool_use の id 集合。tool_result が来た時、 + // ここに無い id は「内部 / 想定外」として無視する (CR 指摘 #19 2 周目)。 + externalToolUseIds: Set; +} + // ChatRunner は 1 スレッド分の multi-turn 対話を駆動する。 // 各 user turn を `runUserTurn` で流し、tool_use 承認は外部から `approveTool` で通知する。 // @@ -102,6 +120,25 @@ export class ChatRunner { // 承認待ちの Promise resolver。ui-toolUseId → (approved) => void。 private readonly pendingApprovals = new Map void>(); + // long-lived SDK Query。1 ChatRunner = 1 sdk.query() = 1 subprocess に固定して + // MCP HTTP transport の OAuth 状態 (PKCE / token) を turn 跨ぎで保持する。 + // null = まだ start していない。closed 状態になっても close() / 再 ensure で破棄して再開できる。 + private query: SdkQueryHandle | null = null; + private input: AsyncIterableInput | null = null; + private outputLoopDone: Promise | null = null; + private outputLoopFailed = false; + // ensureQuery が並行で複数回 await されるのを防ぐ Promise キャッシュ + // (codex 指摘: 再入で tearDownQuery → sdk.query が 2 度走り、最初の query / output loop が + // 孤立してリークするのを回避)。完了時に null 化して次回の再起動を許す。 + private ensureQueryInflight: Promise | null = null; + + // 現在進行中の turn。runUserTurn の入口で set、出口で null。 + // MCP ハンドラと出力ループはここから assistantMsgId / queue を読む。 + private currentTurn: TurnState | null = null; + // ensureQuery が走った時に決まる、long-lived な externalConfig snapshot。 + // 再起動するまで mcpServers の入替えは反映しない。 + private cachedExternalConfigById: Map | null = null; + constructor(deps: ChatRunnerDeps) { this.deps = deps; } @@ -122,18 +159,28 @@ export class ChatRunner { }); } - // user の 1 ターンを実行する。 - // 1) user message を append - // 2) 空 assistant message を append (ID 確保) - // 3) MCP サーバを組み立てて SDK に渡し、assistant stream を iterate - // 4) text block は buffer + delta emit。tool_use は MCP ハンドラ内で承認 intercept される。 - // 5) turn 末に text blocks を assistant message 先頭に統合 + // user の 1 ターンを実行する (long-lived Query 化版)。 + // - 1 ChatRunner = 1 sdk.query() = 1 subprocess に固定し、MCP HTTP transport の + // OAuth state (PKCE / token) を turn 跨ぎで保持する。 + // - 各 turn では (1) user/空 assistant message の永続化、(2) prompt 組み立て、 + // (3) AsyncIterableInput への push でターン開始、(4) queue ドレインで進む。 + // - turn 並走は禁止 (codex 指摘): currentTurn が既に居れば error を返して即終了。 // // contextNodeIds: ユーザーが「@メンション」で添付したノード ID 配列 (issue #11)。 - // ProjectStore から該当ノードを引いて prompt の ブロックに埋め込む。 - // 不在 ID は無視 (削除済みノード等)。永続化はせず、毎ターンの prompt prefix としてのみ使う。 async *runUserTurn(userText: string, contextNodeIds: string[] = []): AsyncGenerator { - const { sdk, chatStore, projectStore, projectDir, threadId } = this.deps; + const { chatStore, projectStore, threadId } = this.deps; + + // turn 並走禁止 (codex 指摘 Major 1)。currentTurn が既に居る = 前 turn が + // まだ完了していない (UI がイベントをドレイン中) ので、新 turn を被せると + // SDK 出力が前 turn の assistantMsgId に誤接続される。明示的に拒否する。 + if (this.currentTurn) { + yield { + type: 'error', + code: 'turn_in_progress', + message: '前のターンがまだ完了していません', + }; + return; + } const thread = await chatStore.getChat(threadId); if (!thread) { @@ -153,43 +200,44 @@ export class ChatRunner { yield { type: 'chat_user_message_appended', messageId: userMsgId }; // 2. prompt を先に組む。user message 追加直後の履歴 (末尾が user) をスナップショットし、 - // buildChatPrompt が を末尾の user message として正しく抽出できる状態で呼ぶ。 - // これは「 は今ターンの user 入力より前に置く」契約 (issue #11) を守るため必須。 - // 後続で空 assistant を append すると履歴末尾が assistant になってしまい、buildChatPrompt の - // `last?.role === 'user'` 判定が false に倒れる (= context_nodes が user 入力の後ろに並ぶバグ) ので、 - // 必ずこの順で snapshot → prompt 組立 → 空 assistant append の順を守る。 + // buildChatPrompt が を末尾の user message として正しく抽出できる + // 状態で呼ぶ (issue #11 の 配置契約)。 const threadWithUser = await chatStore.getChat(threadId); const contextNodes = await loadContextNodes(projectStore, contextNodeIds); const prompt = buildChatPrompt(threadWithUser?.messages ?? [], contextNodes); - const systemPrompt = buildChatSystemPrompt(); - // 3. assistantMsgId を先に生成 (buildMcpServer の handler が emit 先として参照)。 - // 永続化は MCP 構築成功後に行う (途中で throw した場合に空 assistant message が - // YAML に残らないようにする。ロード時は がスキップされるが、 - // chat 履歴 UI には空バブルが蓄積するのを防ぐため)。 + // 3. 空の assistant message を append (後続の tool_use 即時永続化の親として必要)。 + // long-lived 化では ensureQuery 内の出力ループが bg で即起動するため、append を + // ensureQuery 後に遅らせると「空 assistant 永続化前に SDK の result message が + // dispatch されて空 message のままになる」race が起きる。先に append しておく。 const assistantMsgId = newChatMessageId(); + await chatStore.appendMessage(threadId, { + id: assistantMsgId, + role: 'assistant', + blocks: [], + createdAt: new Date().toISOString(), + }); + yield { type: 'chat_assistant_message_started', messageId: assistantMsgId }; - // 4. MCP 経由で呼ばれる tool ハンドラ内で invokeInterceptedTool を回す。 - // MCP handler は SDK query を block するので、イベント emit は AsyncQueue 経由に分離する。 - // さもないと deadlock (SDK が MCP 応答待ち / MCP が承認待ち / 承認は UI 経由で queue flush が必要)。 + // 4. turn state を組み立てる。runOutputLoop と MCP ハンドラはここから読む。 + // ensureQuery より前に必ず set しておく — output loop が SDK メッセージを + // dispatch する瞬間に currentTurn が null だと取りこぼす (race)。 const queue = new EventQueue(); - const tools = this.buildToolRegistry(); - const emit = (e: ChatEvent) => queue.push(e); - const mcp = this.buildMcpServer(tools, emit, assistantMsgId); + const turnState: TurnState = { + assistantMsgId, + queue, + textBuffer: [], + stashedAuthUses: new Map(), + externalConfigById: this.cachedExternalConfigById ?? new Map(), + externalToolUseIds: new Set(), + }; + this.currentTurn = turnState; - // 4b. プロジェクト設定の mcpServers[] を Tally MCP と合成する (Task 11)。 - // 毎ターン読むことで env / 設定変更がホットリロードされる。 - // env 未設定 (PAT 等) は buildMcpServers が throw するので、ここで補足し - // error event を emit して early return する (sdk.query は呼ばない)。 - const projectMeta = await projectStore.getProjectMeta(); - const externalConfigs = projectMeta?.mcpServers ?? []; - let mcpServers: Record; - let allowedTools: string[]; + // 5. SDK Query (long-lived) を必要なら起動する。 try { - const built = buildMcpServers({ tallyMcp: mcp, configs: externalConfigs }); - mcpServers = built.mcpServers; - allowedTools = built.allowedTools; + await this.ensureQuery(); } catch (err) { + this.currentTurn = null; yield { type: 'error', code: 'mcp_config_invalid', @@ -197,168 +245,259 @@ export class ChatRunner { }; return; } + // ensureQuery が externalConfig を更新するので turnState の参照を最新へ差し替える。 + turnState.externalConfigById = this.cachedExternalConfigById ?? new Map(); + + // 6. user message を SDK の input ストリームに push する。 + // ensureQuery 成功後は this.input が必ず存在するはず (codex 指摘 Major 2: + // null チェックを invariant assertion で表明し、無音破棄 → ハングを防ぐ)。 + if (!this.input) { + this.currentTurn = null; + throw new Error('invariant: ensureQuery succeeded but input is null'); + } + this.input.push({ + type: 'user', + message: { role: 'user', content: prompt }, + parent_tool_use_id: null, + }); - // 5. MCP 構築が成功した時点で空 assistant message を永続化 (後続の tool_use 即時 - // 永続化の親として必要)。buildChatPrompt スナップショット後・sdk.query 前に行う。 - await chatStore.appendMessage(threadId, { - id: assistantMsgId, - role: 'assistant', - blocks: [], - createdAt: new Date().toISOString(), + // 8. queue をドレイン。chat_turn_ended が来たら今 turn は終わり。 + try { + while (true) { + const evt = await queue.next(); + if (evt === null) break; + yield evt; + if (evt.type === 'chat_turn_ended') break; + } + } finally { + this.currentTurn = null; + } + } + + // SDK query を 1 度だけ立ち上げ、出力ループをバックグラウンドで走らせる。 + // close() / iter 終端 / 例外で query が死んでいる場合は次回呼び出しで再起動する。 + // 並行呼び出しは Promise キャッシュで直列化する (codex 指摘 Major 3: 再入で + // tearDownQuery → sdk.query が 2 度走り、最初の query / output loop が孤立して + // リークするのを回避)。 + // throw する条件: project mcpServers の設定不正 (URL 無効等) — 上位で error event 化される。 + private ensureQuery(): Promise { + if (this.query && !this.outputLoopFailed) return Promise.resolve(); + if (this.ensureQueryInflight) return this.ensureQueryInflight; + const p = this.startQueryInternal().finally(() => { + this.ensureQueryInflight = null; }); - yield { type: 'chat_assistant_message_started', messageId: assistantMsgId }; + this.ensureQueryInflight = p; + return p; + } + + private async startQueryInternal(): Promise { + // 既存が死んでいるなら片付けてから作り直す。 + this.tearDownQuery(); + + const { sdk, projectStore, projectDir } = this.deps; + const projectMeta = await projectStore.getProjectMeta(); + const externalConfigs = projectMeta?.mcpServers ?? []; - const textBuffer: string[] = []; - // 外部 MCP の tool_use を観測した toolUseId のみ集合に保持し、対応する tool_result - // のみ external として永続化する。Tally 内部 MCP (mcp__tally__*) は本来 intercept - // 経路で SDK ストリームに現れない前提だが、SDK 仕様変更や edge case で内部 result - // が流れた場合に external 誤分類するのを防ぐためのガード (CR 指摘 #19 2 周目)。 - const externalToolUseIds = new Set(); - - // OAuth 認証フロー検出用 state。 - // SDK の tool_use → tool_result は同じ for-await ループ内で順に流れるので、 - // tool_use 時点で認証系か判定して stash しておき、tool_result 到達時に - // raw な tool_use/tool_result を捨てて auth_request ブロックに変換する。 - // mcpServerLabel 解決用に externalConfigs を id → name の map にしておく。 const externalConfigById = new Map(); for (const c of externalConfigs) externalConfigById.set(c.id, c.name); - const stashedAuthUses = new Map(); + this.cachedExternalConfigById = externalConfigById; - // 5. SDK query をバックグラウンドで走らせ、queue にイベントを push する。 - // generator 側は queue をドレインして yield するだけ。 - const sdkDone = (async () => { - try { - const iter = sdk.query({ - prompt, - options: { - systemPrompt, - mcpServers, - tools: [], - allowedTools, - permissionMode: 'dontAsk', - settingSources: [], - cwd: projectDir, - ...(process.env.CLAUDE_CODE_PATH - ? { pathToClaudeCodeExecutable: process.env.CLAUDE_CODE_PATH } - : {}), - }, - }); + const tools = this.buildToolRegistry(); + // long-lived Query では MCP handler が「いつどの assistantMsgId に emit するか」を + // currentTurn から動的解決する。Builder には turn 越境した値を渡さず、 + // currentTurn 経由で参照させる。 + const mcp = this.buildMcpServer(tools); + const built = buildMcpServers({ tallyMcp: mcp, configs: externalConfigs }); + + const input = new AsyncIterableInput(); + this.input = input; + + const query = sdk.query({ + prompt: input.iterable(), + options: { + systemPrompt: buildChatSystemPrompt(), + mcpServers: built.mcpServers, + tools: [], + allowedTools: built.allowedTools, + permissionMode: 'dontAsk', + settingSources: [], + cwd: projectDir, + ...(process.env.CLAUDE_CODE_PATH + ? { pathToClaudeCodeExecutable: process.env.CLAUDE_CODE_PATH } + : {}), + }, + }); + this.query = query; + this.outputLoopFailed = false; + this.outputLoopDone = this.runOutputLoop(query); + } - for await (const msg of iter) { - console.log( - '[chat-runner] sdk msg:', - JSON.stringify(redactMcpSecrets(msg)).slice(0, 200), - ); - const blocks = extractAssistantBlocks(msg); - for (const b of blocks) { - if (b.type === 'text') { - textBuffer.push(b.text); - queue.push({ type: 'chat_text_delta', messageId: assistantMsgId, text: b.text }); - } else if (b.type === 'tool_use') { - // OAuth 認証系ツール (mcp__*__authenticate / complete_authentication) は - // tool_use を素のまま並べると UX が破綻するので、tool_result 到達まで stash して - // auth_request ブロックに変換する。 - const authMatch = parseAuthToolName(b.name); - if (authMatch) { - const label = - externalConfigById.get(authMatch.mcpServerId) ?? authMatch.mcpServerId; - stashedAuthUses.set(b.toolUseId, { match: authMatch, mcpServerLabel: label }); - continue; - } - // 外部 MCP の tool_use: source='external' で永続化、承認 UI なし (Task 12)。 - externalToolUseIds.add(b.toolUseId); - await chatStore.appendBlockToMessage(threadId, assistantMsgId, { - type: 'tool_use', - toolUseId: b.toolUseId, - name: b.name, - input: b.input, - source: 'external', - }); - queue.push({ - type: 'chat_tool_external_use', - messageId: assistantMsgId, - toolUseId: b.toolUseId, - name: b.name, - input: b.input, - }); - } else if (b.type === 'tool_result') { - // 認証系 tool_use とペアになる tool_result は auth_request に変換する (PR-B)。 - const stash = stashedAuthUses.get(b.toolUseId); - if (stash) { - stashedAuthUses.delete(b.toolUseId); - await this.handleAuthToolResult({ - match: stash.match, - mcpServerLabel: stash.mcpServerLabel, - result: { ok: b.ok, output: b.output }, - assistantMsgId, - emit: (e) => queue.push(e), - }); - continue; - } - // 同 turn 中に観測した外部 tool_use の id のみ external として扱う。 - // 集合に無い toolUseId (= 内部 / intercept 経路 / 想定外) は無視する。 - if (!externalToolUseIds.has(b.toolUseId)) continue; - // Task 13: 大規模 epic で tool_result が 500KB+ になり得るので、 - // YAML 永続化は 4KB に切り詰める。event はフル (UI はメモリ内で全文展開可)。 - await chatStore.appendBlockToMessage(threadId, assistantMsgId, { - type: 'tool_result', - toolUseId: b.toolUseId, - ok: b.ok, - output: truncateForPersistence(b.output), - }); - queue.push({ - type: 'chat_tool_external_result', - messageId: assistantMsgId, - toolUseId: b.toolUseId, - ok: b.ok, - output: b.output, - }); - } - } - } + // SDK query から流れてくる SDKMessage を進行中 turn の queue に振り分ける。 + // turn 終端は SDKResultMessage (type: 'result') の到達で判定し、chat_turn_ended を emit する。 + // iter が終わった (= subprocess 死亡 / close()) ときは進行中 turn にもエラーを通知して終わらせる。 + private async runOutputLoop(query: SdkQueryHandle): Promise { + try { + for await (const msg of query) { + console.log('[chat-runner] sdk msg:', JSON.stringify(redactMcpSecrets(msg)).slice(0, 200)); + await this.dispatchSdkMessage(msg); + } + } catch (err) { + this.outputLoopFailed = true; + const turn = this.currentTurn; + if (turn) { + turn.queue.push({ type: 'error', code: 'agent_failed', message: String(err) }); + turn.queue.push({ type: 'chat_turn_ended' }); + turn.queue.finish(); + } + return; + } + // iter 正常終端 (close 等)。query が死んだ印として outputLoopFailed を立て、 + // 次回 ensureQuery で作り直させる。進行中 turn が残っていれば打ち切る。 + this.outputLoopFailed = true; + const turn = this.currentTurn; + if (turn) { + turn.queue.push({ type: 'chat_turn_ended' }); + turn.queue.finish(); + } + } - // text blocks を assistant message の先頭に統合 (tool_use/result は intercept 経路で既に append 済み) - if (textBuffer.length > 0) { - const current = await chatStore.getChat(threadId); - const target = current?.messages.find((m) => m.id === assistantMsgId); - if (current && target) { - const textBlocks: ChatBlock[] = textBuffer.map((t) => ({ type: 'text', text: t })); - await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ - ...textBlocks, - ...target.blocks, - ]); - } - } else { - // text 出力なし。complete_authentication 専用 turn のように handleAuthToolResult が - // 過去 message の pending を更新するだけのケースでは assistantMsgId の blocks が - // 0 件のまま残り、UI に空のアシスタント bubble が蓄積する。プレースホルダを置いて - // 「処理した」ことを示す。 - const current = await chatStore.getChat(threadId); - const target = current?.messages.find((m) => m.id === assistantMsgId); - if (target && target.blocks.length === 0) { - await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ - { type: 'text', text: '(認証処理を完了しました)' }, - ]); - } + // 1 つの SDKMessage を処理する。turn が無ければ捨てる。 + private async dispatchSdkMessage(msg: SdkMessageLike): Promise { + const turn = this.currentTurn; + if (!turn) return; + const { chatStore, threadId } = this.deps; + const { assistantMsgId, queue, textBuffer, stashedAuthUses } = turn; + // ensureQuery 完了直後に SDK が即 yield する test mock のような race ケースで + // turn.externalConfigById が初期値 (空 Map) のまま読まれることがあるので、 + // 最新の cachedExternalConfigById を優先する (load 後に turn は再代入されるが + // dispatch のタイミング差で古い参照を保持する可能性がある)。 + const externalConfigById = this.cachedExternalConfigById ?? turn.externalConfigById; + + // result message: turn 終了 + const m = msg as unknown as { type?: string; subtype?: string }; + if (m.type === 'result') { + // text blocks を assistant message の先頭に統合 (tool_use/result は intercept 経路で既に append 済み) + if (textBuffer.length > 0) { + const current = await chatStore.getChat(threadId); + const target = current?.messages.find((m2) => m2.id === assistantMsgId); + if (current && target) { + const textBlocks: ChatBlock[] = textBuffer.map((t) => ({ type: 'text', text: t })); + await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ + ...textBlocks, + ...target.blocks, + ]); + } + } else { + // text 出力なし。complete_authentication 専用 turn 等で blocks が 0 件の + // まま残ると UI に空アシスタント bubble が蓄積するため、プレースホルダを置く。 + const current = await chatStore.getChat(threadId); + const target = current?.messages.find((m2) => m2.id === assistantMsgId); + if (target && target.blocks.length === 0) { + await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ + { type: 'text', text: '(認証処理を完了しました)' }, + ]); } + } + queue.push({ type: 'chat_assistant_message_completed', messageId: assistantMsgId }); + queue.push({ type: 'chat_turn_ended' }); + return; + } - queue.push({ type: 'chat_assistant_message_completed', messageId: assistantMsgId }); - queue.push({ type: 'chat_turn_ended' }); - } catch (err) { - queue.push({ type: 'error', code: 'agent_failed', message: String(err) }); - } finally { - queue.finish(); + const blocks = extractAssistantBlocks(msg); + for (const b of blocks) { + if (b.type === 'text') { + textBuffer.push(b.text); + queue.push({ type: 'chat_text_delta', messageId: assistantMsgId, text: b.text }); + } else if (b.type === 'tool_use') { + const authMatch = parseAuthToolName(b.name); + if (authMatch) { + const label = externalConfigById.get(authMatch.mcpServerId) ?? authMatch.mcpServerId; + stashedAuthUses.set(b.toolUseId, { match: authMatch, mcpServerLabel: label }); + continue; + } + // 外部 MCP の tool_use: source='external' で永続化、承認 UI なし (Task 12)。 + turn.externalToolUseIds.add(b.toolUseId); + await chatStore.appendBlockToMessage(threadId, assistantMsgId, { + type: 'tool_use', + toolUseId: b.toolUseId, + name: b.name, + input: b.input, + source: 'external', + }); + queue.push({ + type: 'chat_tool_external_use', + messageId: assistantMsgId, + toolUseId: b.toolUseId, + name: b.name, + input: b.input, + }); + } else if (b.type === 'tool_result') { + const stash = stashedAuthUses.get(b.toolUseId); + if (stash) { + stashedAuthUses.delete(b.toolUseId); + await this.handleAuthToolResult({ + match: stash.match, + mcpServerLabel: stash.mcpServerLabel, + result: { ok: b.ok, output: b.output }, + assistantMsgId, + emit: (e) => queue.push(e), + }); + continue; + } + // 同 turn 中に観測した外部 tool_use の id のみ external として扱う (CR 指摘 #19 2 周目)。 + if (!turn.externalToolUseIds.has(b.toolUseId)) continue; + // Task 13: 大規模 epic で tool_result が 500KB+ になり得るので、 + // YAML 永続化は 4KB に切り詰める。event はフル (UI はメモリ内で全文展開可)。 + await chatStore.appendBlockToMessage(threadId, assistantMsgId, { + type: 'tool_result', + toolUseId: b.toolUseId, + ok: b.ok, + output: truncateForPersistence(b.output), + }); + queue.push({ + type: 'chat_tool_external_result', + messageId: assistantMsgId, + toolUseId: b.toolUseId, + ok: b.ok, + output: b.output, + }); } - })(); + } + } - // 6. queue をドレイン。MCP handler から push される pending/result も含め全て通過する。 - while (true) { - const evt = await queue.next(); - if (evt === null) break; - yield evt; + private tearDownQuery(): void { + try { + this.input?.close(); + } catch { + /* swallow: close は idempotent */ + } + try { + this.query?.close?.(); + } catch { + /* swallow */ } + this.input = null; + this.query = null; + // outputLoopDone は close() 側が join で待つために退避してから tearDownQuery を + // 呼ぶので、ここで null 化しても安全 (codex 指摘 Minor: close() の順序問題対応)。 + this.outputLoopDone = null; + } - await sdkDone; // バックグラウンドタスクの未捕捉エラーを顕在化 + // ChatRunner 終了時 (WS close 等) に SDK subprocess を片付ける。 + // outputLoopDone は tearDownQuery で null 化されるため、先に退避してから join する + // (codex 指摘の致命 Minor: close() 順序問題)。 + // 進行中の turn があれば打ち切られ、queue.finish() を経て runUserTurn 側の for-await が + // 自然に抜ける。再度 runUserTurn を呼べば ensureQuery が再起動する (= 再 auth が必要)。 + async close(): Promise { + const pendingLoop = this.outputLoopDone; + this.tearDownQuery(); + if (pendingLoop) { + try { + await pendingLoop; + } catch { + /* swallow */ + } + } } // 外部 MCP の OAuth コールバック URL を受け取り、対応 server の complete_authentication @@ -374,9 +513,17 @@ export class ChatRunner { // SDK 制約上、complete_authentication tool 自体は agent loop 経由でしか呼べないため、 // sdk.query は呼ぶが allowedTools = [対象 tool 1 件] で他を遮断する。 async *runOAuthCallback(mcpServerId: string, callbackUrl: string): AsyncGenerator { - const { sdk, projectStore, projectDir } = this.deps; + // turn 並走禁止 (long-lived runUserTurn と同じガード)。 + if (this.currentTurn) { + yield { + type: 'error', + code: 'turn_in_progress', + message: '前のターンがまだ完了していません', + }; + return; + } - // 対象 server の設定のみ取得。Tally 内部 MCP は handler 構築上必要なので残す。 + const { chatStore, projectStore, threadId } = this.deps; const projectMeta = await projectStore.getProjectMeta(); const targetConfig = projectMeta?.mcpServers?.find((s) => s.id === mcpServerId); if (!targetConfig) { @@ -388,20 +535,24 @@ export class ChatRunner { return; } + const assistantMsgId = newChatMessageId(); const queue = new EventQueue(); - const tools = this.buildToolRegistry(); - const emit = (e: ChatEvent) => queue.push(e); - // ephemeral assistantMsgId: 永続化用の親としては使わない (この turn では - // chatStore.appendMessage を呼ばない)。handleAuthToolResult が pending 不在 - // の orphan failed を append するときの fallback としてのみ参照される。 - const ephemeralAssistantMsgId = newChatMessageId(); - const mcp = this.buildMcpServer(tools, emit, ephemeralAssistantMsgId); - - let mcpServers: Record; + const turnState: TurnState = { + assistantMsgId, + queue, + textBuffer: [], + stashedAuthUses: new Map(), + externalConfigById: + this.cachedExternalConfigById ?? new Map([[mcpServerId, targetConfig.name]]), + externalToolUseIds: new Set(), + }; + this.currentTurn = turnState; + + // long-lived query 上で動かす (OAuth state を turn 跨ぎで保持するため)。 try { - const built = buildMcpServers({ tallyMcp: mcp, configs: [targetConfig] }); - mcpServers = built.mcpServers; + await this.ensureQuery(); } catch (err) { + this.currentTurn = null; yield { type: 'error', code: 'mcp_config_invalid', @@ -409,89 +560,54 @@ export class ChatRunner { }; return; } + turnState.externalConfigById = this.cachedExternalConfigById ?? new Map(); - // 単一 tool のみ allow。他の tool は SDK 側で拒否される。 - const allowedTools = [`mcp__${mcpServerId}__complete_authentication`]; + // ephemeral: user message は chatStore に append しない (callback URL の code/state + // を chat 履歴に残さない)。空 assistant message だけは tool_use/tool_result の親 + // として必要なので append し、turn 末で「(認証処理を完了しました)」プレースホルダで + // 埋める (dispatchSdkMessage の result 処理が自動で実行する)。 + await chatStore.appendMessage(threadId, { + id: assistantMsgId, + role: 'assistant', + blocks: [], + createdAt: new Date().toISOString(), + }); + yield { type: 'chat_assistant_message_started', messageId: assistantMsgId }; - const stashedAuthUses = new Map(); + if (!this.input) { + this.currentTurn = null; + throw new Error('invariant: ensureQuery succeeded but input is null'); + } - // 構造化 prompt: AI は単一 tool しか持たないので、必ず指定 server の - // complete_authentication を呼ぶ。callback URL は prompt に乗るが、user message - // としての永続化はされない (chatStore.appendMessage を呼んでいない)。 + // 構造化 prompt: AI に必ず指定 server の complete_authentication を呼ばせる。 + // long-lived query では allowedTools が固定 (ensureQuery 起動時に決まる) なので + // 単一 tool への制約はかけられないが、prompt 指示で実用上はモデルが従う。 const prompt = [ 'OAuth コールバック URL を受信しました。', `mcp__${mcpServerId}__complete_authentication ツールを呼び、`, '以下の callback URL で認証を完了してください:', callbackUrl, + '', + '他の MCP server の complete_authentication ツールや、', + '別の作業ツール (create_node 等) は呼ばないでください。', ].join('\n'); - const sdkDone = (async () => { - try { - const iter = sdk.query({ - prompt, - options: { - systemPrompt: buildChatSystemPrompt(), - mcpServers, - tools: [], - allowedTools, - permissionMode: 'dontAsk', - settingSources: [], - cwd: projectDir, - ...(process.env.CLAUDE_CODE_PATH - ? { pathToClaudeCodeExecutable: process.env.CLAUDE_CODE_PATH } - : {}), - }, - }); + this.input.push({ + type: 'user', + message: { role: 'user', content: prompt }, + parent_tool_use_id: null, + }); - for await (const msg of iter) { - const blocks = extractAssistantBlocks(msg); - for (const b of blocks) { - if (b.type === 'tool_use') { - // complete_authentication の対象 server のみ stash (allowedTools で - // 他はそもそも来ないが、多重防御として name でも一致確認する)。 - const authMatch = parseAuthToolName(b.name); - if ( - authMatch && - authMatch.kind === 'complete_authentication' && - authMatch.mcpServerId === mcpServerId - ) { - stashedAuthUses.set(b.toolUseId, { - match: authMatch, - mcpServerLabel: targetConfig.name, - }); - } - // それ以外の tool_use は ephemeral では完全無視 (chatStore に書かない)。 - } else if (b.type === 'tool_result') { - const stash = stashedAuthUses.get(b.toolUseId); - if (stash) { - stashedAuthUses.delete(b.toolUseId); - await this.handleAuthToolResult({ - match: stash.match, - mcpServerLabel: stash.mcpServerLabel, - result: { ok: b.ok, output: b.output }, - assistantMsgId: ephemeralAssistantMsgId, - emit: (e) => queue.push(e), - }); - } - // 対応 stash の無い tool_result は無視。 - } - // text block は ephemeral では emit しない / 永続化しない。 - } - } - } catch (err) { - queue.push({ type: 'error', code: 'agent_failed', message: String(err) }); - } finally { - queue.finish(); + try { + while (true) { + const evt = await queue.next(); + if (evt === null) break; + yield evt; + if (evt.type === 'chat_turn_ended') break; } - })(); - - while (true) { - const evt = await queue.next(); - if (evt === null) break; - yield evt; + } finally { + this.currentTurn = null; } - - await sdkDone; } // OAuth 認証系 tool_use/tool_result ペアを auth_request ブロックに変換する。 @@ -779,16 +895,32 @@ export class ChatRunner { // SDK 視点では通常の tool_use → tool_result の往復。 // 間に挟まる pending / result の ChatEvent は emit callback で直接 queue に流す // (sideEvents buffer にすると SDK block 中に flush できず deadlock するため)。 - private buildMcpServer(tools: ToolEntry[], emit: (e: ChatEvent) => void, assistantMsgId: string) { + private buildMcpServer(tools: ToolEntry[]) { const find = (name: string): ToolEntry => { const t = tools.find((x) => x.name === name); if (!t) throw new Error(`tool not registered: ${name}`); return t; }; + // long-lived Query では SDK が tool を呼ぶタイミングが turn 中。currentTurn から + // assistantMsgId / emit を動的解決することで、1 度作った MCP サーバを turn 跨ぎで + // 使い回せる (codex 指摘対応: turn 中に currentTurn は不変なので race なし)。 const makeHandler = (name: string) => async (input: unknown) => { const entry = find(name); - const { done } = this.invokeInterceptedTool({ entry, input, emit, assistantMsgId }); + const turn = this.currentTurn; + if (!turn) { + return { + content: [{ type: 'text' as const, text: 'no active chat turn' }], + isError: true, + }; + } + const emit = (e: ChatEvent) => turn.queue.push(e); + const { done } = this.invokeInterceptedTool({ + entry, + input, + emit, + assistantMsgId: turn.assistantMsgId, + }); const result = await done; return { content: [{ type: 'text' as const, text: result.output }], diff --git a/packages/ai-engine/src/server.ts b/packages/ai-engine/src/server.ts index c06224e..e1bcd8e 100644 --- a/packages/ai-engine/src/server.ts +++ b/packages/ai-engine/src/server.ts @@ -304,4 +304,19 @@ function handleChatConnection(ws: WebSocket, sdk: SdkLike): void { message: `unknown message type: ${String(obj.type)}`, }); }); + + // WS が閉じたら ChatRunner も片付ける (long-lived SDK subprocess を終わらせる)。 + // close を呼ばないと subprocess + MCP HTTP transport がプロセス終了まで生き残る。 + ws.on('close', () => { + if (runner) { + const r = runner; + runner = null; + // close 内部の例外 (subprocess kill 失敗など) を unhandled rejection に + // しないために .catch で握る。観測できるよう warn は出す + // (codex 指摘 Major: void close() だと unhandled rejection 化する)。 + r.close().catch((err) => { + console.warn('[server] runner.close() failed:', err); + }); + } + }); } From 947978062ba0fafabf9c6a9e994cfa62700dc6b9 Mon Sep 17 00:00:00 2001 From: Shoma Nishitateno Date: Fri, 1 May 2026 21:10:11 +0900 Subject: [PATCH 2/3] =?UTF-8?q?fix(chat):=20CR=205=20=E5=91=A8=E7=9B=AE=20?= =?UTF-8?q?Major=201=20+=20Minor=201=20=E5=AF=BE=E5=BF=9C=20(Major=201=20?= =?UTF-8?q?=E3=81=AF=E8=A6=8B=E9=80=81=E3=82=8A)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit (PR #22) 1 周目への対応。 [Minor / chat-runner.ts:246] ensureQuery 失敗時の空 assistant ロールバック long-lived Query 化に伴い空 assistant message を ensureQuery 前に append するよう変更したが、ensureQuery 失敗時に空バブルが履歴に残る問題があった。 chatStore に message 単位の delete API が無いため、catch 内で replaceMessageBlocks でエラー内容を blocks に書き込む形でロールバックする。 [Major / chat-runner.ts:361] 正常 EOF と subprocess 終了の区別 runOutputLoop の正常 EOF 経路では従来 chat_turn_ended のみを emit していたが、 これだと「予期しない subprocess 終了」も「明示 shutdown」も区別できず、 途中で切れた turn が正常完了に見えてしまう問題があった。 ChatRunner.isClosing フラグを追加し、close() 内で立てる。runOutputLoop の EOF ブランチで isClosing が false なら agent_failed event を先に emit してから chat_turn_ended で turn を閉じる。 [Major / chat-runner.ts:599] OAuth callback の long-lived 経路統合 (見送り) callback URL を this.input に push することで会話 context に turn 跨ぎで残る、allowedTools 単一制約が prompt 依存になる、という懸念。 PR-C の主目的 (OAuth state を turn 跨ぎで保持) と SDK 制約のトレードオフ。 ephemeral 化すると subprocess 分離で state が引き継がれず再認証が必要になる。 本 PR では long-lived 維持を選択し、prompt 内で「他 tool は呼ぶな」と明示する 形でモデル依存の防御を入れる。SDK の MCP transport が状態共有 API を提供する までは追加対応見送り。CR thread に詳細を返信。 --- packages/ai-engine/src/chat-runner.ts | 32 ++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/packages/ai-engine/src/chat-runner.ts b/packages/ai-engine/src/chat-runner.ts index 58c701e..a552f06 100644 --- a/packages/ai-engine/src/chat-runner.ts +++ b/packages/ai-engine/src/chat-runner.ts @@ -132,6 +132,11 @@ export class ChatRunner { // 孤立してリークするのを回避)。完了時に null 化して次回の再起動を許す。 private ensureQueryInflight: Promise | null = null; + // close() による明示シャットダウン中フラグ。runOutputLoop の正常 EOF を + // 「明示 shutdown」と「予期しない subprocess 終了」で区別するために使う + // (CR Major: 後者を chat_turn_ended で正常完了扱いせず agent_failed を出す)。 + private isClosing = false; + // 現在進行中の turn。runUserTurn の入口で set、出口で null。 // MCP ハンドラと出力ループはここから assistantMsgId / queue を読む。 private currentTurn: TurnState | null = null; @@ -238,10 +243,21 @@ export class ChatRunner { await this.ensureQuery(); } catch (err) { this.currentTurn = null; + // 空 assistant message を「先に append」している関係で、ensureQuery 失敗時に + // 空バブルが履歴に残ってしまう (CR Minor)。chatStore に message 単位の delete + // API は無いので、エラー内容で blocks を埋める形にロールバックする。 + const message = err instanceof Error ? err.message : String(err); + try { + await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ + { type: 'text', text: `(MCP 設定エラー: ${message})` }, + ]); + } catch { + /* swallow: replace 自体が失敗しても error event は流すので致命的ではない */ + } yield { type: 'error', code: 'mcp_config_invalid', - message: err instanceof Error ? err.message : String(err), + message, }; return; } @@ -353,9 +369,18 @@ export class ChatRunner { } // iter 正常終端 (close 等)。query が死んだ印として outputLoopFailed を立て、 // 次回 ensureQuery で作り直させる。進行中 turn が残っていれば打ち切る。 + // 明示 shutdown (close()) と予期しない subprocess 終了を区別する (CR Major): + // 前者は normal turn end、後者は agent_failed を emit してから turn を閉じる。 this.outputLoopFailed = true; const turn = this.currentTurn; if (turn) { + if (!this.isClosing) { + turn.queue.push({ + type: 'error', + code: 'agent_failed', + message: 'SDK output stream ended unexpectedly', + }); + } turn.queue.push({ type: 'chat_turn_ended' }); turn.queue.finish(); } @@ -486,9 +511,10 @@ export class ChatRunner { // ChatRunner 終了時 (WS close 等) に SDK subprocess を片付ける。 // outputLoopDone は tearDownQuery で null 化されるため、先に退避してから join する // (codex 指摘の致命 Minor: close() 順序問題)。 - // 進行中の turn があれば打ち切られ、queue.finish() を経て runUserTurn 側の for-await が - // 自然に抜ける。再度 runUserTurn を呼べば ensureQuery が再起動する (= 再 auth が必要)。 + // isClosing フラグで runOutputLoop に「明示 shutdown」を伝え、EOF を agent_failed + // としては emit させない (CR Major)。 async close(): Promise { + this.isClosing = true; const pendingLoop = this.outputLoopDone; this.tearDownQuery(); if (pendingLoop) { From 30662cc04256db46b602082c6d09d7629adb580b Mon Sep 17 00:00:00 2001 From: Shoma Nishitateno Date: Fri, 1 May 2026 21:20:37 +0900 Subject: [PATCH 3/3] =?UTF-8?q?fix(chat):=20CR=202=20=E5=91=A8=E7=9B=AE=20?= =?UTF-8?q?Major=204=20=E4=BB=B6=E5=AF=BE=E5=BF=9C=20(Quick=20win)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit (PR #22) 2 周目への対応。Heavy lift #3 (OAuth callback の ephemeral 化) は前 PR と同様 long-lived 維持で見送り、CR thread に返信。 [Major / chat-runner.ts:349] close と ensureQuery の競合 close() が startQueryInternal の途中 (await projectStore.getProjectMeta() 等) で走ると、shutdown 後に sdk.query() が作られて subprocess が孤立する race があった。close() で ensureQueryInflight を join してから tearDown する。 [Major / chat-runner.ts:358] SDK メッセージの常時ログによる機密漏洩 runOutputLoop の console.log が全 SDK メッセージを redactMcpSecrets でラップ してログしていたが、message.content や result の OAuth callback URL の code/state がサーバーログに残るリスクがあった。type だけ出す形に絞り、 本文は出さない。redactMcpSecrets の import も削除。 [Major / chat-runner.ts:386] 異常終了後の pendingApprovals 残存 runOutputLoop の catch / EOF ブランチで queue を閉じるだけだったので、 turn 失敗後に UI から approveTool() が来ると create_node / create_edge 等 の side effect が走る恐れがあった。rejectAllPendingApprovals helper を 新設し、両ブランチで pendingApprovals を一括 reject(false) する。 [Major / chat-runner.ts:625] runOAuthCallback の currentTurn 解放保証 this.currentTurn = turnState の直後から全体を try/finally で囲み、 appendMessage 等の中間ステップで throw しても currentTurn が解放される ことを保証 (turn_in_progress で次の turn を受け付けられなくなるのを防ぐ)。 runUserTurn 側も同じパターンで全体を try/finally に統一。 --- packages/ai-engine/src/chat-runner.ts | 206 +++++++++++++++----------- 1 file changed, 120 insertions(+), 86 deletions(-) diff --git a/packages/ai-engine/src/chat-runner.ts b/packages/ai-engine/src/chat-runner.ts index a552f06..caa95cf 100644 --- a/packages/ai-engine/src/chat-runner.ts +++ b/packages/ai-engine/src/chat-runner.ts @@ -12,7 +12,6 @@ import type { SdkLike, SdkQueryHandle, SdkUserMessageLike } from './agent-runner import { AsyncIterableInput } from './async-input'; import { type AuthToolNameMatch, extractAuthUrl, parseAuthToolName } from './auth-detector'; import { buildMcpServers } from './mcp/build-mcp-servers'; -import { redactMcpSecrets } from './mcp/redact'; import type { ChatEvent, SdkMessageLike } from './stream'; import { CreateEdgeInputSchema, createEdgeHandler } from './tools/create-edge'; import { CreateNodeInputSchema, createNodeHandler } from './tools/create-node'; @@ -164,6 +163,20 @@ export class ChatRunner { }); } + // 進行中の turn が異常終了したとき、pendingApprovals に残っている承認待ちを + // 一括で否認 (false) する (CR Major)。これを呼ばないと、turn 失敗後に UI から + // 承認が来ても create_node / create_edge 等の side effect が走ってしまう。 + private rejectAllPendingApprovals(): void { + for (const resolver of this.pendingApprovals.values()) { + try { + resolver(false); + } catch { + /* swallow: resolver の throw は他の rejection に影響させない */ + } + } + this.pendingApprovals.clear(); + } + // user の 1 ターンを実行する (long-lived Query 化版)。 // - 1 ChatRunner = 1 sdk.query() = 1 subprocess に固定し、MCP HTTP transport の // OAuth state (PKCE / token) を turn 跨ぎで保持する。 @@ -238,47 +251,47 @@ export class ChatRunner { }; this.currentTurn = turnState; - // 5. SDK Query (long-lived) を必要なら起動する。 + // currentTurn を立てた直後から全体を try/finally で囲み、appendMessage / input.push + // 等の中間ステップで throw しても currentTurn が解放されることを保証する (CR Major)。 try { - await this.ensureQuery(); - } catch (err) { - this.currentTurn = null; - // 空 assistant message を「先に append」している関係で、ensureQuery 失敗時に - // 空バブルが履歴に残ってしまう (CR Minor)。chatStore に message 単位の delete - // API は無いので、エラー内容で blocks を埋める形にロールバックする。 - const message = err instanceof Error ? err.message : String(err); + // 5. SDK Query (long-lived) を必要なら起動する。 try { - await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ - { type: 'text', text: `(MCP 設定エラー: ${message})` }, - ]); - } catch { - /* swallow: replace 自体が失敗しても error event は流すので致命的ではない */ + await this.ensureQuery(); + } catch (err) { + // 空 assistant message を「先に append」している関係で、ensureQuery 失敗時に + // 空バブルが履歴に残ってしまう (CR Minor)。chatStore に message 単位の delete + // API は無いので、エラー内容で blocks を埋める形にロールバックする。 + const message = err instanceof Error ? err.message : String(err); + try { + await chatStore.replaceMessageBlocks(threadId, assistantMsgId, [ + { type: 'text', text: `(MCP 設定エラー: ${message})` }, + ]); + } catch { + /* swallow: replace 自体が失敗しても error event は流すので致命的ではない */ + } + yield { + type: 'error', + code: 'mcp_config_invalid', + message, + }; + return; } - yield { - type: 'error', - code: 'mcp_config_invalid', - message, - }; - return; - } - // ensureQuery が externalConfig を更新するので turnState の参照を最新へ差し替える。 - turnState.externalConfigById = this.cachedExternalConfigById ?? new Map(); - - // 6. user message を SDK の input ストリームに push する。 - // ensureQuery 成功後は this.input が必ず存在するはず (codex 指摘 Major 2: - // null チェックを invariant assertion で表明し、無音破棄 → ハングを防ぐ)。 - if (!this.input) { - this.currentTurn = null; - throw new Error('invariant: ensureQuery succeeded but input is null'); - } - this.input.push({ - type: 'user', - message: { role: 'user', content: prompt }, - parent_tool_use_id: null, - }); + // ensureQuery が externalConfig を更新するので turnState の参照を最新へ差し替える。 + turnState.externalConfigById = this.cachedExternalConfigById ?? new Map(); + + // 6. user message を SDK の input ストリームに push する。 + // ensureQuery 成功後は this.input が必ず存在するはず (codex 指摘 Major 2: + // null チェックを invariant assertion で表明し、無音破棄 → ハングを防ぐ)。 + if (!this.input) { + throw new Error('invariant: ensureQuery succeeded but input is null'); + } + this.input.push({ + type: 'user', + message: { role: 'user', content: prompt }, + parent_tool_use_id: null, + }); - // 8. queue をドレイン。chat_turn_ended が来たら今 turn は終わり。 - try { + // 7. queue をドレイン。chat_turn_ended が来たら今 turn は終わり。 while (true) { const evt = await queue.next(); if (evt === null) break; @@ -354,7 +367,12 @@ export class ChatRunner { private async runOutputLoop(query: SdkQueryHandle): Promise { try { for await (const msg of query) { - console.log('[chat-runner] sdk msg:', JSON.stringify(redactMcpSecrets(msg)).slice(0, 200)); + // SDK メッセージの全文を log すると OAuth callback URL の code/state や + // 外部 MCP の出力 (Jira 本文等) がサーバーログに残る (CR Major)。 + // type だけ出して、content / result は redactMcpSecrets でも完全に落とせない + // ため出さない。詳細デバッグが必要な場合は環境変数で切替可能にする想定。 + const mt = (msg as unknown as { type?: string }).type; + if (mt) console.log('[chat-runner] sdk msg type:', mt); await this.dispatchSdkMessage(msg); } } catch (err) { @@ -365,6 +383,9 @@ export class ChatRunner { turn.queue.push({ type: 'chat_turn_ended' }); turn.queue.finish(); } + // 異常終了後に古い承認が UI から来ても side effect を走らせないため、 + // pendingApprovals を一括 reject する (CR Major)。 + this.rejectAllPendingApprovals(); return; } // iter 正常終端 (close 等)。query が死んだ印として outputLoopFailed を立て、 @@ -384,6 +405,8 @@ export class ChatRunner { turn.queue.push({ type: 'chat_turn_ended' }); turn.queue.finish(); } + // 予期しない終了 / 明示 shutdown のいずれでも、残っている承認待ちは無効化する。 + this.rejectAllPendingApprovals(); } // 1 つの SDKMessage を処理する。turn が無ければ捨てる。 @@ -515,6 +538,16 @@ export class ChatRunner { // としては emit させない (CR Major)。 async close(): Promise { this.isClosing = true; + // 進行中の startQueryInternal を待ってから tearDown する (CR Major)。 + // close() が startQueryInternal の途中 (await projectStore.getProjectMeta() 等) で + // 走ると、shutdown 後に sdk.query() が作られて subprocess が孤立する race を防ぐ。 + if (this.ensureQueryInflight) { + try { + await this.ensureQueryInflight; + } catch { + /* swallow: starter の例外は ensureQuery 呼び出し側で観測される */ + } + } const pendingLoop = this.outputLoopDone; this.tearDownQuery(); if (pendingLoop) { @@ -574,57 +607,58 @@ export class ChatRunner { }; this.currentTurn = turnState; - // long-lived query 上で動かす (OAuth state を turn 跨ぎで保持するため)。 + // currentTurn を立てた直後から全体を try/finally で囲み、appendMessage 等の + // 中間ステップで throw しても currentTurn が解放されることを保証する (CR Major)。 + // 解放しないと以後 turn_in_progress で次の turn を受け付けられない。 try { - await this.ensureQuery(); - } catch (err) { - this.currentTurn = null; - yield { - type: 'error', - code: 'mcp_config_invalid', - message: err instanceof Error ? err.message : String(err), - }; - return; - } - turnState.externalConfigById = this.cachedExternalConfigById ?? new Map(); - - // ephemeral: user message は chatStore に append しない (callback URL の code/state - // を chat 履歴に残さない)。空 assistant message だけは tool_use/tool_result の親 - // として必要なので append し、turn 末で「(認証処理を完了しました)」プレースホルダで - // 埋める (dispatchSdkMessage の result 処理が自動で実行する)。 - await chatStore.appendMessage(threadId, { - id: assistantMsgId, - role: 'assistant', - blocks: [], - createdAt: new Date().toISOString(), - }); - yield { type: 'chat_assistant_message_started', messageId: assistantMsgId }; + // long-lived query 上で動かす (OAuth state を turn 跨ぎで保持するため)。 + try { + await this.ensureQuery(); + } catch (err) { + yield { + type: 'error', + code: 'mcp_config_invalid', + message: err instanceof Error ? err.message : String(err), + }; + return; + } + turnState.externalConfigById = this.cachedExternalConfigById ?? new Map(); + + // ephemeral: user message は chatStore に append しない (callback URL の code/state + // を chat 履歴に残さない)。空 assistant message だけは tool_use/tool_result の親 + // として必要なので append し、turn 末で「(認証処理を完了しました)」プレースホルダで + // 埋める (dispatchSdkMessage の result 処理が自動で実行する)。 + await chatStore.appendMessage(threadId, { + id: assistantMsgId, + role: 'assistant', + blocks: [], + createdAt: new Date().toISOString(), + }); + yield { type: 'chat_assistant_message_started', messageId: assistantMsgId }; - if (!this.input) { - this.currentTurn = null; - throw new Error('invariant: ensureQuery succeeded but input is null'); - } + if (!this.input) { + throw new Error('invariant: ensureQuery succeeded but input is null'); + } - // 構造化 prompt: AI に必ず指定 server の complete_authentication を呼ばせる。 - // long-lived query では allowedTools が固定 (ensureQuery 起動時に決まる) なので - // 単一 tool への制約はかけられないが、prompt 指示で実用上はモデルが従う。 - const prompt = [ - 'OAuth コールバック URL を受信しました。', - `mcp__${mcpServerId}__complete_authentication ツールを呼び、`, - '以下の callback URL で認証を完了してください:', - callbackUrl, - '', - '他の MCP server の complete_authentication ツールや、', - '別の作業ツール (create_node 等) は呼ばないでください。', - ].join('\n'); - - this.input.push({ - type: 'user', - message: { role: 'user', content: prompt }, - parent_tool_use_id: null, - }); + // 構造化 prompt: AI に必ず指定 server の complete_authentication を呼ばせる。 + // long-lived query では allowedTools が固定 (ensureQuery 起動時に決まる) なので + // 単一 tool への制約はかけられないが、prompt 指示で実用上はモデルが従う。 + const prompt = [ + 'OAuth コールバック URL を受信しました。', + `mcp__${mcpServerId}__complete_authentication ツールを呼び、`, + '以下の callback URL で認証を完了してください:', + callbackUrl, + '', + '他の MCP server の complete_authentication ツールや、', + '別の作業ツール (create_node 等) は呼ばないでください。', + ].join('\n'); + + this.input.push({ + type: 'user', + message: { role: 'user', content: prompt }, + parent_tool_use_id: null, + }); - try { while (true) { const evt = await queue.next(); if (evt === null) break;