diff --git a/packages/ai-engine/src/oauth/index.ts b/packages/ai-engine/src/oauth/index.ts index a46ea30..c094c76 100644 --- a/packages/ai-engine/src/oauth/index.ts +++ b/packages/ai-engine/src/oauth/index.ts @@ -16,3 +16,13 @@ export { refreshAccessToken, type TokenExchangeResult, } from './oauth-client'; +export { + __resetAllFlowsForTest, + awaitOAuthFlowSettled, + clearOAuthFlow, + getOAuthFlowStatus, + type OAuthFlowStatus, + type StartOAuthFlowInput, + type StartOAuthFlowResult, + startOAuthFlow, +} from './oauth-flow-orchestrator'; diff --git a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts new file mode 100644 index 0000000..24cdf31 --- /dev/null +++ b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.test.ts @@ -0,0 +1,350 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; + +import { ATLASSIAN_CLOUD_OAUTH } from '@tally/core'; +import { FileSystemOAuthStore } from '@tally/storage'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + __resetAllFlowsForTest, + awaitOAuthFlowSettled, + getOAuthFlowStatus, + startOAuthFlow, +} from './oauth-flow-orchestrator'; + +function makeProjectDir(): string { + return mkdtempSync(path.join(tmpdir(), 'tally-oauth-orch-')); +} + +describe('startOAuthFlow / getOAuthFlowStatus', () => { + beforeEach(async () => { + await __resetAllFlowsForTest(); + }); + afterEach(async () => { + vi.unstubAllGlobals(); + await __resetAllFlowsForTest(); + }); + + it('start すると authorizationUrl を返し、状態は pending', async () => { + const projectDir = makeProjectDir(); + try { + // fetch を呼ぶのは callback 受領後 (token 交換) なので start 単独では呼ばれない。 + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + expect(authorizationUrl).toMatch(/^https:\/\/auth\.atlassian\.com\/authorize\?/); + expect(authorizationUrl).toContain('client_id=cid'); + expect(authorizationUrl).toContain('code_challenge_method=S256'); + + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('pending'); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('callback 受領 → token 交換 成功で completed 状態 + token store に保存', async () => { + const projectDir = makeProjectDir(); + try { + // token endpoint を mock + const fetchMock = vi.fn( + async () => + new Response( + JSON.stringify({ + access_token: 'a-tok', + refresh_token: 'r-tok', + expires_in: 3600, + scope: 'read:jira-work offline_access', + token_type: 'Bearer', + }), + { status: 200 }, + ), + ); + vi.stubGlobal('fetch', fetchMock); + + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + + // authorization URL から redirect_uri と state を抜き出して、loopback に callback + // を fetch する (= ブラウザの redirect 相当)。 + const url = new URL(authorizationUrl); + const redirectUri = url.searchParams.get('redirect_uri'); + const state = url.searchParams.get('state'); + if (!redirectUri || !state) throw new Error('missing redirect_uri or state in auth URL'); + // unstubAllGlobals していない時点で fetch も mock されているので、real fetch を取り戻す。 + // → fetch モックは token endpoint だけ反応するように URL で振り分けるよう作り直す。 + // (今のテストでは token mock + loopback fetch を両立させたいので、 + // loopback への fetch は mock を介さず Node の global fetch を直接使う必要がある) + // 簡易解: token endpoint を判定して mock、それ以外は元の fetch に委譲。 + vi.unstubAllGlobals(); + const realFetch = globalThis.fetch.bind(globalThis); + vi.stubGlobal('fetch', async (input: string | URL, init?: RequestInit) => { + const u = typeof input === 'string' ? input : input.toString(); + if (u === ATLASSIAN_CLOUD_OAUTH.tokenEndpoint) { + return new Response( + JSON.stringify({ + access_token: 'a-tok', + refresh_token: 'r-tok', + expires_in: 3600, + scope: 'read:jira-work offline_access', + token_type: 'Bearer', + }), + { status: 200 }, + ); + } + return await realFetch(input, init); + }); + + // loopback callback を叩く (real fetch で) + const cbRes = await fetch(`${redirectUri}?code=AAA&state=${encodeURIComponent(state)}`); + expect(cbRes.status).toBe(200); + + // bg promise の settle を待つ + await awaitOAuthFlowSettled('atlassian'); + + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('completed'); + + // token store に書かれていることを確認 + const store = new FileSystemOAuthStore(projectDir); + const token = await store.read('atlassian'); + expect(token?.accessToken).toBe('a-tok'); + expect(token?.refreshToken).toBe('r-tok'); + expect(token?.scopes).toEqual(['read:jira-work', 'offline_access']); + expect(token?.tokenType).toBe('Bearer'); + // expiresAt は now + 3600 秒 (大まかな範囲確認) + expect(token?.expiresAt).toBeDefined(); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('state mismatch で failed 状態 (CSRF 検出)', async () => { + const projectDir = makeProjectDir(); + // CR Major 対応で failureMessage は固定メッセージに正規化された。詳細は server log + // (console.warn) に出るので、warn の内容で「state mismatch を検出した」ことを確認する。 + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + try { + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + const redirectUri = new URL(authorizationUrl).searchParams.get('redirect_uri'); + if (!redirectUri) throw new Error('missing redirect_uri'); + + // 不正な state で callback を叩く + await fetch(`${redirectUri}?code=AAA&state=wrong-state`); + await awaitOAuthFlowSettled('atlassian'); + + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('failed'); + if (status?.status === 'failed') { + // ユーザー向けの failureMessage は固定 (raw 例外メッセージは漏らさない) + expect(status.failureMessage).toBe('OAuth flow failed (see server logs for details)'); + } + + // server log には実際の失敗理由 (state mismatch) が出ている + const warnCalls = warnSpy.mock.calls.map((c) => c.join(' ')); + expect(warnCalls.some((m) => /state mismatch/.test(m))).toBe(true); + + // token store には何も書かれていない + const store = new FileSystemOAuthStore(projectDir); + expect(await store.read('atlassian')).toBeNull(); + } finally { + warnSpy.mockRestore(); + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('pending 中の二重 start は reject', async () => { + const projectDir = makeProjectDir(); + try { + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + await expect( + startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }), + ).rejects.toThrow(/already in progress/); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('concurrent な並走 start (await 中の race) でも 1 つだけ成功する', async () => { + const projectDir = makeProjectDir(); + try { + // 同 mcpServerId への start を Promise.all で同時起動する。HIGH 修正前は + // 両方が `existing?.status === 'pending'` チェックを通過してフローが二重に走る。 + const results = await Promise.allSettled([ + startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }), + startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }), + ]); + const fulfilled = results.filter((r) => r.status === 'fulfilled'); + const rejected = results.filter((r) => r.status === 'rejected'); + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('clearOAuthFlow で pending 中の bg を中断する (callbackHandle.close 経由)', async () => { + const projectDir = makeProjectDir(); + try { + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 直接 clearOAuthFlow → bg IIFE が awaitCallback を reject されて catch に行く + // が、entry は既に消えているので状態遷移は起きない (warn が出る)。 + const { clearOAuthFlow } = await import('./oauth-flow-orchestrator'); + clearOAuthFlow('atlassian'); + // bg promise が settle するまで待つ helper はもう entry が無いので no-op。 + // ここでは「再 start が即可能」であることを確認する。 + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + expect(authorizationUrl).toMatch(/^https:\/\/auth\.atlassian\.com\//); + } finally { + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('未開始の mcpServerId は getOAuthFlowStatus が null を返す', () => { + expect(getOAuthFlowStatus('never-started')).toBeNull(); + }); + + it('store.write 直前に preempt されたら旧 run はトークンを書き込まない (codex Major 対応)', async () => { + // codex 指摘: 旧 implementation では store.write より後に runId guard があったため、 + // clearOAuthFlow → 即 start で旧 run が callback 受領まで進んだ場合、storage には + // 旧 run のトークンが書き込まれ UI と整合が取れなかった。本テストはその retrograde + // を防ぐ guard を踏み台にする。 + const projectDir = makeProjectDir(); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + // token endpoint を mock (実際には呼ばれない想定) + let tokenEndpointHits = 0; + vi.stubGlobal( + 'fetch', + vi.fn(async (input: string | URL) => { + const u = typeof input === 'string' ? input : input.toString(); + if (u === ATLASSIAN_CLOUD_OAUTH.tokenEndpoint) { + tokenEndpointHits++; + return new Response(JSON.stringify({ access_token: 'old-tok', token_type: 'Bearer' }), { + status: 200, + }); + } + // それ以外 (loopback callback) は real fetch に流す + return await (globalThis as unknown as { fetch: typeof fetch }).fetch(input); + }), + ); + + try { + const { clearOAuthFlow } = await import('./oauth-flow-orchestrator'); + const { authorizationUrl } = await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 旧 run を clear してすぐ新 run を始める (旧 bg はまだ awaitCallback 中) + clearOAuthFlow('atlassian'); + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + + // 旧 bg は close() で reject されて catch に入るので token endpoint は呼ばれない。 + // (= 旧 run は code を取得できないため store.write も発生しない) + // 念のため microtask drain + await new Promise((r) => setTimeout(r, 30)); + + expect(tokenEndpointHits).toBe(0); + // 新 run は依然 pending、旧 run のトークンが書かれていないこと + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('pending'); + const store = new FileSystemOAuthStore(projectDir); + expect(await store.read('atlassian')).toBeNull(); + void authorizationUrl; + } finally { + warnSpy.mockRestore(); + rmSync(projectDir, { recursive: true, force: true }); + } + }); + + it('clearOAuthFlow → 即 start の race で旧 bg が新 pending を踏まない (runId guard)', async () => { + // CR Major 対応の検証: 旧 run の bg IIFE は close() で reject され catch に入るが、 + // 新 run が同じ mcpServerId で pending 状態を持っている。runId guard が無いと旧 bg は + // 新 run の entry を 'failed' で踏みつぶす。 + const projectDir = makeProjectDir(); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + try { + const { clearOAuthFlow } = await import('./oauth-flow-orchestrator'); + + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 旧 run を clear → bg はまだ awaitCallback に居るが close() で reject される + clearOAuthFlow('atlassian'); + // 旧 bg の catch ブランチが flows.get する前に新 run を開始したい。 + // clearOAuthFlow は同期で flows.delete + bg の close() を非同期 fire-and-forget + // するので、この時点で flows は空。新 run を始める。 + await startOAuthFlow({ + mcpServerId: 'atlassian', + provider: ATLASSIAN_CLOUD_OAUTH, + clientId: 'cid', + projectDir, + }); + // 旧 bg の catch が走り終えるまで microtask を回す。 + await new Promise((r) => setTimeout(r, 20)); + // 新 run は依然として pending (旧 bg に踏まれていない) + const status = getOAuthFlowStatus('atlassian'); + expect(status?.status).toBe('pending'); + // 旧 bg の preempted ログが出ている (failure / completion 両方ありうるが、 + // close() が awaitCallback を reject するので failure 経由)。 + const warnCalls = warnSpy.mock.calls.map((c) => c.join(' ')); + expect(warnCalls.some((m) => /preempted/.test(m))).toBe(true); + } finally { + warnSpy.mockRestore(); + rmSync(projectDir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts new file mode 100644 index 0000000..3a9353d --- /dev/null +++ b/packages/ai-engine/src/oauth/oauth-flow-orchestrator.ts @@ -0,0 +1,322 @@ +// ADR-0011 PR-E3: OAuth 2.1 フロー全体のオーケストレータ。 +// PR-E2 で実装した部品 (OAuthClient + LoopbackCallbackServer) と PR-E1 の +// FileSystemOAuthStore を組み合わせて、Route Handler から呼べる単一エントリ +// (`startOAuthFlow` / `getOAuthFlowStatus`) に集約する。 +// +// 設計判断: +// - process scope の singleton state (`flows` Map) で in-progress 状態を保持。 +// Next.js Route Handler は per-request にハンドラ関数が走るが、Next の dev/prod +// とも単一 Node.js プロセスで動くので module scope の Map は共有される。 +// - 1 mcpServerId につき同時に 1 フローまで。pending 中の二重 start は reject。 +// - state verify (CSRF 対策) は本モジュールが持つ。LoopbackCallbackServer から +// 返ってきた code/state のうち、state が start 時に発行したものと一致しなければ +// 即 failed に倒す。 +// - completed / failed 状態は次回 start まで Map に残す (UI が status を pull できる)。 +// start を呼ぶと前の状態は上書きされる。 + +import { randomUUID } from 'node:crypto'; + +import type { McpOAuthToken, OAuthProviderConfig } from '@tally/core'; +import { FileSystemOAuthStore } from '@tally/storage'; +import { + type LoopbackCallbackHandle, + startLoopbackCallbackServer, +} from './loopback-callback-server'; +import { + buildAuthorizationUrl, + exchangeCodeForToken, + generateOAuthState, + generatePkcePair, +} from './oauth-client'; + +export type OAuthFlowStatus = + | { + status: 'pending'; + authorizationUrl: string; + } + | { + status: 'completed'; + // completed では authorizationUrl は意味を持たないが UI 側のシグナル整合のため残す。 + // TODO(PR-E3b): UI 側が completed 時に authorizationUrl を参照しないことが確認できたら + // この field は削除しても良い (deadweight 化のため)。 + authorizationUrl: string; + } + | { + status: 'failed'; + authorizationUrl: string; + failureMessage: string; + }; + +// OAuthFlowStatus の discriminated union を保つために interface 拡張ではなく +// intersection で promise を足す (narrow が動くため)。 +type FlowEntry = OAuthFlowStatus & { + // bg で動く完了 promise。await することは無いが、test の安定化用に export 用 helper を持つ。 + promise: Promise; + // pending 中のみ存在する loopback server ハンドル。clearOAuthFlow から close() を呼んで + // bg IIFE を中断するために flows entry で保持する (CR HIGH 対応)。 + callbackHandle?: LoopbackCallbackHandle; + // この flow を起動した startOAuthFlow 呼び出し固有の ID。並走 start や + // clearOAuthFlow → 即 start などで entry が他の run に置き換わったかを + // 判定するために使う。bg IIFE は flows.set する前に runId 一致を確認する + // ことで、自分が「現在の run」でなくなっていたら状態遷移をスキップする + // (CR Major 対応: 古い run が新 run の状態を踏みつぶすのを防ぐ)。 + runId: string; +}; + +// プロセスローカルの flow 状態。Next の Route Handler が共有する。 +const flows = new Map(); + +export interface StartOAuthFlowInput { + mcpServerId: string; + provider: OAuthProviderConfig; + clientId: string; + // 未指定なら provider.defaultScopes を使う。 + scopes?: readonly string[]; + // FileSystemOAuthStore が token を書き込むプロジェクトディレクトリ。 + projectDir: string; +} + +export interface StartOAuthFlowResult { + authorizationUrl: string; +} + +// OAuth フロー開始: PKCE/state 生成 → loopback server 起動 → authorization URL 構築 → +// bg で callback を待ち token 交換 + 保存 → 完了/失敗を flows Map に反映。 +// +// 戻り値の `authorizationUrl` を UI 側がブラウザで開く。`getOAuthFlowStatus` で +// 完了を polling する。 +export async function startOAuthFlow(input: StartOAuthFlowInput): Promise { + // CR HIGH 対応: スロット予約を await より前に同期で確保する。これをしないと + // `await startLoopbackCallbackServer()` 中に並走 start が来た場合、両方が + // `existing?.status === 'pending'` を通過してフローが二重に走る。 + // sentinel として一旦 authorizationUrl='' で予約し、本物が決まったら上書きする。 + const existing = flows.get(input.mcpServerId); + if (existing?.status === 'pending') { + throw new Error(`OAuth flow already in progress for "${input.mcpServerId}"`); + } + // CR Major 対応: この呼び出し固有の runId を発行する。bg IIFE は状態遷移する前に + // 自分が flows に登録された当時の runId を保持しているか確認する。clearOAuthFlow + // → 別 start で entry が置き換わったケースでは、古い run の bg は何もしない。 + const runId = randomUUID(); + flows.set(input.mcpServerId, { + status: 'pending', + authorizationUrl: '', + promise: Promise.resolve(), + runId, + }); + + const pkce = generatePkcePair(); + const state = generateOAuthState(); + let callbackHandle: LoopbackCallbackHandle; + try { + callbackHandle = await startLoopbackCallbackServer(); + } catch (err) { + // 起動失敗で sentinel が残らないよう片付ける (この run が登録した sentinel 限定)。 + const cur = flows.get(input.mcpServerId); + if (cur?.runId === runId) { + flows.delete(input.mcpServerId); + } + throw err; + } + // ownership 確認: startLoopbackCallbackServer の await 中に clearOAuthFlow か + // 別 start が割り込んで entry が置き換わっていたら、起動した callback server + // を片付けて本 run は abort する (新 run を踏みつぶさない)。 + const afterListen = flows.get(input.mcpServerId); + if (afterListen?.runId !== runId) { + await callbackHandle.close().catch((closeErr) => { + console.warn(`[oauth-flow] callback server close failed (abort path): ${String(closeErr)}`); + }); + throw new Error(`OAuth flow was preempted for "${input.mcpServerId}"`); + } + const scopes = input.scopes ?? input.provider.defaultScopes; + + const authorizationUrl = buildAuthorizationUrl({ + provider: input.provider, + clientId: input.clientId, + redirectUri: callbackHandle.redirectUri, + scopes, + state, + codeChallenge: pkce.codeChallenge, + }); + + // bg promise を IIFE で生成。IIFE 内部から自分自身の promise 変数を直接参照すると + // tsc の definite-assignment 解析でエラーになるため、`flows.get(mcpServerId)?.promise` + // 経由で取り出して再 set する。これは外側で flows.set した後の値を読むので安全。 + const promise = (async () => { + try { + const cb = await callbackHandle.awaitCallback(); + if (cb.state !== state) { + throw new Error('OAuth state mismatch (possible CSRF or stale callback)'); + } + const result = await exchangeCodeForToken({ + provider: input.provider, + clientId: input.clientId, + code: cb.code, + redirectUri: callbackHandle.redirectUri, + codeVerifier: pkce.codeVerifier, + }); + + const now = new Date().toISOString(); + const expiresAt = + result.expiresIn !== undefined + ? new Date(Date.now() + result.expiresIn * 1000).toISOString() + : undefined; + + // exactOptionalPropertyTypes 下では undefined を含む object 構築不可なので + // optional フィールドは値があるときだけ乗せる。 + // scope は空白区切り。連続空白・前後空白で空文字が混入しないよう \s+ split + filter。 + const scopesParsed = result.scope?.split(/\s+/).filter(Boolean); + const token: McpOAuthToken = { + mcpServerId: input.mcpServerId, + accessToken: result.accessToken, + ...(result.refreshToken !== undefined ? { refreshToken: result.refreshToken } : {}), + acquiredAt: now, + ...(expiresAt !== undefined ? { expiresAt } : {}), + ...(scopesParsed && scopesParsed.length > 0 ? { scopes: scopesParsed } : {}), + tokenType: result.tokenType, + }; + + // CR Major 対応 (codex): store.write の手前で ownership 確認。ここを通さないと、 + // この run が clearOAuthFlow → 別 start に置き換えられた後でも旧 run のトークンが + // ストレージに書き込まれ、UI 上の新 run の pending と保存済みトークンの不整合が起きる。 + // 不一致なら throw して catch 側に流す (catch 側でも runId guard が掛かるので状態は遷移しない)。 + const ownerBeforeWrite = flows.get(input.mcpServerId); + if (ownerBeforeWrite?.runId !== runId) { + throw new Error(`OAuth flow was preempted before token write for "${input.mcpServerId}"`); + } + + const store = new FileSystemOAuthStore(input.projectDir); + await store.write(token); + + const cur = flows.get(input.mcpServerId); + if (cur && cur.runId === runId) { + flows.set(input.mcpServerId, { + status: 'completed', + authorizationUrl, + promise: cur.promise, + runId, + }); + } else { + // store.write 後の極めて稀な race: write 中に clearOAuthFlow が走ったケース。 + // 新 run の pending を踏まないために状態遷移はせず warn のみ。 + // (write 後にトークンが残るが、これは runId guard が write 前に通った時点で + // 「現在の run の成果」として正しく書かれている。後で新 run が同 mcpServerId で + // 完了すれば上書きされる。) + console.warn( + `[oauth-flow] flow entry was preempted between write and completion: ${input.mcpServerId}`, + ); + } + } catch (err) { + // CR Major 対応: failureMessage は raw な err.message を露出すると provider エンドポイント / + // 内部メッセージが UI 経由でユーザーに返ってしまうので、固定メッセージに正規化する。 + // 詳細は console.warn で server-side ログに残す。 + console.warn( + `[oauth-flow] flow failed (mcpServerId=${input.mcpServerId}): ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`, + ); + const cur = flows.get(input.mcpServerId); + if (cur && cur.runId === runId) { + flows.set(input.mcpServerId, { + status: 'failed', + authorizationUrl, + failureMessage: 'OAuth flow failed (see server logs for details)', + promise: cur.promise, + runId, + }); + } else { + console.warn(`[oauth-flow] flow entry was preempted before failure: ${input.mcpServerId}`); + } + } finally { + await callbackHandle.close().catch((closeErr) => { + // close 失敗は token 保存成否には影響しないが、port / fd リーク診断のため warn。 + console.warn(`[oauth-flow] callback server close failed: ${String(closeErr)}`); + }); + } + })(); + + // ここでも ownership 再確認: 上の `afterListen` チェック以降は同期のみだが、 + // bg IIFE の生成中にも microtask は走らないので置き換えは起きない想定。 + // 念のため runId 一致を確認してから本物の entry に上書きする。 + const beforePublish = flows.get(input.mcpServerId); + if (beforePublish?.runId !== runId) { + // この run は abort された後。bg promise はもう動いているが、awaitCallback で + // close() による reject を受けて catch に行き、preempted ログを出して終わる。 + // ここで認可 URL を return しても呼び出し元には混乱を招くだけなので throw する。 + await callbackHandle.close().catch(() => {}); + throw new Error(`OAuth flow was preempted for "${input.mcpServerId}"`); + } + flows.set(input.mcpServerId, { + status: 'pending', + authorizationUrl, + promise, + callbackHandle, + runId, + }); + + return { authorizationUrl }; +} + +// 現在の flow 状態を取得。未開始なら null。 +export function getOAuthFlowStatus(mcpServerId: string): OAuthFlowStatus | null { + const f = flows.get(mcpServerId); + if (!f) return null; + // promise を返さないために再構築する (ユーザーには内部 promise を見せない)。 + if (f.status === 'pending') return { status: 'pending', authorizationUrl: f.authorizationUrl }; + if (f.status === 'completed') { + return { status: 'completed', authorizationUrl: f.authorizationUrl }; + } + return { + status: 'failed', + authorizationUrl: f.authorizationUrl, + failureMessage: f.failureMessage, + }; +} + +// 進行中の bg promise が完了するのを待つ helper (主に test 用)。Route Handler は +// status を polling するので呼ばない。 +export async function awaitOAuthFlowSettled(mcpServerId: string): Promise { + const f = flows.get(mcpServerId); + if (!f) return; + await f.promise; +} + +// flow state を Map から消す (UI 側の「やり直し」操作用)。pending 中だった場合は +// callbackHandle.close() を呼んで bg IIFE を中断する。close 後に awaitCallback が +// reject され IIFE は catch ブランチに行くが、その時点で flows entry は無いので +// console.warn が出るのは想定動作。 +export function clearOAuthFlow(mcpServerId: string): void { + const f = flows.get(mcpServerId); + if (f?.status === 'pending' && f.callbackHandle) { + f.callbackHandle.close().catch(() => { + /* swallow: close 失敗は cleanup の妨げにしない */ + }); + } + flows.delete(mcpServerId); +} + +/** + * @internal テスト isolation 用: 全 flow をクリアする。本番コードから呼ばないこと。 + * + * CR Major 対応: pending な flow の callbackHandle を close してから関数 return する。 + * 単に Map.clear だけだと bg IIFE が抱えている loopback サーバが解放されず、テスト間で + * port や fd がリークする (LoopbackCallbackServer は port=0 で起動するので衝突自体は + * しないが、test プロセス全体で fd が積み上がる)。 + * + * 順序: flows.clear() を close await の前に呼ぶ。理由は、close が完了する前に bg IIFE + * が catch ブランチに入って `flows.get(id)` をしたときに、旧 entry が見えていると runId + * guard を通過して状態を書き換えてしまう可能性があるため (実際には runId は同じなので + * 通過する)。clear を先にすることで、bg は entry なし → preempted ログだけ出して終わる。 + */ +export async function __resetAllFlowsForTest(): Promise { + const closes: Promise[] = []; + for (const f of flows.values()) { + if (f.callbackHandle) { + closes.push( + f.callbackHandle.close().catch((closeErr) => { + console.warn(`[oauth-flow] reset close failed: ${String(closeErr)}`); + }), + ); + } + } + flows.clear(); + await Promise.all(closes); +}