From 607f34aaaed8969241e075cacf6bebc6ed5cef94 Mon Sep 17 00:00:00 2001 From: yyg-max <175597134+yyg-max@users.noreply.github.com> Date: Sun, 28 Jun 2026 15:04:53 +0800 Subject: [PATCH] fix(gui): stabilize remote chat streaming --- .../src-tauri/src/services/gateway.rs | 2 +- .../agent-gui/src/lib/system/powerActivity.ts | 6 +- crates/agent-gui/src/pages/ChatPage.tsx | 128 +++++++- .../chat/gateway/useGatewayBridgeBatcher.ts | 283 +++++++++++++++++- .../chat/gateway/useGatewayBridgeListeners.ts | 46 ++- .../chat/turns/runAgentConversationTurn.ts | 2 +- .../chat/turns/runTextConversationTurn.ts | 2 +- 7 files changed, 425 insertions(+), 44 deletions(-) diff --git a/crates/agent-gui/src-tauri/src/services/gateway.rs b/crates/agent-gui/src-tauri/src/services/gateway.rs index acec6f93..8a99c995 100644 --- a/crates/agent-gui/src-tauri/src/services/gateway.rs +++ b/crates/agent-gui/src-tauri/src/services/gateway.rs @@ -559,7 +559,7 @@ impl GatewayController { worker_id: Option, ) -> Result<(), String> { if !self.renew_remote_chat_request_lease(&request_id, worker_id.as_deref(), true)? { - return Ok(()); + return Err("remote chat request lease is no longer active".to_string()); } let envelope = build_chat_event_envelope(request_id, event)?; self.send_agent_envelope(envelope).await diff --git a/crates/agent-gui/src/lib/system/powerActivity.ts b/crates/agent-gui/src/lib/system/powerActivity.ts index d808ee85..49c6d3a8 100644 --- a/crates/agent-gui/src/lib/system/powerActivity.ts +++ b/crates/agent-gui/src/lib/system/powerActivity.ts @@ -12,9 +12,9 @@ function createActivityId(scope: string) { async function beginPowerActivity(activityId: string, reason: string) { await invoke("system_begin_power_activity", { - activityId, + activity_id: activityId, reason, - ttlMs: POWER_ACTIVITY_TTL_MS, + ttl_ms: POWER_ACTIVITY_TTL_MS, }); } @@ -40,7 +40,7 @@ export async function withPowerActivity(scope: string, reason: string, run: ( clearInterval(refreshTimer); } try { - await invoke("system_end_power_activity", { activityId }); + await invoke("system_end_power_activity", { activity_id: activityId }); } catch (error) { console.warn("system_end_power_activity failed", error); } diff --git a/crates/agent-gui/src/pages/ChatPage.tsx b/crates/agent-gui/src/pages/ChatPage.tsx index afbb9b1a..04dee225 100644 --- a/crates/agent-gui/src/pages/ChatPage.tsx +++ b/crates/agent-gui/src/pages/ChatPage.tsx @@ -1439,7 +1439,8 @@ export function ChatPage(props: ChatPageProps) { scrollAreaRef, composerBusyRef, }); - const { queueGatewayBridgeEventForRequest } = useGatewayBridgeBatcher(); + const { queueGatewayBridgeEventForRequest, flushGatewayBridgeEventsForRequest } = + useGatewayBridgeBatcher(); const { currentConversationIdRef, conversationRuntimeCacheRef, @@ -2295,13 +2296,29 @@ export function ChatPage(props: ChatPageProps) { ); inFlightQueuedTurn = null; } else if (gatewayRequest) { - void invoke("gateway_chat_complete", { - request_id: gatewayRequest.requestId, - conversation_id: targetConversationId, - worker_id: gatewayWorkerId, - } as any).catch((error) => { + try { + await flushGatewayBridgeEventsForRequest(gatewayRequest.requestId); + await invoke("gateway_chat_complete", { + request_id: gatewayRequest.requestId, + conversation_id: targetConversationId, + worker_id: gatewayWorkerId, + } as any); + } catch (error) { console.warn("gateway_chat_complete failed", error); - }); + await invoke("gateway_chat_fail", { + request_id: gatewayRequest.requestId, + conversation_id: targetConversationId, + error_code: "desktop_event_flush_failed", + message: asErrorMessage( + error, + "Failed to flush gateway chat events before completing queued request.", + ), + terminal: true, + worker_id: gatewayWorkerId, + } as any).catch((failError) => { + console.warn("gateway_chat_fail failed", failError); + }); + } } return accepted; }) @@ -3441,6 +3458,7 @@ export function ChatPage(props: ChatPageProps) { ensureGatewayBridgeConversationReadyRef, sendActionRef, queueGatewayBridgeEventForRequest, + flushGatewayBridgeEventsForRequest, shouldQueueGatewayChatRequest, enqueueGatewayChatRequest, isConversationRunning, @@ -3517,12 +3535,54 @@ export function ChatPage(props: ChatPageProps) { gatewayBridgeRequest?.requestId ?? createLocalGatewayChatRunId(conversationId); const gatewayBridgeWorkerId = gatewayBridgeRequest?.workerId ?? (mirrorsLocalRunToGateway ? "gui-live" : undefined); + let gatewayBridgeAbortSignal: AbortSignal | null = null; + const waitForLocalGatewayMirror = (sendResult: Promise | void) => { + const sendPromise = Promise.resolve(sendResult); + const signal = gatewayBridgeAbortSignal; + if (!signal) { + return sendPromise; + } + if (signal.aborted) { + return Promise.resolve(); + } + return new Promise((resolve, reject) => { + const handleAbort = () => { + resolve(); + }; + signal.addEventListener("abort", handleAbort, { once: true }); + sendPromise.then( + () => { + signal.removeEventListener("abort", handleAbort); + resolve(); + }, + (error) => { + signal.removeEventListener("abort", handleAbort); + reject(error); + }, + ); + }); + }; + const sendGatewayBridgeEventForRun = mirrorsLocalRunToGateway + ? ( + requestId: string, + event: Record, + options?: { workerId?: string }, + ) => { + const sendPromise = waitForLocalGatewayMirror( + queueGatewayBridgeEventForRequest(requestId, event, options), + ); + sendPromise.catch((error) => { + console.warn("local gateway chat event mirror failed", error); + }); + return sendPromise; + } + : queueGatewayBridgeEventForRequest; const gatewayBridgeEvents = createGatewayBridgeEventController({ conversationId, requestId: gatewayBridgeRequestId, workerId: gatewayBridgeWorkerId, enabled: Boolean(gatewayBridgeRequest) || hasRemoteGatewayTarget, - sendEvent: queueGatewayBridgeEventForRequest, + sendEvent: sendGatewayBridgeEventForRun, resolveErrorConversationId: () => gatewayBridgeRequest?.conversationId ?? currentConversationIdRef.current, }); @@ -3696,6 +3756,7 @@ export function ChatPage(props: ChatPageProps) { const conversationThrottleState = getCompactionThrottleState(conversationId); const isConversationVisible = () => currentConversationIdRef.current === conversationId; let requestController = new AbortController(); + gatewayBridgeAbortSignal = requestController.signal; const conversationDebugLogger = createStreamDebugLogger({ enabled: effectiveIsAgentDevExecutionMode, conversationId, @@ -3821,6 +3882,22 @@ export function ChatPage(props: ChatPageProps) { queueGatewayConversationActivity(conversationId, false, conversationCwd); } } + function finishAbortedBeforeRuntimeStart() { + if (!requestController.signal.aborted) { + return false; + } + gatewayBridgeEvents.queueEvent({ + type: "error", + message: "Cancelled", + conversation_id: conversationId, + }); + gatewayBridgeEvents.close(); + updateToolStatus(null, transcriptStore, isConversationVisible()); + clearAbortSnapshot(transcriptStore); + markConversationRunStopped(); + requestQueuedChatTurnProcessing(conversationId); + return true; + } let localGatewayRunStarted = false; async function markLocalGatewayRunStarted() { if (!mirrorsLocalRunToGateway || localGatewayRunStarted) { @@ -3836,9 +3913,18 @@ export function ChatPage(props: ChatPageProps) { markConversationRunStarted(); if (mirrorsLocalRunToGateway) { try { - await markLocalGatewayRunStarted(); + await waitForLocalGatewayMirror(markLocalGatewayRunStarted()); } catch (error) { + const message = asErrorMessage(error, "Gateway 本地会话启动同步失败"); + setConversationErrorState(message); + gatewayBridgeEvents.emitError(message, conversationId); + gatewayBridgeEvents.close(); + markConversationRunStopped(); console.warn("gateway_chat_mark_local_started failed", error); + return true; + } + if (finishAbortedBeforeRuntimeStart()) { + return true; } } if (overrides?.beforeRuntimeStart) { @@ -3852,6 +3938,9 @@ export function ChatPage(props: ChatPageProps) { markConversationRunStopped(); return false; } + if (finishAbortedBeforeRuntimeStart()) { + return true; + } } // Persist the user turn immediately so WebUI/GUI sidebars can surface the @@ -3920,7 +4009,22 @@ export function ChatPage(props: ChatPageProps) { console.warn("gateway stream started before initial user turn was persisted"); } } - await gatewayBridgeEvents.queueUserMessage(text, uploadedFiles); + try { + await gatewayBridgeEvents.queueUserMessage(text, uploadedFiles); + } catch (error) { + if (!mirrorsLocalRunToGateway) { + throw error; + } + const message = asErrorMessage(error, "Gateway 用户消息同步失败"); + setConversationErrorState(message); + gatewayBridgeEvents.emitError(message, conversationId); + gatewayBridgeEvents.close(); + markConversationRunStopped(); + return true; + } + if (finishAbortedBeforeRuntimeStart()) { + return true; + } acknowledgeGatewayRunStarted(); let activeCompactionRollback: { state: ConversationViewState; @@ -4217,6 +4321,7 @@ export function ChatPage(props: ChatPageProps) { function renewRequestController() { requestController = new AbortController(); + gatewayBridgeAbortSignal = requestController.signal; setConversationAbortController(conversationId, requestController); return requestController; } @@ -4489,6 +4594,9 @@ export function ChatPage(props: ChatPageProps) { hookWarning: null, })); } + if (finishAbortedBeforeRuntimeStart()) { + return true; + } try { if (effectiveIsAgentMode) { diff --git a/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeBatcher.ts b/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeBatcher.ts index 96732ced..d8e76654 100644 --- a/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeBatcher.ts +++ b/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeBatcher.ts @@ -40,6 +40,7 @@ const GATEWAY_BRIDGE_BATCH_MAX_DELAY_MS = 32; const GATEWAY_BRIDGE_BATCH_MAX_TEXT_LENGTH = 640; const GATEWAY_BRIDGE_TOOL_DELTA_BATCH_MAX_DELAY_MS = 200; const GATEWAY_BRIDGE_TOOL_DELTA_HIDDEN_BATCH_MAX_DELAY_MS = 750; +const GATEWAY_BRIDGE_SEND_RETRY_DELAYS_MS = [100, 300, 750]; function normalizeGatewayBridgeBatchRound(value: unknown) { return typeof value === "number" && Number.isFinite(value) ? value : null; @@ -52,6 +53,39 @@ function shouldFlushGatewayBridgeBatchWithoutAnimationFrame() { return document.visibilityState !== "visible"; } +function isTerminalGatewayBridgeEvent(event: Record) { + return event.type === "done" || event.type === "error"; +} + +function delayGatewayBridgeSendRetry(delayMs: number) { + return new Promise((resolve) => { + window.setTimeout(resolve, delayMs); + }); +} + +async function invokeGatewaySendChatEventWithRetry( + requestId: string, + event: Record, + workerId?: string, +) { + for (let attempt = 0; ; attempt += 1) { + try { + await invoke("gateway_send_chat_event", { + request_id: requestId, + event, + worker_id: workerId, + } as any); + return; + } catch (error) { + const retryDelayMs = GATEWAY_BRIDGE_SEND_RETRY_DELAYS_MS[attempt]; + if (retryDelayMs === undefined) { + throw error; + } + await delayGatewayBridgeSendRetry(retryDelayMs); + } + } +} + function toBatchableGatewayBridgeEvent( event: Record, ): BatchableGatewayBridgeEvent | null { @@ -133,32 +167,86 @@ function batchableGatewayBridgeEventSize(event: BatchableGatewayBridgeEvent) { export function useGatewayBridgeBatcher() { const gatewayEventChainRef = useRef>(Promise.resolve()); + const gatewayEventSendsByRequestRef = useRef(new Map>>()); + const gatewayEventBarriersByRequestRef = useRef(new Map>>()); + const gatewayEventBarrierChainByRequestRef = useRef(new Map>()); const pendingGatewayBridgeEventBatchesRef = useRef( new Map(), ); const inFlightToolCallDeltaBatchesRef = useRef(new Set()); const deferredToolCallDeltaSendsRef = useRef(new Map()); + const trackGatewayBridgeEventSend = useCallback( + (requestId: string, sendPromise: Promise) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return; + } + let sends = gatewayEventSendsByRequestRef.current.get(normalizedRequestId); + if (!sends) { + sends = new Set(); + gatewayEventSendsByRequestRef.current.set(normalizedRequestId, sends); + } + sends.add(sendPromise); + const removeSend = () => { + const currentSends = gatewayEventSendsByRequestRef.current.get(normalizedRequestId); + if (!currentSends) { + return; + } + currentSends.delete(sendPromise); + if (currentSends.size === 0) { + gatewayEventSendsByRequestRef.current.delete(normalizedRequestId); + } + }; + sendPromise.then(removeSend, removeSend); + }, + [], + ); + + const trackGatewayBridgeEventBarrier = useCallback( + (requestId: string, barrierPromise: Promise) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return; + } + let barriers = gatewayEventBarriersByRequestRef.current.get(normalizedRequestId); + if (!barriers) { + barriers = new Set(); + gatewayEventBarriersByRequestRef.current.set(normalizedRequestId, barriers); + } + barriers.add(barrierPromise); + const removeBarrier = () => { + const currentBarriers = + gatewayEventBarriersByRequestRef.current.get(normalizedRequestId); + if (!currentBarriers) { + return; + } + currentBarriers.delete(barrierPromise); + if (currentBarriers.size === 0) { + gatewayEventBarriersByRequestRef.current.delete(normalizedRequestId); + } + }; + barrierPromise.then(removeBarrier, removeBarrier); + }, + [], + ); + const sendGatewayBridgeEventForRequest = useCallback( (requestId: string, event: Record, options?: GatewayBridgeSendOptions) => { const workerId = options?.workerId?.trim() || undefined; const sendPromise = gatewayEventChainRef.current .catch(() => undefined) - .then(() => - invoke("gateway_send_chat_event", { - request_id: requestId, - event, - worker_id: workerId, - } as any), - ) - .then(() => undefined) - .catch((error) => { + .then(() => invokeGatewaySendChatEventWithRetry(requestId, event, workerId)); + gatewayEventChainRef.current = sendPromise.catch(() => undefined); + trackGatewayBridgeEventSend(requestId, sendPromise); + sendPromise.catch((error) => { + if (!isTerminalGatewayBridgeEvent(event)) { console.warn("gateway_send_chat_event failed", error); - }); - gatewayEventChainRef.current = sendPromise; + } + }); return sendPromise; }, - [], + [trackGatewayBridgeEventSend], ); const sendToolCallDeltaForRequest = useCallback( @@ -179,7 +267,8 @@ export function useGatewayBridgeBatcher() { } inFlightToolCallDeltaBatchesRef.current.add(batchKey); - sendGatewayBridgeEventForRequest(requestId, event, options).finally(() => { + const sendPromise = sendGatewayBridgeEventForRequest(requestId, event, options); + const finishSend = () => { inFlightToolCallDeltaBatchesRef.current.delete(batchKey); const deferred = deferredToolCallDeltaSendsRef.current.get(batchKey); if (!deferred) { @@ -192,7 +281,8 @@ export function useGatewayBridgeBatcher() { deferred.event, deferred.options, ); - }); + }; + sendPromise.then(finishSend, finishSend); }, [sendGatewayBridgeEventForRequest], ); @@ -265,6 +355,163 @@ export function useGatewayBridgeBatcher() { [flushGatewayBridgeEventBatchForRequest], ); + const hasPendingGatewayBridgeEventBatchesForRequest = useCallback((requestId: string) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return false; + } + for (const pending of pendingGatewayBridgeEventBatchesRef.current.values()) { + if (pending.requestId === normalizedRequestId) { + return true; + } + } + return false; + }, []); + + const hasDeferredToolCallDeltasForRequest = useCallback((requestId: string) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return false; + } + for (const deferred of deferredToolCallDeltaSendsRef.current.values()) { + if (deferred.requestId === normalizedRequestId) { + return true; + } + } + return false; + }, []); + + const hasGatewayBridgeEventSendsForRequest = useCallback((requestId: string) => { + const sends = gatewayEventSendsByRequestRef.current.get(requestId.trim()); + return Boolean(sends && sends.size > 0); + }, []); + + const hasGatewayBridgeEventBarriersForRequest = useCallback((requestId: string) => { + const barriers = gatewayEventBarriersByRequestRef.current.get(requestId.trim()); + return Boolean(barriers && barriers.size > 0); + }, []); + + const waitForGatewayBridgeEventSendsForRequest = useCallback(async (requestId: string) => { + const sends = gatewayEventSendsByRequestRef.current.get(requestId.trim()); + if (!sends || sends.size === 0) { + return; + } + const results = await Promise.allSettled(Array.from(sends)); + const rejected = results.find( + (result): result is PromiseRejectedResult => result.status === "rejected", + ); + if (rejected) { + throw rejected.reason; + } + }, []); + + const waitForGatewayBridgeEventBarriersForRequest = useCallback(async (requestId: string) => { + const barriers = gatewayEventBarriersByRequestRef.current.get(requestId.trim()); + if (!barriers || barriers.size === 0) { + return; + } + const results = await Promise.allSettled(Array.from(barriers)); + const rejected = results.find( + (result): result is PromiseRejectedResult => result.status === "rejected", + ); + if (rejected) { + throw rejected.reason; + } + }, []); + + const drainGatewayBridgeEventsForRequest = useCallback( + async (requestId: string) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return; + } + for (;;) { + flushGatewayBridgeEventBatchesForRequest(normalizedRequestId); + await waitForGatewayBridgeEventSendsForRequest(normalizedRequestId); + await Promise.resolve(); + if ( + !hasPendingGatewayBridgeEventBatchesForRequest(normalizedRequestId) && + !hasDeferredToolCallDeltasForRequest(normalizedRequestId) && + !hasGatewayBridgeEventSendsForRequest(normalizedRequestId) + ) { + return; + } + } + }, + [ + flushGatewayBridgeEventBatchesForRequest, + hasDeferredToolCallDeltasForRequest, + hasGatewayBridgeEventSendsForRequest, + hasPendingGatewayBridgeEventBatchesForRequest, + waitForGatewayBridgeEventSendsForRequest, + ], + ); + + const queueTerminalGatewayBridgeEventForRequest = useCallback( + (requestId: string, event: Record, options?: GatewayBridgeSendOptions) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return sendGatewayBridgeEventForRequest(requestId, event, options); + } + const previousBarrier = + gatewayEventBarrierChainByRequestRef.current.get(normalizedRequestId) ?? + Promise.resolve(); + const barrierPromise = previousBarrier + .catch(() => undefined) + .then(async () => { + await drainGatewayBridgeEventsForRequest(normalizedRequestId); + await sendGatewayBridgeEventForRequest(normalizedRequestId, event, options); + }); + const barrierChain = barrierPromise.catch(() => undefined); + gatewayEventBarrierChainByRequestRef.current.set(normalizedRequestId, barrierChain); + barrierChain.then(() => { + if (gatewayEventBarrierChainByRequestRef.current.get(normalizedRequestId) === barrierChain) { + gatewayEventBarrierChainByRequestRef.current.delete(normalizedRequestId); + } + }); + trackGatewayBridgeEventBarrier(normalizedRequestId, barrierPromise); + barrierPromise.catch((error) => { + console.warn("gateway terminal chat event failed", error); + }); + return barrierPromise; + }, + [ + drainGatewayBridgeEventsForRequest, + sendGatewayBridgeEventForRequest, + trackGatewayBridgeEventBarrier, + ], + ); + + const flushGatewayBridgeEventsForRequest = useCallback( + async (requestId: string) => { + const normalizedRequestId = requestId.trim(); + if (!normalizedRequestId) { + return; + } + for (;;) { + await drainGatewayBridgeEventsForRequest(normalizedRequestId); + await waitForGatewayBridgeEventBarriersForRequest(normalizedRequestId); + await Promise.resolve(); + if ( + !hasPendingGatewayBridgeEventBatchesForRequest(normalizedRequestId) && + !hasDeferredToolCallDeltasForRequest(normalizedRequestId) && + !hasGatewayBridgeEventSendsForRequest(normalizedRequestId) && + !hasGatewayBridgeEventBarriersForRequest(normalizedRequestId) + ) { + return; + } + } + }, + [ + drainGatewayBridgeEventsForRequest, + hasDeferredToolCallDeltasForRequest, + hasGatewayBridgeEventBarriersForRequest, + hasGatewayBridgeEventSendsForRequest, + hasPendingGatewayBridgeEventBatchesForRequest, + waitForGatewayBridgeEventBarriersForRequest, + ], + ); + const scheduleGatewayBridgeEventBatchFlush = useCallback( (batchKey: string) => { const pending = pendingGatewayBridgeEventBatchesRef.current.get(batchKey); @@ -329,6 +576,9 @@ export function useGatewayBridgeBatcher() { (requestId: string, event: Record, options?: GatewayBridgeSendOptions) => { const batchable = toBatchableGatewayBridgeEvent(event); if (!batchable) { + if (isTerminalGatewayBridgeEvent(event)) { + return queueTerminalGatewayBridgeEventForRequest(requestId, event, options); + } flushGatewayBridgeEventBatchesForRequest(requestId); discardDeferredToolCallDeltasForRequest(requestId); return sendGatewayBridgeEventForRequest(requestId, event, options); @@ -371,6 +621,7 @@ export function useGatewayBridgeBatcher() { discardDeferredToolCallDeltasForRequest, flushGatewayBridgeEventBatchesForRequest, flushGatewayBridgeEventBatchForRequest, + queueTerminalGatewayBridgeEventForRequest, scheduleGatewayBridgeEventBatchFlush, sendGatewayBridgeEventForRequest, ], @@ -397,6 +648,9 @@ export function useGatewayBridgeBatcher() { pendingGatewayBridgeEventBatchesRef.current.clear(); deferredToolCallDeltaSendsRef.current.clear(); inFlightToolCallDeltaBatchesRef.current.clear(); + gatewayEventSendsByRequestRef.current.clear(); + gatewayEventBarriersByRequestRef.current.clear(); + gatewayEventBarrierChainByRequestRef.current.clear(); }, [], ); @@ -419,5 +673,6 @@ export function useGatewayBridgeBatcher() { return { queueGatewayBridgeEventForRequest, flushPendingGatewayBridgeEvents, + flushGatewayBridgeEventsForRequest, }; } diff --git a/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeListeners.ts b/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeListeners.ts index af29dfe8..f011e2e8 100644 --- a/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeListeners.ts +++ b/crates/agent-gui/src/pages/chat/gateway/useGatewayBridgeListeners.ts @@ -23,6 +23,7 @@ type UseGatewayBridgeListenersParams = GatewayBridgeRuntimeRefs & { event: Record, options?: { workerId?: string }, ) => Promise | void; + flushGatewayBridgeEventsForRequest: (requestId: string) => Promise; shouldQueueGatewayChatRequest: ( conversationId: string, queuePolicy: "auto" | "append" | "interrupt", @@ -162,6 +163,7 @@ export function useGatewayBridgeListeners( ensureGatewayBridgeConversationReadyRef, sendActionRef, queueGatewayBridgeEventForRequest, + flushGatewayBridgeEventsForRequest, shouldQueueGatewayChatRequest, enqueueGatewayChatRequest, isConversationRunning, @@ -366,22 +368,24 @@ export function useGatewayBridgeListeners( ); }; - const failClaimedRequest = ( + const failClaimedRequest = async ( requestId: string, conversationId: string, errorCode: string, message: string, ) => { - void invoke("gateway_chat_fail", { - request_id: requestId, - conversation_id: conversationId || undefined, - error_code: errorCode, - message, - terminal: true, - worker_id: workerId, - } as any).catch((error) => { + try { + await invoke("gateway_chat_fail", { + request_id: requestId, + conversation_id: conversationId || undefined, + error_code: errorCode, + message, + terminal: true, + worker_id: workerId, + } as any); + } catch (error) { console.warn("gateway_chat_fail failed", error); - }); + } }; const markQueuedInGui = async ( @@ -433,7 +437,10 @@ export function useGatewayBridgeListeners( workerId, }, ); - failClaimedRequest( + await flushGatewayBridgeEventsForRequest(requestId).catch((error) => { + console.warn("flush gateway chat events failed", error); + }); + await failClaimedRequest( requestId, targetConversationId, "empty_remote_message", @@ -503,7 +510,10 @@ export function useGatewayBridgeListeners( workerId, }, ); - failClaimedRequest( + await flushGatewayBridgeEventsForRequest(requestId).catch((error) => { + console.warn("flush gateway chat events failed", error); + }); + await failClaimedRequest( requestId, targetConversationId, "invalid_chat_command", @@ -597,7 +607,10 @@ export function useGatewayBridgeListeners( afterInitialHistoryPersist: markRuntimeStarted, }); if (!accepted) { - failClaimedRequest( + await flushGatewayBridgeEventsForRequest(requestId).catch((error) => { + console.warn("flush gateway chat events failed", error); + }); + await failClaimedRequest( requestId, resolvedConversationId, "desktop_runtime_rejected", @@ -605,6 +618,7 @@ export function useGatewayBridgeListeners( ); return; } + await flushGatewayBridgeEventsForRequest(requestId); await invoke("gateway_chat_complete", { request_id: requestId, conversation_id: resolvedConversationId, @@ -633,7 +647,10 @@ export function useGatewayBridgeListeners( workerId, }, ); - failClaimedRequest( + await flushGatewayBridgeEventsForRequest(requestId).catch((flushError) => { + console.warn("flush gateway chat events failed", flushError); + }); + await failClaimedRequest( requestId, resolvedConversationId || targetConversationId || @@ -778,6 +795,7 @@ export function useGatewayBridgeListeners( isConversationRunning, shouldQueueGatewayChatRequest, enqueueGatewayChatRequest, + flushGatewayBridgeEventsForRequest, queueGatewayBridgeEventForRequest, sendActionRef, ]); diff --git a/crates/agent-gui/src/pages/chat/turns/runAgentConversationTurn.ts b/crates/agent-gui/src/pages/chat/turns/runAgentConversationTurn.ts index 40c6162e..b0fb53bc 100644 --- a/crates/agent-gui/src/pages/chat/turns/runAgentConversationTurn.ts +++ b/crates/agent-gui/src/pages/chat/turns/runAgentConversationTurn.ts @@ -1317,7 +1317,7 @@ export async function runAgentConversationTurn(params: RunAgentConversationTurnP createdAt, titlePromise, }); - gatewayBridgeEvents.queueEvent({ + await gatewayBridgeEvents.queueEvent({ type: "done", conversation_id: conversationId, }); diff --git a/crates/agent-gui/src/pages/chat/turns/runTextConversationTurn.ts b/crates/agent-gui/src/pages/chat/turns/runTextConversationTurn.ts index 1da76e2f..dda7661d 100644 --- a/crates/agent-gui/src/pages/chat/turns/runTextConversationTurn.ts +++ b/crates/agent-gui/src/pages/chat/turns/runTextConversationTurn.ts @@ -487,7 +487,7 @@ export async function runTextConversationTurn(params: RunTextConversationTurnPar createdAt, titlePromise, }); - gatewayBridgeEvents.queueEvent({ + await gatewayBridgeEvents.queueEvent({ type: "done", conversation_id: conversationId, });