Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/agent-gui/src-tauri/src/services/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ impl GatewayController {
worker_id: Option<String>,
) -> Result<(), String> {
if !self.renew_remote_chat_request_lease(&request_id, worker_id.as_deref(), true)? {
return Ok(());
return Err("remote chat request lease is no longer active".to_string());
}
let envelope = build_chat_event_envelope(request_id, event)?;
self.send_agent_envelope(envelope).await
Expand Down
6 changes: 3 additions & 3 deletions crates/agent-gui/src/lib/system/powerActivity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ function createActivityId(scope: string) {

async function beginPowerActivity(activityId: string, reason: string) {
await invoke("system_begin_power_activity", {
activityId,
activity_id: activityId,
reason,
ttlMs: POWER_ACTIVITY_TTL_MS,
ttl_ms: POWER_ACTIVITY_TTL_MS,
});
}

Expand All @@ -40,7 +40,7 @@ export async function withPowerActivity<T>(scope: string, reason: string, run: (
clearInterval(refreshTimer);
}
try {
await invoke("system_end_power_activity", { activityId });
await invoke("system_end_power_activity", { activity_id: activityId });
} catch (error) {
console.warn("system_end_power_activity failed", error);
}
Expand Down
128 changes: 118 additions & 10 deletions crates/agent-gui/src/pages/ChatPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,8 @@ export function ChatPage(props: ChatPageProps) {
scrollAreaRef,
composerBusyRef,
});
const { queueGatewayBridgeEventForRequest } = useGatewayBridgeBatcher();
const { queueGatewayBridgeEventForRequest, flushGatewayBridgeEventsForRequest } =
useGatewayBridgeBatcher();
const {
currentConversationIdRef,
conversationRuntimeCacheRef,
Expand Down Expand Up @@ -2295,13 +2296,29 @@ export function ChatPage(props: ChatPageProps) {
);
inFlightQueuedTurn = null;
} else if (gatewayRequest) {
void invoke("gateway_chat_complete", {
request_id: gatewayRequest.requestId,
conversation_id: targetConversationId,
worker_id: gatewayWorkerId,
} as any).catch((error) => {
try {
await flushGatewayBridgeEventsForRequest(gatewayRequest.requestId);
await invoke("gateway_chat_complete", {
request_id: gatewayRequest.requestId,
conversation_id: targetConversationId,
worker_id: gatewayWorkerId,
} as any);
} catch (error) {
console.warn("gateway_chat_complete failed", error);
});
await invoke("gateway_chat_fail", {
request_id: gatewayRequest.requestId,
conversation_id: targetConversationId,
error_code: "desktop_event_flush_failed",
message: asErrorMessage(
error,
"Failed to flush gateway chat events before completing queued request.",
),
terminal: true,
worker_id: gatewayWorkerId,
} as any).catch((failError) => {
console.warn("gateway_chat_fail failed", failError);
});
}
}
return accepted;
})
Expand Down Expand Up @@ -3441,6 +3458,7 @@ export function ChatPage(props: ChatPageProps) {
ensureGatewayBridgeConversationReadyRef,
sendActionRef,
queueGatewayBridgeEventForRequest,
flushGatewayBridgeEventsForRequest,
shouldQueueGatewayChatRequest,
enqueueGatewayChatRequest,
isConversationRunning,
Expand Down Expand Up @@ -3517,12 +3535,54 @@ export function ChatPage(props: ChatPageProps) {
gatewayBridgeRequest?.requestId ?? createLocalGatewayChatRunId(conversationId);
const gatewayBridgeWorkerId =
gatewayBridgeRequest?.workerId ?? (mirrorsLocalRunToGateway ? "gui-live" : undefined);
let gatewayBridgeAbortSignal: AbortSignal | null = null;
const waitForLocalGatewayMirror = (sendResult: Promise<void> | void) => {
const sendPromise = Promise.resolve(sendResult);
const signal = gatewayBridgeAbortSignal;
if (!signal) {
return sendPromise;
}
if (signal.aborted) {
return Promise.resolve();
}
return new Promise<void>((resolve, reject) => {
const handleAbort = () => {
resolve();
};
signal.addEventListener("abort", handleAbort, { once: true });
sendPromise.then(
() => {
signal.removeEventListener("abort", handleAbort);
resolve();
},
(error) => {
signal.removeEventListener("abort", handleAbort);
reject(error);
},
);
});
};
const sendGatewayBridgeEventForRun = mirrorsLocalRunToGateway
? (
requestId: string,
event: Record<string, unknown>,
options?: { workerId?: string },
) => {
const sendPromise = waitForLocalGatewayMirror(
queueGatewayBridgeEventForRequest(requestId, event, options),
);
sendPromise.catch((error) => {
console.warn("local gateway chat event mirror failed", error);
});
return sendPromise;
}
: queueGatewayBridgeEventForRequest;
const gatewayBridgeEvents = createGatewayBridgeEventController({
conversationId,
requestId: gatewayBridgeRequestId,
workerId: gatewayBridgeWorkerId,
enabled: Boolean(gatewayBridgeRequest) || hasRemoteGatewayTarget,
sendEvent: queueGatewayBridgeEventForRequest,
sendEvent: sendGatewayBridgeEventForRun,
resolveErrorConversationId: () =>
gatewayBridgeRequest?.conversationId ?? currentConversationIdRef.current,
});
Expand Down Expand Up @@ -3696,6 +3756,7 @@ export function ChatPage(props: ChatPageProps) {
const conversationThrottleState = getCompactionThrottleState(conversationId);
const isConversationVisible = () => currentConversationIdRef.current === conversationId;
let requestController = new AbortController();
gatewayBridgeAbortSignal = requestController.signal;
const conversationDebugLogger = createStreamDebugLogger({
enabled: effectiveIsAgentDevExecutionMode,
conversationId,
Expand Down Expand Up @@ -3821,6 +3882,22 @@ export function ChatPage(props: ChatPageProps) {
queueGatewayConversationActivity(conversationId, false, conversationCwd);
}
}
function finishAbortedBeforeRuntimeStart() {
if (!requestController.signal.aborted) {
return false;
}
gatewayBridgeEvents.queueEvent({
type: "error",
message: "Cancelled",
conversation_id: conversationId,
});
gatewayBridgeEvents.close();
updateToolStatus(null, transcriptStore, isConversationVisible());
clearAbortSnapshot(transcriptStore);
markConversationRunStopped();
requestQueuedChatTurnProcessing(conversationId);
return true;
}
let localGatewayRunStarted = false;
async function markLocalGatewayRunStarted() {
if (!mirrorsLocalRunToGateway || localGatewayRunStarted) {
Expand All @@ -3836,9 +3913,18 @@ export function ChatPage(props: ChatPageProps) {
markConversationRunStarted();
if (mirrorsLocalRunToGateway) {
try {
await markLocalGatewayRunStarted();
await waitForLocalGatewayMirror(markLocalGatewayRunStarted());
} catch (error) {
const message = asErrorMessage(error, "Gateway 本地会话启动同步失败");
setConversationErrorState(message);
gatewayBridgeEvents.emitError(message, conversationId);
gatewayBridgeEvents.close();
markConversationRunStopped();
console.warn("gateway_chat_mark_local_started failed", error);
return true;
}
if (finishAbortedBeforeRuntimeStart()) {
return true;
}
}
if (overrides?.beforeRuntimeStart) {
Expand All @@ -3852,6 +3938,9 @@ export function ChatPage(props: ChatPageProps) {
markConversationRunStopped();
return false;
}
if (finishAbortedBeforeRuntimeStart()) {
return true;
}
}

// Persist the user turn immediately so WebUI/GUI sidebars can surface the
Expand Down Expand Up @@ -3920,7 +4009,22 @@ export function ChatPage(props: ChatPageProps) {
console.warn("gateway stream started before initial user turn was persisted");
}
}
await gatewayBridgeEvents.queueUserMessage(text, uploadedFiles);
try {
await gatewayBridgeEvents.queueUserMessage(text, uploadedFiles);
} catch (error) {
if (!mirrorsLocalRunToGateway) {
throw error;
}
const message = asErrorMessage(error, "Gateway 用户消息同步失败");
setConversationErrorState(message);
gatewayBridgeEvents.emitError(message, conversationId);
gatewayBridgeEvents.close();
markConversationRunStopped();
return true;
}
if (finishAbortedBeforeRuntimeStart()) {
return true;
}
acknowledgeGatewayRunStarted();
let activeCompactionRollback: {
state: ConversationViewState;
Expand Down Expand Up @@ -4217,6 +4321,7 @@ export function ChatPage(props: ChatPageProps) {

function renewRequestController() {
requestController = new AbortController();
gatewayBridgeAbortSignal = requestController.signal;
setConversationAbortController(conversationId, requestController);
return requestController;
}
Expand Down Expand Up @@ -4489,6 +4594,9 @@ export function ChatPage(props: ChatPageProps) {
hookWarning: null,
}));
}
if (finishAbortedBeforeRuntimeStart()) {
return true;
}

try {
if (effectiveIsAgentMode) {
Expand Down
Loading
Loading