diff --git a/kagent-ui/CHANGELOG.md b/kagent-ui/CHANGELOG.md index 7267008..933dc29 100644 --- a/kagent-ui/CHANGELOG.md +++ b/kagent-ui/CHANGELOG.md @@ -1,5 +1,16 @@ # CHANGELOG +## 2026-04-16 + +- 修复聊天界面 assistant 回复重复显示: + - 为 assistant UI 消息补充 `turnId` 元信息,并在前端状态层归并同一 turn 中相邻的 assistant 气泡,避免 `item/started`、`item/agentMessage/delta`、`item/completed` 之间的消息 ID 不一致时留下重复回复。 + - `thread/resume` 历史恢复沿用同样的相邻 assistant 归并规则,避免切换或重开线程后再次出现重复欢迎语。 + - 新增 `messages.test.ts` 回归测试,覆盖相邻 assistant 合并、chunked assistant 片段重组、live delta 归并与历史恢复路径,并确保中间夹有其他 item 时不会误合并。 +- 修复聊天界面 SSE 增量消息累积问题: + - `use-codex-session` 改为为 `reasoning` 与 `mcpToolCall/progress` 使用按 turn 稳定的临时消息 ID,流式增量改为原地更新,不再为每个增量块堆出新的消息条目。 + - `turn/completed` 与 `error` 事件现在会清理当前 turn 的临时思考/工具进度卡片,避免对话结束后残留重复占位消息。 + - 新增 `messages.test.ts`,覆盖临时消息清理与流式占位原地更新的纯函数行为。 + ## 2026-02-24 - 修复“查看沙箱日志”鉴权失败(401): diff --git a/kagent-ui/bun.lockb b/kagent-ui/bun.lockb index 2814e3b..4fa654c 100755 Binary files a/kagent-ui/bun.lockb and b/kagent-ui/bun.lockb differ diff --git a/kagent-ui/components/assistant-ui/thread.tsx b/kagent-ui/components/assistant-ui/thread.tsx index cfc1fd9..9304297 100644 --- a/kagent-ui/components/assistant-ui/thread.tsx +++ b/kagent-ui/components/assistant-ui/thread.tsx @@ -14,7 +14,11 @@ import { import ReactMarkdown from "react-markdown"; import remarkGfm from "remark-gfm"; import { Button } from "@/components/ui/button"; -import { getProviderTypeMeta, PROVIDER_TYPE_ICON_MAP } from "@/lib/model-provider-type"; +import type { FileUpdateChange } from "@/lib/codex-app-server/v2/FileUpdateChange"; +import { + getProviderTypeMeta, + PROVIDER_TYPE_ICON_MAP, +} from "@/lib/model-provider-type"; import { Select, SelectContent, @@ -28,7 +32,7 @@ export type UserAttachment = { type: "image" | "localImage"; src: string }; export type UiMessage = | { id: string; kind: "user"; text: string; attachments: UserAttachment[] } - | { id: string; kind: "assistant"; text: string } + | { id: string; kind: "assistant"; text: string; turnId?: string } | { id: string; kind: "turnCompleted"; turnId: string; text: string } | { id: string; kind: "reasoning"; summary: string[]; content: string[] } | { @@ -45,7 +49,7 @@ export type UiMessage = id: string; kind: "fileChange"; status: string; - changes: Array<{ path: string; kind: string; diff: string }>; + changes: Array; output: string | null; } | { @@ -469,7 +473,7 @@ function renderAssistantPayload(msg: Exclude) { className="rounded-md border bg-background p-2" >
- {change.path} · {change.kind} + {change.path} · {formatFileChangeKind(change.kind)}
{change.diff ? (
@@ -646,6 +650,19 @@ function formatDuration(ms: number) {
   return `${minutes}m ${rest.toFixed(0)}s`;
 }
 
+function formatFileChangeKind(kind: FileUpdateChange["kind"]) {
+  switch (kind.type) {
+    case "add":
+      return "add";
+    case "delete":
+      return "delete";
+    case "update":
+      return kind.move_path ? `update -> ${kind.move_path}` : "update";
+  }
+  const exhaustiveCheck: never = kind;
+  return exhaustiveCheck;
+}
+
 const markdownComponents: Components = {
   h1: ({ children }) => (
     

{children}

diff --git a/kagent-ui/hooks/use-codex-session.ts b/kagent-ui/hooks/use-codex-session.ts index 3659855..348a313 100644 --- a/kagent-ui/hooks/use-codex-session.ts +++ b/kagent-ui/hooks/use-codex-session.ts @@ -40,6 +40,7 @@ import { appendReasoningContentDelta, appendReasoningSummaryDelta, extractMessagesFromThread, + removeMessagesByIds, threadItemToUiMessage, upsertMessage, } from "@/lib/assistant/messages"; @@ -128,7 +129,10 @@ function normalizeAndSortSessions(list: CodexThread[]): CodexThread[] { /** * 合并会话列表(以 incoming 为主,补充 local 中暂未同步到服务端的会话) */ -function mergeSessions(incoming: CodexThread[], local: CodexThread[]): CodexThread[] { +function mergeSessions( + incoming: CodexThread[], + local: CodexThread[], +): CodexThread[] { const map = new Map(); for (const item of incoming) { map.set(item.id, item); @@ -140,6 +144,36 @@ function mergeSessions(incoming: CodexThread[], local: CodexThread[]): CodexThre return normalizeAndSortSessions([...map.values()]); } +const TEMPORARY_STREAMING_ITEM_TYPES = ["reasoning", "mcpToolCall"] as const; + +type TemporaryStreamingItemType = + (typeof TEMPORARY_STREAMING_ITEM_TYPES)[number]; + +function isTemporaryStreamingItemType( + type: string, +): type is TemporaryStreamingItemType { + return TEMPORARY_STREAMING_ITEM_TYPES.includes( + type as TemporaryStreamingItemType, + ); +} + +function buildTemporaryMessageId( + threadId: string, + turnId: string, + type: TemporaryStreamingItemType, +): string { + return `temp:${threadId}:${turnId}:${type}`; +} + +function getTemporaryMessageIdsForTurn( + threadId: string, + turnId: string, +): string[] { + return TEMPORARY_STREAMING_ITEM_TYPES.map((type) => + buildTemporaryMessageId(threadId, turnId, type), + ); +} + /** * Codex 会话管理 Hook * 提供会话列表、创建、恢复、消息发送、事件流处理等功能 @@ -170,8 +204,6 @@ export function useCodexSession({ const [createSessionSubmitting, setCreateSessionSubmitting] = useState(false); const eventsAbortRef = useRef(null); - // 映射流式 item ID:key = `${threadId}:${turnId}:${type}`,value = 按顺序存储的 itemId 数组 - const streamingItemRef = useRef>({}); const activeWorkspaceId = activeWorkspace?.id ?? null; @@ -441,11 +473,6 @@ export function useCodexSession({ const p = msg.params as ErrorNotification | undefined; if (!p?.threadId || !p?.turnId || !p?.error?.message) return; - const prefix = `${p.threadId}:${p.turnId}:`; - for (const key of Object.keys(streamingItemRef.current)) { - if (key.startsWith(prefix)) delete streamingItemRef.current[key]; - } - setRunning((cur) => { if (!cur) return cur; if (cur.sessionId !== p.threadId) return cur; @@ -463,11 +490,17 @@ export function useCodexSession({ setMessagesBySession((prev) => ({ ...prev, - [p.threadId]: upsertMessage(prev[p.threadId] ?? [], { - id: `error_${p.threadId}_${p.turnId}`, - kind: "assistant", - text: `**请求失败**\n\n${errorHint}`, - }), + [p.threadId]: upsertMessage( + removeMessagesByIds( + prev[p.threadId] ?? [], + getTemporaryMessageIdsForTurn(p.threadId, p.turnId), + ), + { + id: `error_${p.threadId}_${p.turnId}`, + kind: "assistant", + text: `**请求失败**\n\n${errorHint}`, + }, + ), })); if (shouldGuideNewThread) { @@ -482,17 +515,15 @@ export function useCodexSession({ const p = msg.params as ItemStartedNotification | undefined; if (!p?.threadId || !p?.turnId || !p?.item) return; const scopedId = buildScopedItemId(p.turnId, p.item.id); + const messageId = isTemporaryStreamingItemType(p.item.type) + ? buildTemporaryMessageId(p.threadId, p.turnId, p.item.type) + : scopedId; const ui = threadItemToUiMessage( - overrideThreadItemId(p.item, scopedId), + overrideThreadItemId(p.item, messageId), + p.turnId, ); if (!ui) return; if (ui.kind === "user") return; - const key = `${p.threadId}:${p.turnId}:${p.item.type}`; - const arr = streamingItemRef.current[key] ?? []; - if (!arr.includes(scopedId)) { - arr.push(scopedId); - } - streamingItemRef.current[key] = arr; setMessagesBySession((prev) => ({ ...prev, [p.threadId]: upsertMessage(prev[p.threadId] ?? [], ui), @@ -509,20 +540,14 @@ export function useCodexSession({ typeof p.delta !== "string" ) return; - const key = `${p.threadId}:${p.turnId}:agentMessage`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildScopedItemId(p.turnId, p.itemId); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendAgentDelta( prev[p.threadId] ?? [], - mappedId, + itemId, p.delta, + p.turnId, ), })); return; @@ -533,19 +558,16 @@ export function useCodexSession({ | ReasoningSummaryPartAddedNotification | undefined; if (!p?.threadId || !p?.turnId || !p?.itemId) return; - const key = `${p.threadId}:${p.turnId}:reasoning`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildTemporaryMessageId( + p.threadId, + p.turnId, + "reasoning", + ); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendReasoningSummaryDelta( prev[p.threadId] ?? [], - mappedId, + itemId, p.summaryIndex, "", ), @@ -564,19 +586,16 @@ export function useCodexSession({ typeof p.delta !== "string" ) return; - const key = `${p.threadId}:${p.turnId}:reasoning`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildTemporaryMessageId( + p.threadId, + p.turnId, + "reasoning", + ); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendReasoningSummaryDelta( prev[p.threadId] ?? [], - mappedId, + itemId, p.summaryIndex, p.delta, ), @@ -593,19 +612,16 @@ export function useCodexSession({ typeof p.delta !== "string" ) return; - const key = `${p.threadId}:${p.turnId}:reasoning`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildTemporaryMessageId( + p.threadId, + p.turnId, + "reasoning", + ); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendReasoningContentDelta( prev[p.threadId] ?? [], - mappedId, + itemId, p.contentIndex, p.delta, ), @@ -624,19 +640,12 @@ export function useCodexSession({ typeof p.delta !== "string" ) return; - const key = `${p.threadId}:${p.turnId}:commandExecution`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildScopedItemId(p.turnId, p.itemId); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendCommandOutputDelta( prev[p.threadId] ?? [], - mappedId, + itemId, p.delta, ), })); @@ -646,21 +655,14 @@ export function useCodexSession({ if (method === "item/commandExecution/terminalInteraction") { const p = msg.params as TerminalInteractionNotification | undefined; if (!p?.threadId || !p?.turnId || !p?.itemId) return; - const key = `${p.threadId}:${p.turnId}:commandExecution`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildScopedItemId(p.turnId, p.itemId); const delta = p.stdin ? `\n> ${p.stdin}` : ""; if (!delta) return; setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendCommandOutputDelta( prev[p.threadId] ?? [], - mappedId, + itemId, delta, ), })); @@ -678,19 +680,12 @@ export function useCodexSession({ typeof p.delta !== "string" ) return; - const key = `${p.threadId}:${p.turnId}:fileChange`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildScopedItemId(p.turnId, p.itemId); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendFileChangeOutputDelta( prev[p.threadId] ?? [], - mappedId, + itemId, p.delta, ), })); @@ -706,19 +701,16 @@ export function useCodexSession({ typeof p.message !== "string" ) return; - const key = `${p.threadId}:${p.turnId}:mcpToolCall`; - const arr = streamingItemRef.current[key] ?? []; - const fallbackId = buildScopedItemId(p.turnId, p.itemId); - const mappedId = arr.length > 0 ? arr[0] : fallbackId; - if (arr.length === 0) { - arr.push(fallbackId); - streamingItemRef.current[key] = arr; - } + const itemId = buildTemporaryMessageId( + p.threadId, + p.turnId, + "mcpToolCall", + ); setMessagesBySession((prev) => ({ ...prev, [p.threadId]: appendMcpToolProgress( prev[p.threadId] ?? [], - mappedId, + itemId, p.message, ), })); @@ -728,15 +720,12 @@ export function useCodexSession({ if (method === "item/completed") { const p = msg.params as ItemCompletedNotification | undefined; if (!p?.threadId || !p?.turnId || !p?.item) return; - const key = `${p.threadId}:${p.turnId}:${p.item.type}`; - const arr = streamingItemRef.current[key] ?? []; - const mappedId = arr.shift(); - streamingItemRef.current[key] = arr; - const scopedId = mappedId ?? buildScopedItemId(p.turnId, p.item.id); + const scopedId = buildScopedItemId(p.turnId, p.item.id); const item = overrideThreadItemId(p.item, scopedId); - const ui = threadItemToUiMessage(item); + const ui = threadItemToUiMessage(item, p.turnId); if (!ui) return; if (ui.kind === "user") return; + if (p.item.type === "reasoning") return; setMessagesBySession((prev) => ({ ...prev, [p.threadId]: upsertMessage(prev[p.threadId] ?? [], ui), @@ -750,17 +739,19 @@ export function useCodexSession({ const dividerId = `turn_completed_${p.threadId}_${p.turn.id}`; setMessagesBySession((prev) => ({ ...prev, - [p.threadId]: upsertMessage(prev[p.threadId] ?? [], { - id: dividerId, - kind: "turnCompleted", - turnId: p.turn.id, - text: "This round of conversation has ended", - }), + [p.threadId]: upsertMessage( + removeMessagesByIds( + prev[p.threadId] ?? [], + getTemporaryMessageIdsForTurn(p.threadId, p.turn.id), + ), + { + id: dividerId, + kind: "turnCompleted", + turnId: p.turn.id, + text: "This round of conversation has ended", + }, + ), })); - const prefix = `${p.threadId}:${p.turn.id}:`; - for (const key of Object.keys(streamingItemRef.current)) { - if (key.startsWith(prefix)) delete streamingItemRef.current[key]; - } setRunning((cur) => { if (!cur) return cur; if (cur.sessionId !== p.threadId) return cur; diff --git a/kagent-ui/lib/assistant/messages.test.js b/kagent-ui/lib/assistant/messages.test.js new file mode 100644 index 0000000..968b19b --- /dev/null +++ b/kagent-ui/lib/assistant/messages.test.js @@ -0,0 +1,83 @@ +import { describe, expect, test } from "bun:test"; +import { + appendMcpToolProgress, + appendReasoningContentDelta, + removeMessagesByIds, +} from "./messages"; + +describe("assistant message helpers", () => { + test("removeMessagesByIds only removes matching temporary messages", () => { + const messages = [ + { id: "user-1", kind: "user", text: "hello", attachments: [] }, + { + id: "temp:thread:turn:reasoning", + kind: "reasoning", + summary: [], + content: ["thinking"], + }, + { + id: "tool-1", + kind: "mcpToolCall", + server: "demo", + tool: "search", + status: "completed", + args: null, + result: { ok: true }, + error: null, + durationMs: 12, + progress: [], + }, + { id: "assistant-1", kind: "assistant", text: "done" }, + ]; + + const filtered = removeMessagesByIds(messages, [ + "temp:thread:turn:reasoning", + "temp:thread:turn:mcpToolCall", + ]); + + expect(filtered).toHaveLength(3); + expect(filtered.map((msg) => msg.id)).toEqual([ + "user-1", + "tool-1", + "assistant-1", + ]); + }); + + test("streaming deltas update the same temporary placeholder in place", () => { + const first = appendReasoningContentDelta( + [], + "temp:thread:turn:reasoning", + 0, + "step 1", + ); + const second = appendReasoningContentDelta( + first, + "temp:thread:turn:reasoning", + 0, + " + step 2", + ); + const third = appendMcpToolProgress( + second, + "temp:thread:turn:mcpToolCall", + "fetching", + ); + const fourth = appendMcpToolProgress( + third, + "temp:thread:turn:mcpToolCall", + "done", + ); + + expect(second).toHaveLength(1); + expect(second[0]).toMatchObject({ + id: "temp:thread:turn:reasoning", + kind: "reasoning", + content: ["step 1 + step 2"], + }); + expect(fourth).toHaveLength(2); + expect(fourth[1]).toMatchObject({ + id: "temp:thread:turn:mcpToolCall", + kind: "mcpToolCall", + progress: ["fetching", "done"], + }); + }); +}); diff --git a/kagent-ui/lib/assistant/messages.test.ts b/kagent-ui/lib/assistant/messages.test.ts new file mode 100644 index 0000000..72f6bd0 --- /dev/null +++ b/kagent-ui/lib/assistant/messages.test.ts @@ -0,0 +1,209 @@ +import assert from "node:assert/strict"; +import { describe, test } from "node:test"; +import type { Thread } from "@/lib/codex-app-server/v2/Thread"; +import type { UiMessage } from "@/components/assistant-ui/thread"; +import { + appendAgentDelta, + extractMessagesFromThread, + upsertMessage, +} from "@/lib/assistant/messages"; + +describe("assistant message merging", () => { + test("merges adjacent assistant messages from the same turn", () => { + const started = upsertMessage([], { + id: "turn-1:item-1", + kind: "assistant", + text: "你好!很高兴见到你。", + turnId: "turn-1", + } as UiMessage); + + const completed = upsertMessage(started, { + id: "turn-1:item-2", + kind: "assistant", + text: "你好!很高兴见到你。", + turnId: "turn-1", + } as UiMessage); + + assert.deepStrictEqual(completed, [ + { + id: "turn-1:item-1", + kind: "assistant", + text: "你好!很高兴见到你。", + turnId: "turn-1", + }, + ]); + }); + + test("merges a live assistant delta into the adjacent assistant bubble", () => { + const started = upsertMessage([], { + id: "turn-1:item-1", + kind: "assistant", + text: "", + turnId: "turn-1", + } as UiMessage); + + const streamed = appendAgentDelta( + started, + "turn-1:item-2", + "你好!很高兴见到你。", + "turn-1", + ); + + assert.deepStrictEqual(streamed, [ + { + id: "turn-1:item-1", + kind: "assistant", + text: "你好!很高兴见到你。", + turnId: "turn-1", + }, + ]); + }); + + test("rebuilds history without reintroducing adjacent assistant duplicates", () => { + const thread = { + id: "thread-1", + preview: "", + modelProvider: "", + createdAt: Date.now(), + path: "", + cwd: "", + cliVersion: "", + source: "unknown", + gitInfo: null, + turns: [ + { + id: "turn-1", + status: "completed", + error: null, + items: [ + { + type: "agentMessage", + id: "item-1", + text: "你好!很高兴见到你。", + }, + { + type: "agentMessage", + id: "item-2", + text: "你好!很高兴见到你。", + }, + ], + }, + { + id: "turn-2", + status: "completed", + error: null, + items: [ + { + type: "agentMessage", + id: "item-1", + text: "第二轮回复", + }, + ], + }, + ], + } as Thread; + + assert.deepStrictEqual(extractMessagesFromThread(thread), [ + { + id: "turn-1:item-1", + kind: "assistant", + text: "你好!很高兴见到你。", + turnId: "turn-1", + }, + { + id: "turn-2:item-1", + kind: "assistant", + text: "第二轮回复", + turnId: "turn-2", + }, + ]); + }); + + test("merges chunked adjacent assistant segments from the same turn", () => { + const messages = upsertMessage( + [ + { + id: "turn-1:item-1", + kind: "assistant", + text: "你", + turnId: "turn-1", + } as UiMessage, + ], + { + id: "turn-1:item-2", + kind: "assistant", + text: "好!很高兴见到你。", + turnId: "turn-1", + } as UiMessage, + ); + + assert.deepStrictEqual(messages, [ + { + id: "turn-1:item-1", + kind: "assistant", + text: "你好!很高兴见到你。", + turnId: "turn-1", + }, + ]); + }); + + test("keeps assistant messages separated when another item is between them", () => { + const thread = { + id: "thread-2", + preview: "", + modelProvider: "", + createdAt: Date.now(), + path: "", + cwd: "", + cliVersion: "", + source: "unknown", + gitInfo: null, + turns: [ + { + id: "turn-1", + status: "completed", + error: null, + items: [ + { + type: "agentMessage", + id: "item-1", + text: "先说明一下接下来要做什么。", + }, + { + type: "reasoning", + id: "item-2", + summary: ["思考过程"], + content: [], + }, + { + type: "agentMessage", + id: "item-3", + text: "下面是最终答复。", + }, + ], + }, + ], + } as Thread; + + assert.deepStrictEqual(extractMessagesFromThread(thread), [ + { + id: "turn-1:item-1", + kind: "assistant", + text: "先说明一下接下来要做什么。", + turnId: "turn-1", + }, + { + id: "turn-1:item-2", + kind: "reasoning", + summary: ["思考过程"], + content: [], + }, + { + id: "turn-1:item-3", + kind: "assistant", + text: "下面是最终答复。", + turnId: "turn-1", + }, + ]); + }); +}); diff --git a/kagent-ui/lib/assistant/messages.ts b/kagent-ui/lib/assistant/messages.ts index 637fd85..8e3745d 100644 --- a/kagent-ui/lib/assistant/messages.ts +++ b/kagent-ui/lib/assistant/messages.ts @@ -3,7 +3,10 @@ import type { Thread as CodexThread } from "@/lib/codex-app-server/v2/Thread"; import type { ThreadItem } from "@/lib/codex-app-server/v2/ThreadItem"; import type { UserInput } from "@/lib/codex-app-server/v2/UserInput"; -import type { UiMessage, UserAttachment } from "@/components/assistant-ui/thread"; +import type { + UiMessage, + UserAttachment, +} from "@/components/assistant-ui/thread"; import { buildScopedItemId, overrideThreadItemId } from "@/lib/assistant/utils"; export function extractMessagesFromThread(thread: CodexThread): UiMessage[] { @@ -11,8 +14,11 @@ export function extractMessagesFromThread(thread: CodexThread): UiMessage[] { for (const turn of thread.turns ?? []) { const items = sortThreadItemsById(turn.items ?? []); for (const item of items) { - const scoped = overrideThreadItemId(item, buildScopedItemId(turn.id, item.id)); - const msg = threadItemToUiMessage(scoped); + const scoped = overrideThreadItemId( + item, + buildScopedItemId(turn.id, item.id), + ); + const msg = threadItemToUiMessage(scoped, turn.id); if (!msg) continue; appendMergedAssistantMessage(msgs, msg); } @@ -20,7 +26,10 @@ export function extractMessagesFromThread(thread: CodexThread): UiMessage[] { return msgs; } -export function threadItemToUiMessage(item: ThreadItem): UiMessage | null { +export function threadItemToUiMessage( + item: ThreadItem, + turnId?: string, +): UiMessage | null { switch (item.type) { case "userMessage": { const { text, attachments } = userInputToText(item.content); @@ -28,9 +37,14 @@ export function threadItemToUiMessage(item: ThreadItem): UiMessage | null { return { id: item.id, kind: "user", text, attachments }; } case "agentMessage": - return { id: item.id, kind: "assistant", text: item.text ?? "" }; + return { id: item.id, kind: "assistant", text: item.text ?? "", turnId }; case "reasoning": - return { id: item.id, kind: "reasoning", summary: item.summary ?? [], content: item.content ?? [] }; + return { + id: item.id, + kind: "reasoning", + summary: item.summary ?? [], + content: item.content ?? [], + }; case "commandExecution": return { id: item.id, @@ -78,18 +92,44 @@ export function threadItemToUiMessage(item: ThreadItem): UiMessage | null { export function upsertMessage(list: UiMessage[], msg: UiMessage): UiMessage[] { const idx = list.findIndex((m) => m.id === msg.id); - if (idx === -1) return [...list, msg]; - const next = list.slice(); - next[idx] = msg; - return next; + const next = idx === -1 ? [...list, msg] : list.slice(); + if (idx !== -1) { + next[idx] = msg; + } + return normalizeAdjacentAssistantMessages(next); } -export function appendAgentDelta(list: UiMessage[], itemId: string, delta: string): UiMessage[] { - return updateMessage( - list, - itemId, - () => ({ id: itemId, kind: "assistant", text: delta }), - (msg) => (msg.kind === "assistant" ? { ...msg, text: `${msg.text}${delta}` } : msg), +export function removeMessagesByIds( + list: UiMessage[], + ids: Iterable, +): UiMessage[] { + const idSet = new Set(ids); + if (idSet.size === 0) return list; + return normalizeAdjacentAssistantMessages( + list.filter((msg) => !idSet.has(msg.id)), + ); +} + +export function appendAgentDelta( + list: UiMessage[], + itemId: string, + delta: string, + turnId?: string, +): UiMessage[] { + return normalizeAdjacentAssistantMessages( + updateMessage( + list, + itemId, + () => ({ id: itemId, kind: "assistant", text: delta, turnId }), + (msg) => + msg.kind === "assistant" + ? { + ...msg, + text: `${msg.text}${delta}`, + turnId: msg.turnId ?? turnId, + } + : msg, + ), ); } @@ -133,7 +173,11 @@ export function appendReasoningContentDelta( ); } -export function appendCommandOutputDelta(list: UiMessage[], itemId: string, delta: string): UiMessage[] { +export function appendCommandOutputDelta( + list: UiMessage[], + itemId: string, + delta: string, +): UiMessage[] { return updateMessage( list, itemId, @@ -155,11 +199,21 @@ export function appendCommandOutputDelta(list: UiMessage[], itemId: string, delt ); } -export function appendFileChangeOutputDelta(list: UiMessage[], itemId: string, delta: string): UiMessage[] { +export function appendFileChangeOutputDelta( + list: UiMessage[], + itemId: string, + delta: string, +): UiMessage[] { return updateMessage( list, itemId, - () => ({ id: itemId, kind: "fileChange", status: "inProgress", changes: [], output: delta }), + () => ({ + id: itemId, + kind: "fileChange", + status: "inProgress", + changes: [], + output: delta, + }), (msg) => { if (msg.kind !== "fileChange") return msg; const output = `${msg.output ?? ""}${delta}`; @@ -168,7 +222,11 @@ export function appendFileChangeOutputDelta(list: UiMessage[], itemId: string, d ); } -export function appendMcpToolProgress(list: UiMessage[], itemId: string, message: string): UiMessage[] { +export function appendMcpToolProgress( + list: UiMessage[], + itemId: string, + message: string, +): UiMessage[] { return updateMessage( list, itemId, @@ -205,7 +263,10 @@ function updateMessage( return next; } -function userInputToText(content: Array): { text: string; attachments: UserAttachment[] } { +function userInputToText(content: Array): { + text: string; + attachments: UserAttachment[]; +} { const attachments: UserAttachment[] = []; const text = content .map((c) => { @@ -231,14 +292,63 @@ function appendMergedAssistantMessage(list: UiMessage[], msg: UiMessage): void { return; } const last = list[list.length - 1]; - if (last && last.kind === "assistant") { - // 恢复历史记录时将拆分的 assistant 消息按顺序拼接 - last.text = `${last.text}${msg.text}`; + if ( + last && + last.kind === "assistant" && + shouldCoalesceAssistantMessages(last, msg) + ) { + // 恢复历史记录时将同一 turn 中相邻的 assistant 消息归并成一个气泡 + list[list.length - 1] = mergeAssistantMessages(last, msg); return; } list.push(msg); } +function normalizeAdjacentAssistantMessages(list: UiMessage[]): UiMessage[] { + const normalized: UiMessage[] = []; + for (const msg of list) { + const last = normalized[normalized.length - 1]; + if ( + last && + last.kind === "assistant" && + msg.kind === "assistant" && + shouldCoalesceAssistantMessages(last, msg) + ) { + normalized[normalized.length - 1] = mergeAssistantMessages(last, msg); + continue; + } + normalized.push(msg); + } + return normalized; +} + +function mergeAssistantMessages( + current: Extract, + incoming: Extract, +): Extract { + return { + ...current, + text: mergeAssistantText(current.text, incoming.text), + turnId: current.turnId ?? incoming.turnId, + }; +} + +function shouldCoalesceAssistantMessages( + current: Extract, + incoming: Extract, +): boolean { + return !!current.turnId && current.turnId === incoming.turnId; +} + +function mergeAssistantText(current: string, incoming: string): string { + if (!incoming) return current; + if (!current) return incoming; + if (current === incoming) return current; + if (incoming.startsWith(current)) return incoming; + if (current.startsWith(incoming)) return current; + return `${current}${incoming}`; +} + function sortThreadItemsById(items: Array): Array { if (items.length <= 1) return items; const parsed = items.map((item, idx) => {