Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions packages/ai-engine/src/agent-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,32 @@ export interface StartRequest {
input: unknown;
}

// Streaming input mode 用の最小 SDKUserMessage 形状 (実 SDK の SDKUserMessage を duck-type 化)。
// MCP HTTP transport の OAuth 状態を turn 跨ぎで保持したい場合は、
// 1 query に AsyncIterable<SdkUserMessageLike> を渡し続ける必要がある。
export interface SdkUserMessageLike {
type: 'user';
message: { role: 'user'; content: string };
parent_tool_use_id: null;
session_id?: string;
}

// SDK Query は AsyncIterable<SdkMessageLike> + 任意の close() を持つハンドル。
// 実 SDK の Query 型 (interrupt / setMcpServers / streamInput / close) のうち、
// chat-runner が触るのは close のみなので最小化して受ける。
export interface SdkQueryHandle extends AsyncIterable<SdkMessageLike> {
close?(): void;
}

// Agent SDK との結合点だけ抽象化する。query は AsyncIterable<SdkMessageLike> を返すこと。
// 実 SDK の厳密な型 (Options, SDKMessage) に合わせず duck typing で受けるのは、
// テスト時に mockSdk を差し込めるようにするため。
// SDK 実体のシグネチャは `query({ prompt, options })` なので、systemPrompt / mcpServers /
// allowedTools / cwd / settingSources / permissionMode はすべて options 内に入れる必要がある。
export interface SdkLike {
query(opts: {
prompt: string;
// 単発 (agent-runner) は文字列、chat (multi-turn) は AsyncIterable で push 流す。
prompt: string | AsyncIterable<SdkUserMessageLike>;
options?: {
systemPrompt?: string;
mcpServers?: Record<string, unknown>;
Expand All @@ -43,7 +61,7 @@ export interface SdkLike {
// 解決に失敗するケースがある。明示的にシステムの claude CLI パスを渡すと回避できる。
pathToClaudeCodeExecutable?: string;
};
}): AsyncIterable<SdkMessageLike>;
}): SdkQueryHandle;
}

export interface RunAgentDeps {
Expand Down
119 changes: 119 additions & 0 deletions packages/ai-engine/src/async-input.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { describe, expect, it } from 'vitest';

import { AsyncIterableInput } from './async-input';

describe('AsyncIterableInput', () => {
it('push 後に next() で順番に取り出せる (close は drain 後に呼ぶ)', async () => {
// close 後はバッファをクリアして即時 done に倒す仕様 (teardown 用) のため、
// 「push してから close、その後 iterate」では値が取れない。
// 実運用では push → consumer が next() でドレイン → close の順。
const input = new AsyncIterableInput<number>();
input.push(1);
input.push(2);
input.push(3);
const it = input.iterable()[Symbol.asyncIterator]();
const got: number[] = [];
for (let i = 0; i < 3; i++) {
const r = await it.next();
if (r.done) break;
got.push(r.value);
}
expect(got).toEqual([1, 2, 3]);
input.close();
const r = await it.next();
expect(r.done).toBe(true);
});

it('iter が空の状態で next() を待ち、後の push で解決される', async () => {
const input = new AsyncIterableInput<string>();
const it = input.iterable()[Symbol.asyncIterator]();
const p = it.next();
let resolved = false;
p.then(() => {
resolved = true;
});
await new Promise((r) => setTimeout(r, 5));
expect(resolved).toBe(false);
input.push('hi');
const r = await p;
expect(r).toEqual({ value: 'hi', done: false });
});

it('close() で待機中の next() が done: true で解決される', async () => {
const input = new AsyncIterableInput<number>();
const it = input.iterable()[Symbol.asyncIterator]();
const p = it.next();
input.close();
const r = await p;
expect(r.done).toBe(true);
});

it('close 後の push は無視される', async () => {
const input = new AsyncIterableInput<number>();
input.push(1);
input.close();
input.push(99);
const got: number[] = [];
for await (const v of input.iterable()) got.push(v);
// close 時点で残バッファもクリアされるため即終了
expect(got).toEqual([]);
});

it('iterator.return() で残りの push が消費されず終了', async () => {
const input = new AsyncIterableInput<number>();
input.push(1);
input.push(2);
const it = input.iterable()[Symbol.asyncIterator]();
const r1 = await it.next();
expect(r1.value).toBe(1);
if (it.return) {
const r2 = await it.return();
expect(r2.done).toBe(true);
}
});

// FIFO キュー化: next() を 2 回連続で呼んで push 1 回だけのとき、
// 1 つ目だけ resolve され 2 つ目は close まで残る。
it('next() を複数回先に呼んでから push しても、push 順に各 promise が解決される', async () => {
const input = new AsyncIterableInput<number>();
const it = input.iterable()[Symbol.asyncIterator]();
const p1 = it.next();
const p2 = it.next();
input.push(10);
input.push(20);
const [r1, r2] = await Promise.all([p1, p2]);
expect(r1).toEqual({ value: 10, done: false });
expect(r2).toEqual({ value: 20, done: false });
});

it('next() を 2 回先に呼んで push を 1 回だけしても、未解決の Promise は close で done に倒れる', async () => {
const input = new AsyncIterableInput<number>();
const it = input.iterable()[Symbol.asyncIterator]();
const p1 = it.next();
const p2 = it.next();
input.push(42);
const r1 = await p1;
expect(r1).toEqual({ value: 42, done: false });
let p2Resolved = false;
p2.then(() => {
p2Resolved = true;
});
await new Promise((r) => setTimeout(r, 5));
expect(p2Resolved).toBe(false);
input.close();
const r2 = await p2;
expect(r2.done).toBe(true);
});

// CR Major (PR #18 2nd review): close 後にバッファ済み値を返し続けると
// teardown で残メッセージが消費されてしまう。close 時点で打ち切りたい。
it('close 後の next() はバッファに値が残っていても即 done', async () => {
const input = new AsyncIterableInput<number>();
input.push(1);
input.push(2);
input.close();
const it = input.iterable()[Symbol.asyncIterator]();
const r = await it.next();
expect(r.done).toBe(true);
});
});
63 changes: 63 additions & 0 deletions packages/ai-engine/src/async-input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SDK の query({ prompt: AsyncIterable<SDKUserMessage> }) に流す、
// 後から push できる AsyncIterable 実装。
// 1 chat thread = 1 long-lived sdk.query() を実現するため、user message を
// 任意のタイミングで投入し、close で iter を終わらせる。
//
// 実装方針: バッファ + waiter キュー。consumer が next を複数回連続で呼んでも
// 各 promise が独立に保持される。AsyncIterator 仕様に沿うため waiter は
// 単一スロットではなく FIFO キューで持つ (consumer が並行で next() を呼ぶ
// ケースに耐える)。
// close() は finished フラグを立てた上で残バッファをクリアし、待機中の
// waiter を done で全て倒す。next() は finished を最優先で確認するため、
// close 後の再 next() はバッファに値が残っていても即 done を返す。
export class AsyncIterableInput<T> {
private buf: T[] = [];
private waiters: Array<(r: IteratorResult<T>) => void> = [];
private finished = false;

push(value: T): void {
if (this.finished) return;
const w = this.waiters.shift();
if (w) {
w({ value, done: false });
return;
}
this.buf.push(value);
}

// close 後は残バッファを捨てて即時に終了させる (teardown 用)。
close(): void {
if (this.finished) return;
this.finished = true;
this.buf.length = 0;
while (this.waiters.length > 0) {
this.waiters.shift()?.({ value: undefined as never, done: true });
}
}

// SDK 等に渡す iter。同一インスタンスから複数回 [Symbol.asyncIterator] を取られる
// ことは想定しない (本パッケージでは 1 query に 1 input)。
iterable(): AsyncIterable<T> {
return {
[Symbol.asyncIterator]: () => ({
next: (): Promise<IteratorResult<T>> => {
// close 後は即座に done。残バッファに値が乗っていても無視する。
if (this.finished) {
return Promise.resolve({ value: undefined as never, done: true });
}
if (this.buf.length > 0) {
const v = this.buf.shift() as T;
return Promise.resolve({ value: v, done: false });
}
return new Promise<IteratorResult<T>>((resolve) => {
this.waiters.push(resolve);
});
},
return: (): Promise<IteratorResult<T>> => {
this.close();
return Promise.resolve({ value: undefined as never, done: true });
},
}),
};
}
}
49 changes: 37 additions & 12 deletions packages/ai-engine/src/chat-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@ import type { SdkLike } from './agent-runner';
import { buildChatPrompt, ChatRunner, formatNodeForContext } from './chat-runner';
import type { ChatEvent, SdkMessageLike } from './stream';

// long-lived Query 化に伴い prompt は AsyncIterable<SdkUserMessageLike> 型に変わった。
// テスト側で「最初に push された user message の content」を読むためのヘルパ。
// string で渡された場合 (互換) も同じ shape で扱えるようにする。
function startCapturePromptText(prompt: unknown): { read: () => string } {
const captured = { value: '' };
if (typeof prompt === 'string') {
captured.value = prompt;
} else if (
prompt &&
typeof (prompt as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === 'function'
) {
const it = (prompt as AsyncIterable<{ message?: { content?: string } }>)[
Symbol.asyncIterator
]();
it.next().then((r) => {
if (!r.done && r.value?.message?.content) captured.value = r.value.message.content;
});
}
return { read: () => captured.value };
}

describe('ChatRunner', () => {
let root: string;

Expand Down Expand Up @@ -194,10 +215,10 @@ describe('ChatRunner', () => {
priority: 'must',
})) as Node;

let capturedPrompt = '';
let promptCapture: { read: () => string } = { read: () => '' };
const sdk: SdkLike = {
query: ({ prompt }: { prompt: string }) => {
capturedPrompt = prompt;
query: ({ prompt }: { prompt: unknown }) => {
promptCapture = startCapturePromptText(prompt);
return (async function* () {
yield {
type: 'assistant',
Expand All @@ -221,6 +242,7 @@ describe('ChatRunner', () => {
events.push(e);
}

const capturedPrompt = promptCapture.read();
expect(capturedPrompt).toContain('<context_nodes>');
expect(capturedPrompt).toContain(`id: ${target.id}`);
expect(capturedPrompt).toContain('type: requirement');
Expand Down Expand Up @@ -273,10 +295,10 @@ describe('ChatRunner', () => {
body: '',
})) as Node;

let capturedPrompt = '';
let promptCapture: { read: () => string } = { read: () => '' };
const sdk: SdkLike = {
query: ({ prompt }: { prompt: string }) => {
capturedPrompt = prompt;
query: ({ prompt }: { prompt: unknown }) => {
promptCapture = startCapturePromptText(prompt);
return (async function* () {
yield { type: 'result', subtype: 'success', result: 'ok' } as unknown as SdkMessageLike;
})();
Expand All @@ -295,6 +317,7 @@ describe('ChatRunner', () => {
// drain
}

const capturedPrompt = promptCapture.read();
const histIdx = capturedPrompt.indexOf('<conversation_history>');
const histEndIdx = capturedPrompt.indexOf('</conversation_history>');
const ctxIdx = capturedPrompt.indexOf('<context_nodes>');
Expand Down Expand Up @@ -325,10 +348,10 @@ describe('ChatRunner', () => {
body: '',
})) as Node;

let capturedPrompt = '';
let promptCapture: { read: () => string } = { read: () => '' };
const sdk: SdkLike = {
query: ({ prompt }: { prompt: string }) => {
capturedPrompt = prompt;
query: ({ prompt }: { prompt: unknown }) => {
promptCapture = startCapturePromptText(prompt);
return (async function* () {
yield { type: 'result', subtype: 'success', result: 'ok' } as unknown as SdkMessageLike;
})();
Expand All @@ -344,6 +367,7 @@ describe('ChatRunner', () => {
for await (const _e of runner.runUserTurn('q', ['nonexistent', valid.id, 'also-gone'])) {
// drain
}
const capturedPrompt = promptCapture.read();
expect(capturedPrompt).toContain('<context_nodes>');
expect(capturedPrompt).toContain(`id: ${valid.id}`);
expect(capturedPrompt).not.toContain('id: nonexistent');
Expand All @@ -356,10 +380,10 @@ describe('ChatRunner', () => {
const projectStore = new FileSystemProjectStore(root);
const thread = await chatStore.createChat({ projectId: 'proj-1', title: 't' });

let capturedPrompt = '';
let promptCapture: { read: () => string } = { read: () => '' };
const sdk: SdkLike = {
query: ({ prompt }: { prompt: string }) => {
capturedPrompt = prompt;
query: ({ prompt }: { prompt: unknown }) => {
promptCapture = startCapturePromptText(prompt);
return (async function* () {
yield { type: 'result', subtype: 'success', result: 'ok' } as unknown as SdkMessageLike;
})();
Expand All @@ -375,6 +399,7 @@ describe('ChatRunner', () => {
for await (const _e of runner.runUserTurn('hello', [])) {
// drain
}
const capturedPrompt = promptCapture.read();
expect(capturedPrompt).not.toContain('<context_nodes>');
// user 文字列自体は (履歴経由で) 必ず prompt に入る
expect(capturedPrompt).toContain('hello');
Expand Down
Loading