From 474463d3bd76ffed5f29097acb53db4fa6d01adc Mon Sep 17 00:00:00 2001 From: Shoma Nishitateno Date: Sat, 2 May 2026 15:21:34 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat(ai-engine):=20ADR-0011=20PR-E3a=20?= =?UTF-8?q?=E2=80=94=20OAuthFlowOrchestrator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ADR-0011 の実装段階 3a。issue #28 への対応として、PR-E1 (schema + token store) と PR-E2 (OAuthClient + LoopbackCallbackServer) を組み合わせる単一エントリの オーケストレータを実装する。本 PR でも UI / API は接続されない (PR-E3b で Route Handler + UI 統合)。 ## 設計判断 - process scope の singleton state (flows Map) で in-progress 状態を保持。 Next.js Route Handler が module scope で共有 - 1 mcpServerId につき同時に 1 フロー (pending 中の二重 start は throw) - state verify (CSRF 対策) は orchestrator が持つ - completed / failed 状態は次 start まで残す (UI が status を pull) ## 新規ファイル - packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts: - startOAuthFlow: PKCE / state 生成 → loopback server 起動 → authorization URL 構築 → bg で callback 受領 → token 交換 → token store 保存 - getOAuthFlowStatus: polling 用の status 取得 (pending / completed / failed) - awaitOAuthFlowSettled: test helper - clearOAuthFlow: pending 中なら callbackHandle.close で bg 中断 - __resetAllFlowsForTest: @internal、test isolation 用 - packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts (7 件) ## codex セカンドオピニオン対応 (push 直前 review) [HIGH] スロット予約レース: await より前に flows.set で sentinel 予約 (同期で check-and-set 完結)、callback server 起動失敗時は sentinel を片付け [HIGH] clearOAuthFlow が bg を停止しない: flows entry に callbackHandle を 保持し、clear 時に close() で bg を中断 [MEDIUM] concurrent race の test 不足: Promise.all で並走 start を呼んで fulfilled 1 / rejected 1 を確認するテスト追加 [MEDIUM] scope split が空文字混入: result.scope.split(/\\s+/).filter(Boolean) と scopesParsed.length > 0 の conditional spread に変更 [MEDIUM] flows.get の no entry 時に silent: console.warn を追加 [MEDIUM] __resetAllFlowsForTest が public: @internal JSDoc 追加 [LOW] completed の authorizationUrl 保持: TODO コメントで PR-E3b の整理予告 [LOW] callbackHandle.close 失敗の swallow: console.warn を追加 ## テスト 7 件: start で pending / 成功で completed + token 保存 / state mismatch で failed / 二重 start で reject / 未開始 null / concurrent race / clearOAuthFlow で bg 中断 + 再 start 即可能。 pnpm typecheck 4/4 PASS / pnpm test core 99 + storage 97 + ai-engine 244 + frontend 273 = 713 PASS / pnpm lint exit 0。 --- packages/ai-engine/src/oauth/index.ts | 10 + .../src/oauth/oauth-flow-orchestrator.test.ts | 240 +++++++++++++++++ .../src/oauth/oauth-flow-orchestrator.ts | 241 ++++++++++++++++++ 3 files changed, 491 insertions(+) create mode 100644 packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts create mode 100644 packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts diff --git a/packages/ai-engine/src/oauth/index.ts b/packages/ai-engine/src/oauth/index.ts index a46ea30..c094c76 100644 --- a/packages/ai-engine/src/oauth/index.ts +++ b/packages/ai-engine/src/oauth/index.ts @@ -16,3 +16,13 @@ export { refreshAccessToken, type TokenExchangeResult, } from './oauth-client'; +export { + __resetAllFlowsForTest, + awaitOAuthFlowSettled, + clearOAuthFlow, + getOAuthFlowStatus, + type OAuthFlowStatus, + type StartOAuthFlowInput, + type StartOAuthFlowResult, + startOAuthFlow, +} from './oauth-flow-orchestrator'; diff --git a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts new file mode 100644 index 0000000..4a2506c --- /dev/null +++ b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts @@ -0,0 +1,240 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; + +import { ATLASSIAN_CLOUD_OAUTH } from '@tally/core'; +import { FileSystemOAuthStore } from '@tally/storage'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + __resetAllFlowsForTest, + awaitOAuthFlowSettled, + getOAuthFlowStatus, + startOAuthFlow, +} from './oauth-flow-orchestrator'; + +function makeProjectDir(): string { + return mkdtempSync(path.join(tmpdir(), 'tally-oauth-orch-')); +} + +describe('startOAuthFlow / getOAuthFlowStatus', () => { + beforeEach(() => { + __resetAllFlowsForTest(); + }); + afterEach(() => { + vi.unstubAllGlobals(); + __resetAllFlowsForTest(); + }); + + it('start すると authorizationUrl を返し、状態は pending', async () => { + const projectDir = makeProjectDir(); + try { + // fetch を呼ぶのは callback 受領後 (token 交換) なので start 単独では呼ばれない。 + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + expect(authorizationUrl).toMatch(/^https:\/\/auth\.atlassian\.com\/authorize\?/); + expect(authorizationUrl).toContain('client_id=cid'); + expect(authorizationUrl).toContain('code_challenge_method=S256'); + + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('pending'); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('callback 受領 → token 交換 成功で completed 状態 + token store に保存', async () => { + const projectDir = makeProjectDir(); + try { + // token endpoint を mock + const fetchMock = vi.fn( + async () => + new Response( + JSON.stringify({ + access_token: 'a-tok', + refresh_token: 'r-tok', + expires_in: 3600, + scope: 'read:jira-work offline_access', + token_type: 'Bearer', + }), + { status: 200 }, + ), + ); + vi.stubGlobal('fetch', fetchMock); + + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + + // authorization URL から redirect_uri と state を抜き出して、loopback に callback + // を fetch する (= ブラウザの redirect 相当)。 + const url = new URL(authorizationUrl); + const redirectUri = url.searchParams.get('redirect_uri'); + const state = url.searchParams.get('state'); + if (!redirectUri || !state) throw new Error('missing redirect_uri or state in auth URL'); + // unstubAllGlobals していない時点で fetch も mock されているので、real fetch を取り戻す。 + // → fetch モックは token endpoint だけ反応するように URL で振り分けるよう作り直す。 + // (今のテストでは token mock + loopback fetch を両立させたいので、 + // loopback への fetch は mock を介さず Node の global fetch を直接使う必要がある) + // 簡易解: token endpoint を判定して mock、それ以外は元の fetch に委譲。 + vi.unstubAllGlobals(); + const realFetch = globalThis.fetch.bind(globalThis); + vi.stubGlobal('fetch', async (input: string | URL, init?: RequestInit) => { + const u = typeof input === 'string' ? input : input.toString(); + if (u === ATLASSIAN_CLOUD_OAUTH.tokenEndpoint) { + return new Response( + JSON.stringify({ + access_token: 'a-tok', + refresh_token: 'r-tok', + expires_in: 3600, + scope: 'read:jira-work offline_access', + token_type: 'Bearer', + }), + { status: 200 }, + ); + } + return await realFetch(input, init); + }); + + // loopback callback を叩く (real fetch で) + const cbRes = await fetch(`${redirectUri}?code=AAA&state=${encodeURIComponent(state)}`); + expect(cbRes.status).toBe(200); + + // bg promise の settle を待つ + await awaitOAuthFlowSettled('atlassian'); + + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('completed'); + + // token store に書かれていることを確認 + const store = new FileSystemOAuthStore(projectDir); + const token = await store.read('atlassian'); + expect(token?.accessToken).toBe('a-tok'); + expect(token?.refreshToken).toBe('r-tok'); + expect(token?.scopes).toEqual(['read:jira-work', 'offline_access']); + expect(token?.tokenType).toBe('Bearer'); + // expiresAt は now + 3600 秒 (大まかな範囲確認) + expect(token?.expiresAt).toBeDefined(); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('state mismatch で failed 状態 (CSRF 検出)', async () => { + const projectDir = makeProjectDir(); + try { + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + const redirectUri = new URL(authorizationUrl).searchParams.get('redirect_uri'); + if (!redirectUri) throw new Error('missing redirect_uri'); + + // 不正な state で callback を叩く + await fetch(`${redirectUri}?code=AAA&state=wrong-state`); + await awaitOAuthFlowSettled('atlassian'); + + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('failed'); + if (status?.status === 'failed') { + expect(status.failureMessage).toMatch(/state mismatch/); + } + + // token store には何も書かれていない + const store = new FileSystemOAuthStore(projectDir); + expect(await store.read('atlassian')).toBeNull(); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('pending 中の二重 start は reject', async () => { + const projectDir = makeProjectDir(); + try { + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + await expect( + startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }), + ).rejects.toThrow(/already in progress/); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('concurrent な並走 start (await 中の race) でも 1 つだけ成功する', async () => { + const projectDir = makeProjectDir(); + try { + // 同 mcpServerId への start を Promise.all で同時起動する。HIGH 修正前は + // 両方が `existing?.status === 'pending'` チェックを通過してフローが二重に走る。 + const results = await Promise.allSettled([ + startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }), + startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }), + ]); + const fulfilled = results.filter((r) => r.status === 'fulfilled'); + const rejected = results.filter((r) => r.status === 'rejected'); + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('clearOAuthFlow で pending 中の bg を中断する (callbackHandle.close 経由)', async () => { + const projectDir = makeProjectDir(); + try { + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 直接 clearOAuthFlow → bg IIFE が awaitCallback を reject されて catch に行く + // が、entry は既に消えているので状態遷移は起きない (warn が出る)。 + const { clearOAuthFlow } = await import('./oauth-flow-orchestrator'); + clearOAuthFlow('atlassian'); + // bg promise が settle するまで待つ helper はもう entry が無いので no-op。 + // ここでは「再 start が即可能」であることを確認する。 + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + expect(authorizationUrl).toMatch(/^https:\/\/auth\.atlassian\.com\//); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('未開始の mcpServerId は getOAuthFlowStatus が null を返す', () => { + expect(getOAuthFlowStatus('never-started')).toBeNull(); + }); +}); diff --git a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts new file mode 100644 index 0000000..0192bd6 --- /dev/null +++ b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts @@ -0,0 +1,241 @@ +// ADR-0011 PR-E3: OAuth 2.1 フロー全体のオーケストレータ。 +// PR-E2 で実装した部品 (OAuthClient + LoopbackCallbackServer) と PR-E1 の +// FileSystemOAuthStore を組み合わせて、Route Handler から呼べる単一エントリ +// (`startOAuthFlow` / `getOAuthFlowStatus`) に集約する。 +// +// 設計判断: +// - process scope の singleton state (`flows` Map) で in-progress 状態を保持。 +// Next.js Route Handler は per-request にハンドラ関数が走るが、Next の dev/prod +// とも単一 Node.js プロセスで動くので module scope の Map は共有される。 +// - 1 mcpServerId につき同時に 1 フローまで。pending 中の二重 start は reject。 +// - state verify (CSRF 対策) は本モジュールが持つ。LoopbackCallbackServer から +// 返ってきた code/state のうち、state が start 時に発行したものと一致しなければ +// 即 failed に倒す。 +// - completed / failed 状態は次回 start まで Map に残す (UI が status を pull できる)。 +// start を呼ぶと前の状態は上書きされる。 + +import type { McpOAuthToken, OAuthProviderConfig } from '@tally/core'; +import { FileSystemOAuthStore } from '@tally/storage'; +import { + type LoopbackCallbackHandle, + startLoopbackCallbackServer, +} from './loopback-callback-server'; +import { + buildAuthorizationUrl, + exchangeCodeForToken, + generateOAuthState, + generatePkcePair, +} from './oauth-client'; + +export type OAuthFlowStatus = + | { + status: 'pending'; + authorizationUrl: string; + } + | { + status: 'completed'; + // completed では authorizationUrl は意味を持たないが UI 側のシグナル整合のため残す。 + // TODO(PR-E3b): UI 側が completed 時に authorizationUrl を参照しないことが確認できたら + // この field は削除しても良い (deadweight 化のため)。 + authorizationUrl: string; + } + | { + status: 'failed'; + authorizationUrl: string; + failureMessage: string; + }; + +// OAuthFlowStatus の discriminated union を保つために interface 拡張ではなく +// intersection で promise を足す (narrow が動くため)。 +type FlowEntry = OAuthFlowStatus & { + // bg で動く完了 promise。await することは無いが、test の安定化用に export 用 helper を持つ。 + promise: Promise; + // pending 中のみ存在する loopback server ハンドル。clearOAuthFlow から close() を呼んで + // bg IIFE を中断するために flows entry で保持する (CR HIGH 対応)。 + callbackHandle?: LoopbackCallbackHandle; +}; + +// プロセスローカルの flow 状態。Next の Route Handler が共有する。 +const flows = new Map(); + +export interface StartOAuthFlowInput { + mcpServerId: string; + provider: OAuthProviderConfig; + clientId: string; + // 未指定なら provider.defaultScopes を使う。 + scopes?: readonly string[]; + // FileSystemOAuthStore が token を書き込むプロジェクトディレクトリ。 + projectDir: string; +} + +export interface StartOAuthFlowResult { + authorizationUrl: string; +} + +// OAuth フロー開始: PKCE/state 生成 → loopback server 起動 → authorization URL 構築 → +// bg で callback を待ち token 交換 + 保存 → 完了/失敗を flows Map に反映。 +// +// 戻り値の `authorizationUrl` を UI 側がブラウザで開く。`getOAuthFlowStatus` で +// 完了を polling する。 +export async function startOAuthFlow(input: StartOAuthFlowInput): Promise { + // CR HIGH 対応: スロット予約を await より前に同期で確保する。これをしないと + // `await startLoopbackCallbackServer()` 中に並走 start が来た場合、両方が + // `existing?.status === 'pending'` を通過してフローが二重に走る。 + // sentinel として一旦 authorizationUrl='' で予約し、本物が決まったら上書きする。 + const existing = flows.get(input.mcpServerId); + if (existing?.status === 'pending') { + throw new Error(`OAuth flow already in progress for "${input.mcpServerId}"`); + } + flows.set(input.mcpServerId, { + status: 'pending', + authorizationUrl: '', + promise: Promise.resolve(), + }); + + const pkce = generatePkcePair(); + const state = generateOAuthState(); + let callbackHandle: LoopbackCallbackHandle; + try { + callbackHandle = await startLoopbackCallbackServer(); + } catch (err) { + // 起動失敗で sentinel が残らないよう片付ける。 + flows.delete(input.mcpServerId); + throw err; + } + const scopes = input.scopes ?? input.provider.defaultScopes; + + const authorizationUrl = buildAuthorizationUrl({ + provider: input.provider, + clientId: input.clientId, + redirectUri: callbackHandle.redirectUri, + scopes, + state, + codeChallenge: pkce.codeChallenge, + }); + + // bg promise を IIFE で生成。IIFE 内部から自分自身の promise 変数を直接参照すると + // tsc の definite-assignment 解析でエラーになるため、`flows.get(mcpServerId)?.promise` + // 経由で取り出して再 set する。これは外側で flows.set した後の値を読むので安全。 + const promise = (async () => { + try { + const cb = await callbackHandle.awaitCallback(); + if (cb.state !== state) { + throw new Error('OAuth state mismatch (possible CSRF or stale callback)'); + } + const result = await exchangeCodeForToken({ + provider: input.provider, + clientId: input.clientId, + code: cb.code, + redirectUri: callbackHandle.redirectUri, + codeVerifier: pkce.codeVerifier, + }); + + const now = new Date().toISOString(); + const expiresAt = + result.expiresIn !== undefined + ? new Date(Date.now() + result.expiresIn * 1000).toISOString() + : undefined; + + // exactOptionalPropertyTypes 下では undefined を含む object 構築不可なので + // optional フィールドは値があるときだけ乗せる。 + // scope は空白区切り。連続空白・前後空白で空文字が混入しないよう \s+ split + filter。 + const scopesParsed = result.scope?.split(/\s+/).filter(Boolean); + const token: McpOAuthToken = { + mcpServerId: input.mcpServerId, + accessToken: result.accessToken, + ...(result.refreshToken !== undefined ? { refreshToken: result.refreshToken } : {}), + acquiredAt: now, + ...(expiresAt !== undefined ? { expiresAt } : {}), + ...(scopesParsed && scopesParsed.length > 0 ? { scopes: scopesParsed } : {}), + tokenType: result.tokenType, + }; + + const store = new FileSystemOAuthStore(input.projectDir); + await store.write(token); + + const cur = flows.get(input.mcpServerId); + if (cur) { + flows.set(input.mcpServerId, { + status: 'completed', + authorizationUrl, + promise: cur.promise, + }); + } else { + // clearOAuthFlow で消された後に bg が完了したケース。token は保存済みなので + // 状態は 失う形だが、診断のため warn を出す。 + console.warn(`[oauth-flow] flow entry was cleared before completion: ${input.mcpServerId}`); + } + } catch (err) { + const cur = flows.get(input.mcpServerId); + if (cur) { + flows.set(input.mcpServerId, { + status: 'failed', + authorizationUrl, + failureMessage: err instanceof Error ? err.message : String(err), + promise: cur.promise, + }); + } else { + console.warn( + `[oauth-flow] flow entry was cleared before failure (msg=${err instanceof Error ? err.message : String(err)}): ${input.mcpServerId}`, + ); + } + } finally { + await callbackHandle.close().catch((closeErr) => { + // close 失敗は token 保存成否には影響しないが、port / fd リーク診断のため warn。 + console.warn(`[oauth-flow] callback server close failed: ${String(closeErr)}`); + }); + } + })(); + + flows.set(input.mcpServerId, { + status: 'pending', + authorizationUrl, + promise, + callbackHandle, + }); + + return { authorizationUrl }; +} + +// 現在の flow 状態を取得。未開始なら null。 +export function getOAuthFlowStatus(mcpServerId: string): OAuthFlowStatus | null { + const f = flows.get(mcpServerId); + if (!f) return null; + // promise を返さないために再構築する (ユーザーには内部 promise を見せない)。 + if (f.status === 'pending') return { status: 'pending', authorizationUrl: f.authorizationUrl }; + if (f.status === 'completed') { + return { status: 'completed', authorizationUrl: f.authorizationUrl }; + } + return { + status: 'failed', + authorizationUrl: f.authorizationUrl, + failureMessage: f.failureMessage, + }; +} + +// 進行中の bg promise が完了するのを待つ helper (主に test 用)。Route Handler は +// status を polling するので呼ばない。 +export async function awaitOAuthFlowSettled(mcpServerId: string): Promise { + const f = flows.get(mcpServerId); + if (!f) return; + await f.promise; +} + +// flow state を Map から消す (UI 側の「やり直し」操作用)。pending 中だった場合は +// callbackHandle.close() を呼んで bg IIFE を中断する。close 後に awaitCallback が +// reject され IIFE は catch ブランチに行くが、その時点で flows entry は無いので +// console.warn が出るのは想定動作。 +export function clearOAuthFlow(mcpServerId: string): void { + const f = flows.get(mcpServerId); + if (f?.status === 'pending' && f.callbackHandle) { + f.callbackHandle.close().catch(() => { + /* swallow: close 失敗は cleanup の妨げにしない */ + }); + } + flows.delete(mcpServerId); +} + +/** @internal テスト isolation 用: 全 flow をクリアする。本番コードから呼ばないこと。 */ +export function __resetAllFlowsForTest(): void { + flows.clear(); +} From d258c152949a8e0d9ca0189a3a1eeeaf94de6ceb Mon Sep 17 00:00:00 2001 From: Shoma Nishitateno Date: Sat, 2 May 2026 17:21:43 +0900 Subject: [PATCH 2/2] =?UTF-8?q?fix(ai-engine):=20OAuthFlowOrchestrator=20C?= =?UTF-8?q?R=20Major=203=20=E4=BB=B6=20+=20codex=20Major=201=20=E4=BB=B6?= =?UTF-8?q?=E5=AF=BE=E5=BF=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CR 1 周目 Major: - runId pattern 導入: 各 startOAuthFlow 呼び出しに UUID を発行し、bg IIFE が flows.set する前に「自分が現在の run か」を確認。clearOAuthFlow → 即 start race で旧 bg が新 pending を踏みつぶすバグを防ぐ。await 後の各分岐 (startListen 後 / completed / failed) で runId guard。 - failureMessage は固定文字列に正規化 (raw err.message を UI 経由で漏らさない)。 詳細は console.warn で server log に出す。 - __resetAllFlowsForTest を async 化、pending flow の callbackHandle.close() を await してから関数 return。テスト間の loopback fd リークを防ぐ。 codex セカンドオピニオン Major: - store.write 直前にも runId guard を追加。旧実装は write 後にしか guard が無く、 preempted な旧 run が storage にトークンを書き込むウィンドウがあった。 テスト追加: - 「store.write 直前に preempt されたら旧 run はトークンを書き込まない」 - 「clearOAuthFlow → 即 start で旧 bg が新 pending を踏まない (runId guard)」 --- .../src/oauth/oauth-flow-orchestrator.test.ts | 120 +++++++++++++++++- .../src/oauth/oauth-flow-orchestrator.ts | 107 ++++++++++++++-- 2 files changed, 209 insertions(+), 18 deletions(-) diff --git a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts index 4a2506c..24cdf31 100644 --- a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts +++ b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts @@ -18,12 +18,12 @@ function makeProjectDir(): string { } describe('startOAuthFlow / getOAuthFlowStatus', () => { - beforeEach(() => { - __resetAllFlowsForTest(); + beforeEach(async () => { + await __resetAllFlowsForTest(); }); - afterEach(() => { + afterEach(async () => { vi.unstubAllGlobals(); - __resetAllFlowsForTest(); + await __resetAllFlowsForTest(); }); it('start すると authorizationUrl を返し、状態は pending', async () => { @@ -129,6 +129,9 @@ describe('startOAuthFlow / getOAuthFlowStatus', () => { it('state mismatch で failed 状態 (CSRF 検出)', async () => { const projectDir = makeProjectDir(); + // CR Major 対応で failureMessage は固定メッセージに正規化された。詳細は server log + // (console.warn) に出るので、warn の内容で「state mismatch を検出した」ことを確認する。 + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); try { const { authorizationUrl } = await startOAuthFlow({ mcpServerId: 'atlassian', @@ -146,13 +149,19 @@ describe('startOAuthFlow / getOAuthFlowStatus', () => { const status = getOAuthFlowStatus('atlassian'); expect(status?.status).toBe('failed'); if (status?.status === 'failed') { - expect(status.failureMessage).toMatch(/state mismatch/); + // ユーザー向けの failureMessage は固定 (raw 例外メッセージは漏らさない) + expect(status.failureMessage).toBe('OAuth flow failed (see server logs for details)'); } + // server log には実際の失敗理由 (state mismatch) が出ている + const warnCalls = warnSpy.mock.calls.map((c) => c.join(' ')); + expect(warnCalls.some((m) => /state mismatch/.test(m))).toBe(true); + // token store には何も書かれていない const store = new FileSystemOAuthStore(projectDir); expect(await store.read('atlassian')).toBeNull(); } finally { + warnSpy.mockRestore(); rmSync(projectDir, { recursive: true, force: true }); } }); @@ -237,4 +246,105 @@ describe('startOAuthFlow / getOAuthFlowStatus', () => { it('未開始の mcpServerId は getOAuthFlowStatus が null を返す', () => { expect(getOAuthFlowStatus('never-started')).toBeNull(); }); + + it('store.write 直前に preempt されたら旧 run はトークンを書き込まない (codex Major 対応)', async () => { + // codex 指摘: 旧 implementation では store.write より後に runId guard があったため、 + // clearOAuthFlow → 即 start で旧 run が callback 受領まで進んだ場合、storage には + // 旧 run のトークンが書き込まれ UI と整合が取れなかった。本テストはその retrograde + // を防ぐ guard を踏み台にする。 + const projectDir = makeProjectDir(); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + // token endpoint を mock (実際には呼ばれない想定) + let tokenEndpointHits = 0; + vi.stubGlobal( + 'fetch', + vi.fn(async (input: string | URL) => { + const u = typeof input === 'string' ? input : input.toString(); + if (u === ATLASSIAN_CLOUD_OAUTH.tokenEndpoint) { + tokenEndpointHits++; + return new Response(JSON.stringify({ access_token: 'old-tok', token_type: 'Bearer' }), { + status: 200, + }); + } + // それ以外 (loopback callback) は real fetch に流す + return await (globalThis as unknown as { fetch: typeof fetch }).fetch(input); + }), + ); + + try { + const { clearOAuthFlow } = await import('./oauth-flow-orchestrator'); + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 旧 run を clear してすぐ新 run を始める (旧 bg はまだ awaitCallback 中) + clearOAuthFlow('atlassian'); + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + + // 旧 bg は close() で reject されて catch に入るので token endpoint は呼ばれない。 + // (= 旧 run は code を取得できないため store.write も発生しない) + // 念のため microtask drain + await new Promise((r) => setTimeout(r, 30)); + + expect(tokenEndpointHits).toBe(0); + // 新 run は依然 pending、旧 run のトークンが書かれていないこと + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('pending'); + const store = new FileSystemOAuthStore(projectDir); + expect(await store.read('atlassian')).toBeNull(); + void authorizationUrl; + } finally { + warnSpy.mockRestore(); + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('clearOAuthFlow → 即 start の race で旧 bg が新 pending を踏まない (runId guard)', async () => { + // CR Major 対応の検証: 旧 run の bg IIFE は close() で reject され catch に入るが、 + // 新 run が同じ mcpServerId で pending 状態を持っている。runId guard が無いと旧 bg は + // 新 run の entry を 'failed' で踏みつぶす。 + const projectDir = makeProjectDir(); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + try { + const { clearOAuthFlow } = await import('./oauth-flow-orchestrator'); + + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 旧 run を clear → bg はまだ awaitCallback に居るが close() で reject される + clearOAuthFlow('atlassian'); + // 旧 bg の catch ブランチが flows.get する前に新 run を開始したい。 + // clearOAuthFlow は同期で flows.delete + bg の close() を非同期 fire-and-forget + // するので、この時点で flows は空。新 run を始める。 + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 旧 bg の catch が走り終えるまで microtask を回す。 + await new Promise((r) => setTimeout(r, 20)); + // 新 run は依然として pending (旧 bg に踏まれていない) + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('pending'); + // 旧 bg の preempted ログが出ている (failure / completion 両方ありうるが、 + // close() が awaitCallback を reject するので failure 経由)。 + const warnCalls = warnSpy.mock.calls.map((c) => c.join(' ')); + expect(warnCalls.some((m) => /preempted/.test(m))).toBe(true); + } finally { + warnSpy.mockRestore(); + rmSync(projectDir, { recursive: true, force: true }); + } + }); }); diff --git a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts index 0192bd6..3a9353d 100644 --- a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts +++ b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts @@ -14,6 +14,8 @@ // - completed / failed 状態は次回 start まで Map に残す (UI が status を pull できる)。 // start を呼ぶと前の状態は上書きされる。 +import { randomUUID } from 'node:crypto'; + import type { McpOAuthToken, OAuthProviderConfig } from '@tally/core'; import { FileSystemOAuthStore } from '@tally/storage'; import { @@ -53,6 +55,12 @@ type FlowEntry = OAuthFlowStatus & { // pending 中のみ存在する loopback server ハンドル。clearOAuthFlow から close() を呼んで // bg IIFE を中断するために flows entry で保持する (CR HIGH 対応)。 callbackHandle?: LoopbackCallbackHandle; + // この flow を起動した startOAuthFlow 呼び出し固有の ID。並走 start や + // clearOAuthFlow → 即 start などで entry が他の run に置き換わったかを + // 判定するために使う。bg IIFE は flows.set する前に runId 一致を確認する + // ことで、自分が「現在の run」でなくなっていたら状態遷移をスキップする + // (CR Major 対応: 古い run が新 run の状態を踏みつぶすのを防ぐ)。 + runId: string; }; // プロセスローカルの flow 状態。Next の Route Handler が共有する。 @@ -86,10 +94,15 @@ export async function startOAuthFlow(input: StartOAuthFlowInput): Promise { + console.warn(`[oauth-flow] callback server close failed (abort path): ${String(closeErr)}`); + }); + throw new Error(`OAuth flow was preempted for "${input.mcpServerId}"`); + } const scopes = input.scopes ?? input.provider.defaultScopes; const authorizationUrl = buildAuthorizationUrl({ @@ -150,34 +176,54 @@ export async function startOAuthFlow(input: StartOAuthFlowInput): Promise { @@ -187,11 +233,23 @@ export async function startOAuthFlow(input: StartOAuthFlowInput): Promise {}); + throw new Error(`OAuth flow was preempted for "${input.mcpServerId}"`); + } flows.set(input.mcpServerId, { status: 'pending', authorizationUrl, promise, callbackHandle, + runId, }); return { authorizationUrl }; @@ -235,7 +293,30 @@ export function clearOAuthFlow(mcpServerId: string): void { flows.delete(mcpServerId); } -/** @internal テスト isolation 用: 全 flow をクリアする。本番コードから呼ばないこと。 */ -export function __resetAllFlowsForTest(): void { +/** + * @internal テスト isolation 用: 全 flow をクリアする。本番コードから呼ばないこと。 + * + * CR Major 対応: pending な flow の callbackHandle を close してから関数 return する。 + * 単に Map.clear だけだと bg IIFE が抱えている loopback サーバが解放されず、テスト間で + * port や fd がリークする (LoopbackCallbackServer は port=0 で起動するので衝突自体は + * しないが、test プロセス全体で fd が積み上がる)。 + * + * 順序: flows.clear() を close await の前に呼ぶ。理由は、close が完了する前に bg IIFE + * が catch ブランチに入って `flows.get(id)` をしたときに、旧 entry が見えていると runId + * guard を通過して状態を書き換えてしまう可能性があるため (実際には runId は同じなので + * 通過する)。clear を先にすることで、bg は entry なし → preempted ログだけ出して終わる。 + */ +export async function __resetAllFlowsForTest(): Promise { + const closes: Promise[] = []; + for (const f of flows.values()) { + if (f.callbackHandle) { + closes.push( + f.callbackHandle.close().catch((closeErr) => { + console.warn(`[oauth-flow] reset close failed: ${String(closeErr)}`); + }), + ); + } + } flows.clear(); + await Promise.all(closes); }