diff --git a/client/src/features/xero/use-xero-desktop-state/runtime-stream.ts b/client/src/features/xero/use-xero-desktop-state/runtime-stream.ts index 47e09cae..d46dec42 100644 --- a/client/src/features/xero/use-xero-desktop-state/runtime-stream.ts +++ b/client/src/features/xero/use-xero-desktop-state/runtime-stream.ts @@ -638,6 +638,22 @@ export function createRuntimeStreamEventBuffer({ let cancelScheduledFlush: ScheduledFlushCancel | null = null let disposed = false let completionNotificationsEnabled = false + let pendingTranscriptTextChunks: string[] | null = null + + const flushPendingTranscriptText = () => { + if (!pendingTranscriptTextChunks) { + return + } + + const previous = pendingEvents.at(-1) + if (previous && !isRuntimeStreamPatch(previous) && previous.item.kind === 'transcript') { + previous.item = { + ...previous.item, + text: pendingTranscriptTextChunks.join(''), + } + } + pendingTranscriptTextChunks = null + } const cancelFlush = () => { if (!cancelScheduledFlush) { @@ -659,6 +675,7 @@ export function createRuntimeStreamEventBuffer({ return } + flushPendingTranscriptText() const events = pendingEvents.splice(0, pendingEvents.length) updateRuntimeStream(projectId, agentSessionId, (currentStream) => mergeRuntimeStreamEvents(currentStream, events)) if (completionNotificationsEnabled) { @@ -718,8 +735,33 @@ export function createRuntimeStreamEventBuffer({ if (isRuntimeStreamPatch(payload)) { pendingEvents.length = 0 + pendingTranscriptTextChunks = null } - pendingEvents.push(payload) + + const previous = pendingEvents.at(-1) + if ( + previous && + !isRuntimeStreamPatch(previous) && + !isRuntimeStreamPatch(payload) && + canAggregateRuntimeTranscriptDelta(previous, payload) + ) { + pendingTranscriptTextChunks ??= [previous.item.text ?? ''] + pendingTranscriptTextChunks.push(payload.item.text ?? '') + previous.item = { + ...previous.item, + updatedSequence: runtimeStreamPayloadUpdateSequence(payload), + createdAt: payload.item.createdAt, + } + schedule() + return + } + + flushPendingTranscriptText() + pendingEvents.push( + !isRuntimeStreamPatch(payload) && payload.item.kind === 'transcript' + ? cloneRuntimeStreamEventForAggregation(payload) + : payload, + ) if (isUrgentRuntimeStreamEvent(payload)) { flush() return @@ -736,6 +778,7 @@ export function createRuntimeStreamEventBuffer({ disposed = true cancelFlush() pendingEvents.length = 0 + pendingTranscriptTextChunks = null }, } }