From 449c600f3d791327ee918b7a460f8685a535855d Mon Sep 17 00:00:00 2001 From: snowdamiz Date: Fri, 29 May 2026 19:11:11 -0700 Subject: [PATCH 01/11] save --- MANUAL-COMPUTER-USE-OPTIMIZATION-PLAN.md | 325 +++++++++ .../crates/xero-cli/src/remote_cli.rs | 1 + .../crates/xero-cli/src/tui/remote.rs | 2 +- .../crates/xero-remote-bridge/src/lib.rs | 617 ++++++++++++++-- .../src-tauri/src/commands/remote_bridge.rs | 459 ++++++++++-- cloud/src/lib/relay/relay-client.test.ts | 161 +++- cloud/src/lib/relay/relay-client.ts | 686 ++++++++++++++++-- .../src/lib/relay/sessions-shell-context.tsx | 3 + .../src/lib/relay/use-session-stream.test.ts | 22 + cloud/src/lib/relay/use-session-stream.ts | 120 ++- .../src/routes/-desktop-click-ripple.test.tsx | 437 ++++++++++- cloud/src/routes/-sessions-shell.test.tsx | 79 ++ .../sessions.$computerId.$sessionId.tsx | 662 +++++++++++++---- cloud/src/routes/sessions.tsx | 6 + .../ui/src/components/empty-session-state.tsx | 568 ++++++++------- server/lib/xero/application.ex | 1 + .../xero/remote/control_session_registry.ex | 247 +++++++ .../channels/remote_channel_rate_limit.ex | 99 ++- .../channels/remote_session_channel.ex | 440 ++++++++++- server/lib/xero_web/telemetry.ex | 8 + .../xero_web/channels/remote_channel_test.exs | 367 +++++++++- 21 files changed, 4678 insertions(+), 632 deletions(-) create mode 100644 MANUAL-COMPUTER-USE-OPTIMIZATION-PLAN.md create mode 100644 server/lib/xero/remote/control_session_registry.ex diff --git a/MANUAL-COMPUTER-USE-OPTIMIZATION-PLAN.md b/MANUAL-COMPUTER-USE-OPTIMIZATION-PLAN.md new file mode 100644 index 00000000..d82bfe5b --- /dev/null +++ b/MANUAL-COMPUTER-USE-OPTIMIZATION-PLAN.md @@ -0,0 +1,325 @@ +# Manual Computer Use Optimization Plan + +## Reader And Goal + +This plan is for engineers improving Xero's manual computer use path. After reading it, an engineer should be able to implement the reliability work in a safe order without needing the original audit conversation. 
+ +The product goal is a consistent manual control experience with no silent interruptions, even when the network is degraded, relay connections reconnect, WebRTC fails, or the desktop bridge restarts. + +## Target Experience + +- Manual control requests either become active or produce a visible denial. They never appear active before the desktop grants control. +- Clicks, key presses, text input, drag gestures, release requests, and heartbeats are either executed once or visibly rejected. They are never silently lost. +- Pointer movement and stream status can be coalesced under pressure, but critical input and signaling cannot be dropped. +- WebRTC remains the preferred stream path, but the system automatically moves through fallback transports when needed. +- Recovery is automatic after temporary network degradation, token expiry, relay reconnects, sidecar failure, and desktop bridge reconnects. +- The user receives a stable visual state during recovery instead of abrupt pauses, stale frames, or misleading "active" control. + +## Current Strengths + +- WebRTC is already the primary desktop stream path. +- Screenshot fallback, keyframe refresh, stale-frame recovery, and adaptive quality hooks already exist. +- Manual control is protected by desktop-side leases and recurring heartbeats. +- Stream tokens and run identifiers already scope remote commands. +- Drag gesture support exists in the current manual control surface. +- Desktop-side execution already audits manual control acquisition, refresh, input, and release. + +## Main Reliability Gaps + +### Fire-And-Forget Web Commands + +The web relay client sends many commands without waiting for command-level success, failure, or timeout. This affects manual input, manual control requests, heartbeats, WebRTC signaling, stream status, and keyframe requests. + +Impact: + +- Rate-limit failures can be invisible. +- Relay timeouts can look like accepted input. 
+- Manual control may enter an active-looking state before the desktop grants the lease. +- The UI cannot distinguish "still sending" from "executed" from "rejected." + +### Coarse Server Rate Limiting + +The server currently treats many web-originated command families as one relay event class. Manual input, pointer movement, screenshot fallback polling, keyframes, heartbeats, and signaling can compete for the same rate budget. + +Impact: + +- Degraded streaming can starve manual input. +- Pointer movement can starve lease heartbeats or critical input. +- WebRTC recovery traffic can collide with manual control traffic. +- Network degradation amplifies command loss because retries create more traffic. + +### Optimistic Manual State + +The UI can move into manual mode immediately after requesting control, before the desktop confirms the matching lease. + +Impact: + +- The user can interact while control is not actually held. +- Denials and stale leases can feel like broken input. +- Recovery paths are harder to reason about because "manual" means both requested and active. + +### Weak Delivery Guarantees In The Desktop Bridge + +The desktop bridge has reconnect and replay foundations, but outbound frames can be consumed before a successful relay send. Inbound broadcast lag can also drop commands without a command-aware policy. + +Impact: + +- A reconnect at the wrong moment can lose events. +- Critical commands and best-effort updates are not treated differently. +- Relay recovery depends too heavily on session cursor behavior. + +### Fallback Stream Uses The Control Plane + +Screenshot fallback sends encoded image data through the same broad relay command path used by control messages. + +Impact: + +- Fallback frames are large and expensive. +- Degraded streaming can interfere with manual input. +- The fallback path is especially vulnerable to rate limits and websocket instability. 
+ +### Limited Degraded-Network Test Coverage + +The current test surface covers many local behaviors, but the manual control path needs explicit loss, delay, reconnect, token expiry, rate-limit, and sidecar-failure scenarios. + +Impact: + +- Regressions can ship in the exact conditions this feature must tolerate. +- Optimizations may improve happy-path latency while making recovery less predictable. + +## Implementation Phases + +### Phase 1: Protocol Acknowledgements And Command Envelopes + +Goal: make every important command observable from the UI to the desktop and back. + +Tasks: + +- Add a command envelope with `clientCommandId`, `clientSeq`, `kind`, `priority`, `sentAt`, `dedupeKey`, and optional `expiresAt`. +- Change the web relay client so command pushes return an acknowledgement result instead of fire-and-forget completion. +- Define command outcomes: `accepted`, `executed`, `rejected`, `rate_limited`, `timed_out`, `stale`, and `duplicate`. +- Emit command outcome events back to the sender so the UI can recover even if a push response is missed. +- Add desktop-side dedupe for critical commands by command id. +- Preserve best-effort behavior only for command kinds that are explicitly marked coalescible. + +Acceptance criteria: + +- Manual acquire, release, heartbeat, click, key, text, drag, WebRTC answer, and ICE commands all have visible ack or rejection paths. +- A server-side rejection is surfaced to the requesting UI. +- Duplicate critical commands are not executed twice. + +### Phase 2: Priority Queues And Backpressure + +Goal: protect critical control traffic when the network or relay is degraded. + +Tasks: + +- Introduce command classes: + - critical reliable: acquire, release, heartbeat, click, key, text, drag, signaling. + - reliable idempotent: stream start, stream stop, status request, keyframe request. + - coalesced best-effort: pointer move, quality update, cursor position, repeated status refresh. 
+- Add a client-side scheduler that prioritizes critical reliable commands. +- Coalesce pointer moves and repeated stream status updates to the newest pending value. +- Add bounded queues with explicit drop policies. +- Add timeout and retry policies by command class. +- Avoid automatic replay of unsafe manual input unless desktop dedupe can prove the command was not executed twice. + +Acceptance criteria: + +- Pointer movement cannot starve clicks, keys, heartbeats, or release. +- Queue drops are explicit, logged, and limited to coalescible commands. +- Critical commands either complete, retry within policy, or visibly fail. + +### Phase 3: Per-Kind Server Rate Limits + +Goal: stop unrelated traffic from starving manual control. + +Tasks: + +- Replace the single broad web command bucket with per-kind token buckets. +- Give critical manual commands a separate burst budget. +- Give pointer movement a higher but coalesced budget. +- Give fallback frame requests, keyframes, status polling, and signaling separate budgets. +- Include structured rate-limit metadata in command rejections. +- Add telemetry for accepted, rejected, delayed, and coalesced command counts by kind. + +Acceptance criteria: + +- Degraded screenshot polling cannot rate-limit manual click/key/drag commands. +- Pointer movement cannot rate-limit heartbeats. +- WebRTC recovery traffic cannot rate-limit manual release. + +### Phase 4: Manual Control State Machine + +Goal: make manual control state honest and recoverable. + +Tasks: + +- Replace the single optimistic manual state with explicit states: + - `manual_idle` + - `manual_requesting` + - `manual_active` + - `manual_reconnecting` + - `manual_denied` + - `manual_releasing` + - `manual_released` +- Only enter `manual_active` after the desktop confirms the matching manual control id. +- Disable manual input while requesting, reconnecting, denied, or releasing. +- Add heartbeat acknowledgements. 
+- On heartbeat failure, enter `manual_reconnecting`, attempt lease refresh or reacquire, and show a stable recovery state. +- Release control explicitly when the user exits manual control or the session closes. + +Acceptance criteria: + +- The UI never reports active manual control before desktop grant. +- Missed heartbeat recovery does not silently drop into a stale active state. +- Manual control denial is visible and actionable. + +### Phase 5: Desktop Bridge Delivery Hardening + +Goal: keep the bridge reliable across reconnects and local backpressure. + +Tasks: + +- Requeue outbound frames when relay send fails before acknowledgement. +- Keep critical pending frames until acknowledged or expired. +- Persist critical pending state under OS app-data when required for restart recovery. +- Replace lag-prone broadcast handling with a command-aware queue. +- Never drop critical manual or signaling commands due to local lag. +- Coalesce safe high-frequency events before they enter the bridge. +- Add bridge telemetry for reconnects, replay counts, dropped coalescible events, and command ack latency. + +Acceptance criteria: + +- Simulated relay disconnect during click/key/drag delivery does not silently lose the command. +- Bridge restart during an active session results in visible recovery or rejection. +- Local queue pressure cannot drop release or heartbeat commands. + +### Phase 6: Degraded Streaming Transport Ladder + +Goal: keep a useful picture on screen while protecting input reliability. + +Tasks: + +- Define the transport ladder: + - WebRTC video stream. + - WebRTC data-channel still frames. + - Dedicated fallback image channel or chunked media path. + - Slow relay stills as last resort. +- Move high-frequency fallback images away from the manual control command budget. +- Add adaptive fallback interval and quality based on ack latency, frame age, payload size, and network errors. 
+- Keep trying WebRTC recovery with cooldown and ICE restart while fallback is active. +- Queue ICE candidates until the peer connection is ready. +- Add stream token refresh or rejoin before long sessions hit token expiry. + +Acceptance criteria: + +- WebRTC failure automatically falls back without breaking manual control. +- Fallback streaming cannot starve critical manual commands. +- Long manual sessions recover or refresh before stream authorization expires. + +### Phase 7: Native Input Refinement + +Goal: make manual input feel consistent once delivery is reliable. + +Tasks: + +- Preserve click, scroll, keyboard, paste, and drag behavior behind the reliable command path. +- Add interpolation or duration support for native drag execution where platform APIs need smoother motion. +- Add input ack feedback for click, drag, key, and text actions. +- Keep local visual feedback optimistic but reconcile it with command outcome. +- Avoid replaying unsafe input after uncertain transport unless command dedupe confirms safety. + +Acceptance criteria: + +- Drag works consistently across WebRTC and fallback stream modes. +- Failed input produces visible recovery or rejection. +- Repeated text input respects backpressure and acknowledgement. + +### Phase 8: Observability + +Goal: make interruptions measurable before and after optimization. + +Tasks: + +- Track command ack latency by kind. +- Track command timeout, retry, duplicate, and rejection rates. +- Track manual lease acquisition time, heartbeat latency, missed heartbeat count, and reacquire success. +- Track stream mode transitions, frame age, fallback interval, WebRTC disconnect duration, ICE restart count, and recovery time. +- Track bridge reconnect count, replay count, queue depth, and dropped coalescible events. +- Add dashboard thresholds for manual control interruption, stale frame duration, and critical command loss. 
+ +Acceptance criteria: + +- Engineers can identify whether an interruption came from UI scheduling, server rate limit, relay disconnect, bridge reconnect, desktop execution, WebRTC failure, or token expiry. +- Manual control reliability can be compared before and after rollout. + +### Phase 9: Test Matrix + +Goal: prove the feature works in the conditions it is designed for. + +Tasks: + +- Add web tests for command ack handling, timeout handling, manual state transitions, denial handling, and no-input-before-grant. +- Add server channel tests for per-kind rate limits, structured rejections, sender outcome events, and async authorization behavior. +- Add bridge tests for requeue-on-send-failure, reconnect replay, critical queue preservation, and coalescible drops. +- Add integration tests for: + - websocket disconnect during click. + - websocket disconnect during drag. + - delayed manual grant. + - denied manual grant. + - missed heartbeat and reacquire. + - WebRTC failure with fallback stream. + - fallback stream under low bandwidth. + - token expiry during long manual session. + - sidecar failure during active manual control. +- Add a degraded-network harness that can simulate latency, jitter, packet loss, reconnects, and relay rate limits. + +Acceptance criteria: + +- Critical manual commands are executed once or visibly rejected under simulated degradation. +- Manual control stays recoverable during stream fallback. +- Tests fail if rate limits or reconnects silently drop critical input. + +## Rollout Plan + +1. Add observability behind existing behavior. +2. Add command envelopes and acknowledgements behind a feature flag. +3. Enable per-kind rate limits in shadow mode and compare with existing behavior. +4. Enable the manual state machine for internal testing. +5. Enable bridge delivery hardening. +6. Move degraded fallback streaming off the manual control command budget. +7. Run the degraded-network test matrix. +8. 
Roll out to all users once interruption metrics improve and critical command loss is zero in test scenarios. + +## Success Metrics + +- Critical manual command loss: zero in automated degraded-network tests. +- Duplicate critical manual execution: zero in automated degraded-network tests. +- Manual acquire success or visible denial: 100 percent of attempts. +- Manual heartbeat recovery: successful reacquire or visible release after simulated transient disconnect. +- Stale frame duration: bounded and visible during degraded streaming. +- WebRTC recovery: automatic fallback and retry without breaking manual control. +- User-facing interruption: measurable decrease after rollout. + +## Non-Goals + +- Backwards compatibility with legacy repo-local state. +- Adding temporary debug UI. +- Replacing the whole remote desktop stack before command reliability is fixed. +- Optimizing visual frame rate at the expense of manual input reliability. +- Hiding degraded network state from the user when control is not truly active. + +## Recommended First Slice + +Start with command acknowledgements and manual state correctness. + +Deliverables: + +- Command envelope type shared by web, server, and desktop bridge. +- Web relay client method that returns structured ack results. +- Server sender outcome event for rejected or rate-limited commands. +- Manual request flow that enters `manual_requesting` first and only enters `manual_active` after desktop grant. +- Tests for grant, denial, timeout, and no-input-before-grant. + +This slice gives the team a reliable signal for every later optimization. Once command outcomes are visible, rate limits, bridge reconnects, fallback stream pressure, and WebRTC recovery can be optimized without guessing where interruptions come from. 
diff --git a/client/src-tauri/crates/xero-cli/src/remote_cli.rs b/client/src-tauri/crates/xero-cli/src/remote_cli.rs index 730f1ca4..81484667 100644 --- a/client/src-tauri/crates/xero-cli/src/remote_cli.rs +++ b/client/src-tauri/crates/xero-cli/src/remote_cli.rs @@ -806,6 +806,7 @@ fn map_bridge_error(error: BridgeError) -> CliError { | BridgeError::Decode(_) | BridgeError::Json(_) | BridgeError::MissingServerField(_) + | BridgeError::OutboundQueueFull { .. } | BridgeError::LockPoisoned => { CliError::system_fault("xero_cli_remote_bridge_failed", error.to_string()) } diff --git a/client/src-tauri/crates/xero-cli/src/tui/remote.rs b/client/src-tauri/crates/xero-cli/src/tui/remote.rs index 5e8a940b..67f48e22 100644 --- a/client/src-tauri/crates/xero-cli/src/tui/remote.rs +++ b/client/src-tauri/crates/xero-cli/src/tui/remote.rs @@ -290,7 +290,7 @@ impl TuiRemoteBridgeState { return Ok(()); } - let mut inbound = bridge.subscribe_inbound(); + let inbound = bridge.subscribe_inbound().map_err(map_bridge_error)?; let globals = globals.clone(); let handle = thread::spawn(move || { while !shutdown.load(Ordering::Relaxed) { diff --git a/client/src-tauri/crates/xero-remote-bridge/src/lib.rs b/client/src-tauri/crates/xero-remote-bridge/src/lib.rs index 5a882d22..642e3be6 100644 --- a/client/src-tauri/crates/xero-remote-bridge/src/lib.rs +++ b/client/src-tauri/crates/xero-remote-bridge/src/lib.rs @@ -1,12 +1,11 @@ use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeSet, HashMap, VecDeque}, ffi::OsString, fs, path::PathBuf, sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{self, TryRecvError}, - Arc, Mutex, + atomic::{AtomicBool, AtomicU64, Ordering}, + mpsc, Arc, Mutex, }, thread, time::{Duration, Instant}, @@ -17,7 +16,6 @@ use reqwest::blocking::Client; use serde::{Deserialize, Serialize}; use serde_json::{json, Value as JsonValue}; use time::OffsetDateTime; -use tokio::sync::broadcast; use tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket}; 
use url::Url; @@ -76,6 +74,11 @@ pub enum BridgeError { UnsupportedUrlScheme(String), #[error("remote bridge server response is missing `{0}`")] MissingServerField(&'static str), + #[error("remote bridge outbound queue `{priority}` is full at {limit} pending frames")] + OutboundQueueFull { + priority: &'static str, + limit: usize, + }, #[error("remote bridge lock was poisoned")] LockPoisoned, } @@ -323,10 +326,23 @@ pub struct BridgeStatus { pub signed_in: bool, pub account: Option, pub devices: Vec, + pub telemetry: BridgeTelemetry, #[serde(skip_serializing_if = "Option::is_none")] pub devices_error: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct BridgeTelemetry { + pub reconnect_count: u64, + pub replay_count: u64, + pub requeued_outbound_count: u64, + pub coalesced_outbound_count: u64, + pub dropped_coalescible_outbound_count: u64, + pub expired_outbound_count: u64, + pub outbound_queue_depth: usize, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct AuthStatus { @@ -466,9 +482,29 @@ pub struct InboundCommand { pub session_id: Option, pub kind: InboundCommandKind, pub device_id: String, + #[serde(default, alias = "clientCommandId")] + pub client_command_id: Option, + #[serde(default, alias = "clientSeq")] + pub client_seq: Option, + #[serde(default)] + pub priority: Option, + #[serde(default, alias = "sentAt")] + pub sent_at: Option, + #[serde(default, alias = "dedupeKey")] + pub dedupe_key: Option, + #[serde(default, alias = "expiresAt")] + pub expires_at: Option, pub payload: JsonValue, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum InboundCommandPriority { + CriticalReliable, + ReliableIdempotent, + CoalescedBestEffort, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PhoenixMessage( pub Option, @@ -756,9 +792,15 @@ pub struct 
RemoteBridge { connected: AtomicBool, seq_by_session: Mutex>, replay_by_session: Mutex>>, - outbound_tx: mpsc::Sender, - outbound_rx: Mutex>, - inbound_tx: broadcast::Sender, + outbound_queue: Mutex>, + inbound_tx: mpsc::SyncSender, + inbound_rx: Mutex>>, + reconnect_count: AtomicU64, + replay_count: AtomicU64, + requeued_outbound_count: AtomicU64, + coalesced_outbound_count: AtomicU64, + dropped_coalescible_outbound_count: AtomicU64, + expired_outbound_count: AtomicU64, } #[derive(Debug, Clone)] @@ -792,6 +834,37 @@ enum OutboundFrame { }, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum OutboundPriority { + CoalescedBestEffort = 0, + ReliableIdempotent = 1, + CriticalReliable = 2, +} + +#[derive(Debug, Clone, PartialEq)] +struct QueuedOutboundFrame { + priority: OutboundPriority, + coalesce_key: Option, + expires_at: Option, + frame: OutboundFrame, +} + +impl QueuedOutboundFrame { + fn new(priority: OutboundPriority, frame: OutboundFrame) -> Self { + let now = Instant::now(); + Self { + priority, + coalesce_key: outbound_frame_coalesce_key(&frame, priority), + expires_at: outbound_frame_expires_at(&frame, priority, now), + frame, + } + } + + fn is_expired(&self, now: Instant) -> bool { + matches!(self.expires_at, Some(expires_at) if expires_at <= now) + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct DesktopBridgeLoopOptions { pub heartbeat_interval: Duration, @@ -811,6 +884,10 @@ const CONTROL_SESSION_IDS: [&str; 4] = ["__sessions__", "__projects__", "__new__ const MAX_SESSION_REPLAY_FRAMES: usize = 512; const RELAY_TOKEN_REFRESH_SKEW_SECONDS: i64 = 120; const ACCOUNT_DEVICES_CACHE_TTL: Duration = Duration::from_secs(5); +const OUTBOUND_MAX_CRITICAL_QUEUE: usize = 4096; +const OUTBOUND_MAX_RELIABLE_QUEUE: usize = 2048; +const OUTBOUND_MAX_COALESCED_QUEUE: usize = 256; +const OUTBOUND_COALESCED_TTL: Duration = Duration::from_secs(10); #[derive(Debug, Clone, PartialEq, Eq)] enum RelayTokenRefreshAuth { @@ -823,8 +900,7 @@ where I: 
IdentityStore, { pub fn new(config: BridgeConfig, identity_store: I) -> Self { - let (inbound_tx, _inbound_rx) = broadcast::channel(256); - let (outbound_tx, outbound_rx) = mpsc::channel(); + let (inbound_tx, inbound_rx) = mpsc::sync_channel(4096); Self { config, identity_store, @@ -833,9 +909,15 @@ where connected: AtomicBool::new(false), seq_by_session: Mutex::new(HashMap::new()), replay_by_session: Mutex::new(HashMap::new()), - outbound_tx, - outbound_rx: Mutex::new(outbound_rx), + outbound_queue: Mutex::new(VecDeque::new()), inbound_tx, + inbound_rx: Mutex::new(Some(inbound_rx)), + reconnect_count: AtomicU64::new(0), + replay_count: AtomicU64::new(0), + requeued_outbound_count: AtomicU64::new(0), + coalesced_outbound_count: AtomicU64::new(0), + dropped_coalescible_outbound_count: AtomicU64::new(0), + expired_outbound_count: AtomicU64::new(0), } } @@ -868,10 +950,29 @@ where signed_in, account, devices, + telemetry: self.telemetry_snapshot()?, devices_error, }) } + fn telemetry_snapshot(&self) -> BridgeResult { + Ok(BridgeTelemetry { + reconnect_count: self.reconnect_count.load(Ordering::Relaxed), + replay_count: self.replay_count.load(Ordering::Relaxed), + requeued_outbound_count: self.requeued_outbound_count.load(Ordering::Relaxed), + coalesced_outbound_count: self.coalesced_outbound_count.load(Ordering::Relaxed), + dropped_coalescible_outbound_count: self + .dropped_coalescible_outbound_count + .load(Ordering::Relaxed), + expired_outbound_count: self.expired_outbound_count.load(Ordering::Relaxed), + outbound_queue_depth: self + .outbound_queue + .lock() + .map_err(|_| BridgeError::LockPoisoned)? + .len(), + }) + } + pub fn sign_in_with_github(&self) -> BridgeResult { self.sign_in_with_github_kind("desktop") } @@ -1084,8 +1185,12 @@ where Ok(()) } - pub fn subscribe_inbound(&self) -> broadcast::Receiver { - self.inbound_tx.subscribe() + pub fn subscribe_inbound(&self) -> BridgeResult> { + self.inbound_rx + .lock() + .map_err(|_| BridgeError::LockPoisoned)? 
+ .take() + .ok_or(BridgeError::MissingServerField("inbound_rx")) } pub fn authorize_session_join( @@ -1096,14 +1201,16 @@ where authorized: bool, run_id: Option<&str>, ) -> BridgeResult<()> { - let _ = self.outbound_tx.send(OutboundFrame::SessionAuthorization { - join_ref: join_ref.to_owned(), - auth_topic: auth_topic.to_owned(), - session_id: session_id.to_owned(), - authorized, - run_id: run_id.map(ToOwned::to_owned), - }); - Ok(()) + self.enqueue_outbound(QueuedOutboundFrame::new( + OutboundPriority::CriticalReliable, + OutboundFrame::SessionAuthorization { + join_ref: join_ref.to_owned(), + auth_topic: auth_topic.to_owned(), + session_id: session_id.to_owned(), + authorized, + run_id: run_id.map(ToOwned::to_owned), + }, + )) } pub fn forward( @@ -1219,6 +1326,7 @@ where for envelope in replay { self.enqueue_envelope(&envelope)?; } + self.replay_count.fetch_add(count as u64, Ordering::Relaxed); Ok(count) } @@ -1269,6 +1377,7 @@ where } Err(_error) if !shutdown.load(Ordering::Relaxed) => { self.connected.store(false, Ordering::Relaxed); + self.reconnect_count.fetch_add(1, Ordering::Relaxed); thread::sleep(backoff.next_jittered_delay()); } Err(error) => { @@ -1388,13 +1497,89 @@ where fn enqueue_envelope(&self, envelope: &RuntimeEnvelope) -> BridgeResult<()> { let payload = encode_relay_frame_payload(envelope)?; - let _ = self.outbound_tx.send(OutboundFrame::SessionFrame { - session_id: envelope.session_id.clone(), - payload, - }); + self.enqueue_outbound(QueuedOutboundFrame::new( + outbound_priority_for_payload(&payload), + OutboundFrame::SessionFrame { + session_id: envelope.session_id.clone(), + payload, + }, + )) + } + + fn enqueue_outbound(&self, queued: QueuedOutboundFrame) -> BridgeResult<()> { + let mut queue = self + .outbound_queue + .lock() + .map_err(|_| BridgeError::LockPoisoned)?; + let now = Instant::now(); + self.prune_expired_outbound_locked(&mut queue, now); + if queued.is_expired(now) { + self.expired_outbound_count.fetch_add(1, 
Ordering::Relaxed); + return Ok(()); + } + if let Some(coalesce_key) = queued.coalesce_key.as_deref() { + if let Some(position) = queue + .iter() + .position(|existing| existing.coalesce_key.as_deref() == Some(coalesce_key)) + { + queue.remove(position); + self.coalesced_outbound_count + .fetch_add(1, Ordering::Relaxed); + } + } + self.enforce_outbound_queue_limit_locked(&mut queue, queued.priority)?; + let insert_at = queue + .iter() + .position(|existing| existing.priority < queued.priority) + .unwrap_or(queue.len()); + queue.insert(insert_at, queued); Ok(()) } + fn enforce_outbound_queue_limit_locked( + &self, + queue: &mut VecDeque, + priority: OutboundPriority, + ) -> BridgeResult<()> { + let limit = outbound_queue_limit(priority); + let count = queue + .iter() + .filter(|queued| queued.priority == priority) + .count(); + if count < limit { + return Ok(()); + } + if priority == OutboundPriority::CoalescedBestEffort { + if let Some(position) = queue + .iter() + .position(|queued| queued.priority == OutboundPriority::CoalescedBestEffort) + { + queue.remove(position); + self.dropped_coalescible_outbound_count + .fetch_add(1, Ordering::Relaxed); + return Ok(()); + } + } + Err(BridgeError::OutboundQueueFull { + priority: outbound_priority_name(priority), + limit, + }) + } + + fn prune_expired_outbound_locked( + &self, + queue: &mut VecDeque, + now: Instant, + ) { + let before = queue.len(); + queue.retain(|queued| !queued.is_expired(now)); + let expired = before.saturating_sub(queue.len()); + if expired > 0 { + self.expired_outbound_count + .fetch_add(expired as u64, Ordering::Relaxed); + } + } + fn run_desktop_once( &self, shutdown: &AtomicBool, @@ -1463,6 +1648,12 @@ where session_id: Some(session_id.to_owned()), kind: InboundCommandKind::AuthorizeSessionJoin, device_id: web_device_id.to_owned(), + client_command_id: None, + client_seq: None, + priority: None, + sent_at: None, + dedupe_key: None, + expires_at: None, payload: json!({ "joinRef": join_ref, 
"authTopic": auth_topic, @@ -1484,6 +1675,12 @@ where session_id: Some(session_id.to_owned()), kind: InboundCommandKind::SessionAttached, device_id: web_device_id.to_owned(), + client_command_id: None, + client_seq: None, + priority: None, + sent_at: None, + dedupe_key: None, + expires_at: None, payload: json!({ "lastSeq": last_seq }), }); } @@ -1515,17 +1712,21 @@ where joined_sessions: &mut BTreeSet, ) -> BridgeResult<()> { loop { - let frame = { - let receiver = self - .outbound_rx + let queued = { + let mut queue = self + .outbound_queue .lock() .map_err(|_| BridgeError::LockPoisoned)?; - match receiver.try_recv() { - Ok(frame) => frame, - Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break, - } + queue.pop_front() }; - match frame { + let Some(queued) = queued else { + break; + }; + if queued.is_expired(Instant::now()) { + self.expired_outbound_count.fetch_add(1, Ordering::Relaxed); + continue; + } + let result = match &queued.frame { OutboundFrame::SessionFrame { session_id, payload, @@ -1533,7 +1734,7 @@ where if joined_sessions.insert(session_id.clone()) { connection.join_session(&session_id)?; } - connection.push_session_frame(&session_id, payload)?; + connection.push_session_frame(&session_id, payload.clone()) } OutboundFrame::SessionAuthorization { join_ref, @@ -1542,20 +1743,40 @@ where authorized, run_id, } => { - connection.authorize_session_join( + let result = connection.authorize_session_join( &join_ref, &auth_topic, - authorized, + *authorized, run_id.as_deref(), - )?; - if authorized && joined_sessions.insert(session_id.clone()) { + ); + if result.is_ok() && *authorized && joined_sessions.insert(session_id.clone()) { connection.join_session(&session_id)?; } + result } + }; + if let Err(error) = result { + self.requeue_outbound_front(queued)?; + return Err(error); } } Ok(()) } + + fn requeue_outbound_front(&self, queued: QueuedOutboundFrame) -> BridgeResult<()> { + if queued.is_expired(Instant::now()) { + 
self.expired_outbound_count.fetch_add(1, Ordering::Relaxed); + return Ok(()); + } + self.requeued_outbound_count.fetch_add(1, Ordering::Relaxed); + let mut queue = self + .outbound_queue + .lock() + .map_err(|_| BridgeError::LockPoisoned)?; + self.prune_expired_outbound_locked(&mut queue, Instant::now()); + queue.push_front(queued); + Ok(()) + } } fn decode_http(response: reqwest::blocking::Response) -> BridgeResult { @@ -1720,6 +1941,132 @@ fn is_control_session_id(session_id: &str) -> bool { CONTROL_SESSION_IDS.contains(&session_id) } +fn outbound_priority_for_payload(payload: &JsonValue) -> OutboundPriority { + let Some(encoded) = payload.get("envelope").and_then(JsonValue::as_str) else { + return OutboundPriority::ReliableIdempotent; + }; + let Ok(bytes) = URL_SAFE_NO_PAD.decode(encoded) else { + return OutboundPriority::ReliableIdempotent; + }; + let Ok(envelope) = decode_envelope(&bytes) else { + return OutboundPriority::ReliableIdempotent; + }; + let schema = envelope + .payload + .get("schema") + .and_then(JsonValue::as_str) + .unwrap_or_default(); + if schema.starts_with("xero.computer_use_manual_control_") + || schema == "xero.computer_use_stream_offer.v1" + || schema == "xero.computer_use_stream_answer.v1" + || schema == "xero.computer_use_stream_ice_candidate.v1" + || schema == "xero.remote_command_result.v1" + { + return OutboundPriority::CriticalReliable; + } + if schema == "xero.computer_use_stream_status.v1" + || schema == "xero.computer_use_stream_set_quality.v1" + || schema == "xero.computer_use_stream_frame.v1" + { + return OutboundPriority::CoalescedBestEffort; + } + OutboundPriority::ReliableIdempotent +} + +fn outbound_queue_limit(priority: OutboundPriority) -> usize { + match priority { + OutboundPriority::CriticalReliable => OUTBOUND_MAX_CRITICAL_QUEUE, + OutboundPriority::ReliableIdempotent => OUTBOUND_MAX_RELIABLE_QUEUE, + OutboundPriority::CoalescedBestEffort => OUTBOUND_MAX_COALESCED_QUEUE, + } +} + +fn outbound_priority_name(priority: 
OutboundPriority) -> &'static str { + match priority { + OutboundPriority::CriticalReliable => "critical_reliable", + OutboundPriority::ReliableIdempotent => "reliable_idempotent", + OutboundPriority::CoalescedBestEffort => "coalesced_best_effort", + } +} + +fn outbound_frame_coalesce_key( + frame: &OutboundFrame, + priority: OutboundPriority, +) -> Option { + if priority != OutboundPriority::CoalescedBestEffort { + return None; + } + let OutboundFrame::SessionFrame { + session_id, + payload, + } = frame + else { + return None; + }; + let envelope = decoded_relay_payload_envelope(payload)?; + let schema = envelope + .payload + .get("schema") + .and_then(JsonValue::as_str) + .unwrap_or_default(); + let stream_id = envelope + .payload + .get("streamId") + .or_else(|| envelope.payload.get("stream_id")) + .and_then(JsonValue::as_str) + .unwrap_or_default(); + Some(format!("{session_id}:{schema}:{stream_id}")) +} + +fn outbound_frame_expires_at( + frame: &OutboundFrame, + priority: OutboundPriority, + now: Instant, +) -> Option { + let explicit_expires_at = match frame { + OutboundFrame::SessionFrame { payload, .. } => decoded_relay_payload_envelope(payload) + .and_then(|envelope| { + envelope + .payload + .get("expiresAt") + .or_else(|| envelope.payload.get("expires_at")) + .and_then(json_unix_millis) + .and_then(|millis| instant_from_unix_millis(millis, now)) + }), + OutboundFrame::SessionAuthorization { .. 
} => None, + }; + explicit_expires_at.or_else(|| { + (priority == OutboundPriority::CoalescedBestEffort) + .then(|| now.checked_add(OUTBOUND_COALESCED_TTL)) + .flatten() + }) +} + +fn decoded_relay_payload_envelope(payload: &JsonValue) -> Option { + let encoded = payload.get("envelope").and_then(JsonValue::as_str)?; + let bytes = URL_SAFE_NO_PAD.decode(encoded).ok()?; + decode_envelope(&bytes).ok() +} + +fn json_unix_millis(value: &JsonValue) -> Option { + if let Some(value) = value.as_i64() { + return Some(value as i128); + } + if let Some(value) = value.as_u64() { + return Some(value as i128); + } + value.as_str()?.parse::().ok() +} + +fn instant_from_unix_millis(millis: i128, now: Instant) -> Option { + let current_millis = OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000; + if millis <= current_millis { + return Some(now); + } + let delta = u64::try_from(millis - current_millis).ok()?; + now.checked_add(Duration::from_millis(delta)) +} + fn session_id_from_topic(topic: &str) -> Option { topic .rsplit_once(':') @@ -2281,13 +2628,176 @@ mod tests { assert_eq!(relay_payloads.len(), 2); assert_eq!(relay_payloads[0]["encoding"], "msgpack.base64url"); - let receiver = bridge.outbound_rx.lock().expect("outbound lock"); - let queued_first = receiver.try_recv().expect("queued first"); - let queued_second = receiver.try_recv().expect("queued second"); + let mut queue = bridge.outbound_queue.lock().expect("outbound lock"); + let queued_first = queue.pop_front().expect("queued first").frame; + let queued_second = queue.pop_front().expect("queued second").frame; assert_session_frame(&queued_first, "session-1", 1); assert_session_frame(&queued_second, "session-1", 2); } + #[test] + fn bridge_outbound_queue_prioritizes_critical_manual_frames() { + let temp = tempfile_path("bridge-priority"); + let identity_store = FileIdentityStore::new(temp.join("identity.json")); + identity_store.save(&test_identity()).expect("identity"); + let bridge = 
RemoteBridge::new(BridgeConfig::local_default(), identity_store); + + bridge + .forward( + "session-1", + json!({ + "schema": "xero.computer_use_stream_status.v1", + "ok": true, + }), + ) + .expect("status forward"); + bridge + .forward( + "session-1", + json!({"schema": "xero.remote_runtime_event.v1"}), + ) + .expect("runtime forward"); + bridge + .forward( + "session-1", + json!({ + "schema": "xero.computer_use_manual_control_input.v1", + "outcome": "executed", + }), + ) + .expect("manual forward"); + assert_eq!( + bridge + .telemetry_snapshot() + .expect("telemetry") + .outbound_queue_depth, + 3 + ); + + let mut queue = bridge.outbound_queue.lock().expect("outbound lock"); + let first = queue.pop_front().expect("first").frame; + let second = queue.pop_front().expect("second").frame; + let third = queue.pop_front().expect("third").frame; + + assert_eq!( + queued_session_frame_schema(&first), + "xero.computer_use_manual_control_input.v1" + ); + assert_eq!( + queued_session_frame_schema(&second), + "xero.remote_runtime_event.v1" + ); + assert_eq!( + queued_session_frame_schema(&third), + "xero.computer_use_stream_status.v1" + ); + } + + #[test] + fn bridge_outbound_queue_coalesces_best_effort_stream_frames() { + let temp = tempfile_path("bridge-coalesced"); + let identity_store = FileIdentityStore::new(temp.join("identity.json")); + identity_store.save(&test_identity()).expect("identity"); + let bridge = RemoteBridge::new(BridgeConfig::local_default(), identity_store); + + bridge + .forward( + "session-1", + json!({ + "schema": "xero.computer_use_stream_status.v1", + "streamId": "stream-1", + "status": "degraded", + }), + ) + .expect("first status"); + bridge + .forward( + "session-1", + json!({ + "schema": "xero.computer_use_stream_status.v1", + "streamId": "stream-1", + "status": "live", + }), + ) + .expect("second status"); + + let telemetry = bridge.telemetry_snapshot().expect("telemetry"); + assert_eq!(telemetry.outbound_queue_depth, 1); + 
assert_eq!(telemetry.coalesced_outbound_count, 1); + + let mut queue = bridge.outbound_queue.lock().expect("outbound lock"); + let queued = queue.pop_front().expect("queued latest").frame; + let envelope = queued_session_frame_envelope(&queued); + assert_eq!( + envelope.payload["schema"], + "xero.computer_use_stream_status.v1" + ); + assert_eq!(envelope.payload["status"], "live"); + } + + #[test] + fn bridge_outbound_queue_expires_stale_best_effort_frames() { + let temp = tempfile_path("bridge-expired"); + let identity_store = FileIdentityStore::new(temp.join("identity.json")); + identity_store.save(&test_identity()).expect("identity"); + let bridge = RemoteBridge::new(BridgeConfig::local_default(), identity_store); + let expired_at = OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000 - 1; + + bridge + .forward( + "session-1", + json!({ + "schema": "xero.computer_use_stream_status.v1", + "streamId": "stream-1", + "expiresAt": expired_at, + }), + ) + .expect("expired status"); + + let telemetry = bridge.telemetry_snapshot().expect("telemetry"); + assert_eq!(telemetry.outbound_queue_depth, 0); + assert_eq!(telemetry.expired_outbound_count, 1); + } + + #[test] + fn bridge_outbound_requeue_preserves_unsent_frame() { + let temp = tempfile_path("bridge-requeue"); + let identity_store = FileIdentityStore::new(temp.join("identity.json")); + identity_store.save(&test_identity()).expect("identity"); + let bridge = RemoteBridge::new(BridgeConfig::local_default(), identity_store); + + bridge + .forward( + "session-1", + json!({ + "schema": "xero.computer_use_manual_control_input.v1", + "outcome": "executed", + }), + ) + .expect("manual outcome"); + + let queued = bridge + .outbound_queue + .lock() + .expect("outbound lock") + .pop_front() + .expect("queued frame"); + bridge + .requeue_outbound_front(queued) + .expect("requeue unsent frame"); + + let telemetry = bridge.telemetry_snapshot().expect("telemetry"); + assert_eq!(telemetry.requeued_outbound_count, 1); + 
assert_eq!(telemetry.outbound_queue_depth, 1); + + let mut queue = bridge.outbound_queue.lock().expect("outbound lock"); + let queued = queue.pop_front().expect("requeued frame").frame; + assert_eq!( + queued_session_frame_schema(&queued), + "xero.computer_use_manual_control_input.v1" + ); + } + #[test] fn snapshot_emits_snapshot_envelope_when_registered() { let temp = tempfile_path("bridge-snapshot"); @@ -2355,8 +2865,8 @@ mod tests { .expect("removed payload"); assert_eq!(payload["kind"], "session_removed"); - let receiver = bridge.outbound_rx.lock().expect("outbound lock"); - let queued = receiver.try_recv().expect("queued removal"); + let mut queue = bridge.outbound_queue.lock().expect("outbound lock"); + let queued = queue.pop_front().expect("queued removal").frame; let (session_id, payload) = queued_session_frame(&queued); assert_eq!(session_id, "__sessions__"); assert_eq!(payload["kind"], "session_removed"); @@ -2423,6 +2933,25 @@ mod tests { } } + fn queued_session_frame_schema(frame: &OutboundFrame) -> String { + queued_session_frame_envelope(frame) + .payload + .get("schema") + .and_then(JsonValue::as_str) + .expect("schema") + .to_owned() + } + + fn queued_session_frame_envelope(frame: &OutboundFrame) -> RuntimeEnvelope { + let (_session_id, payload) = queued_session_frame(frame); + let encoded = payload + .get("envelope") + .and_then(JsonValue::as_str) + .expect("encoded envelope"); + let bytes = URL_SAFE_NO_PAD.decode(encoded).expect("base64 envelope"); + decode_envelope(&bytes).expect("decode envelope") + } + fn tempfile_path(name: &str) -> PathBuf { let unique = format!( "xero-remote-bridge-{name}-{}", diff --git a/client/src-tauri/src/commands/remote_bridge.rs b/client/src-tauri/src/commands/remote_bridge.rs index f6f43446..b6fdbec2 100644 --- a/client/src-tauri/src/commands/remote_bridge.rs +++ b/client/src-tauri/src/commands/remote_bridge.rs @@ -1,9 +1,10 @@ use std::{ - collections::BTreeSet, + collections::{BTreeSet, HashSet, VecDeque}, fs, 
path::Path, sync::{ atomic::{AtomicBool, Ordering}, + mpsc::TryRecvError as InboundTryRecvError, Arc, Mutex, OnceLock, }, thread, @@ -13,11 +14,10 @@ use std::{ use serde::{Deserialize, Serialize}; use serde_json::{json, Map as JsonMap, Value as JsonValue}; use tauri::{AppHandle, Emitter, Manager, Runtime, State}; -use tokio::sync::broadcast::error::TryRecvError as BroadcastTryRecvError; use xero_remote_bridge::{ AccountDevice, AuthStatus, BridgeAccount, BridgeConfig, BridgeError, BridgeResult, - DesktopBridgeLoopOptions, FileIdentityStore, IdentityStore, InboundCommand, InboundCommandKind, - RemoteBridge, + BridgeTelemetry, DesktopBridgeLoopOptions, FileIdentityStore, IdentityStore, InboundCommand, + InboundCommandKind, RemoteBridge, }; use crate::{ @@ -70,7 +70,8 @@ use crate::{ AutonomousDesktopScreenshot, AutonomousDesktopSessionDescription, AutonomousDesktopStreamAction, AutonomousDesktopStreamQuality, AutonomousDesktopStreamRequest, AutonomousDesktopStreamTransport, - AutonomousDesktopToolOutput, AutonomousToolOutput, AutonomousToolRuntime, + AutonomousDesktopToolOutput, AutonomousDesktopToolStatus, AutonomousToolOutput, + AutonomousToolRuntime, }, state::DesktopState, }; @@ -88,6 +89,8 @@ const COMPOSER_SETTINGS_VERSION: u64 = 1; const PROJECT_REMOTE_SESSION_ID_PREFIX: &str = "project:"; const STREAM_FALLBACK_FRAME_MAX_BYTES: usize = 5 * 1024 * 1024; const STREAM_FALLBACK_JPEG_QUALITY: u8 = 74; +const COMMAND_OUTCOME_SCHEMA: &str = "xero.remote_command_outcome.v1"; +const MAX_DEDUPE_COMMAND_IDS: usize = 2048; type AppRemoteBridge = RemoteBridge; @@ -108,6 +111,7 @@ pub struct BridgeStatusResponseDto { pub signed_in: bool, pub account: Option, pub devices: Vec, + pub telemetry: BridgeTelemetry, #[serde(default, skip_serializing_if = "Option::is_none")] pub devices_error: Option, } @@ -149,6 +153,7 @@ pub fn bridge_status( signed_in: status.signed_in, account: status.account, devices: status.devices, + telemetry: status.telemetry, devices_error: 
status.devices_error, }) } @@ -555,7 +560,7 @@ impl RemoteBridgeRuntimeState { return Ok(()); } - let mut inbound = bridge.subscribe_inbound(); + let inbound = bridge.subscribe_inbound().map_err(map_bridge_error)?; let app = app.clone(); let state = state.clone(); let handle = thread::spawn(move || { @@ -568,11 +573,10 @@ impl RemoteBridgeRuntimeState { eprintln!("[remote-bridge] inbound command failed: {error}"); } } - Err(BroadcastTryRecvError::Empty) => { + Err(InboundTryRecvError::Empty) => { thread::sleep(Duration::from_millis(100)); } - Err(BroadcastTryRecvError::Lagged(_)) => {} - Err(BroadcastTryRecvError::Closed) => break, + Err(InboundTryRecvError::Disconnected) => break, } } }); @@ -618,7 +622,16 @@ fn handle_inbound_command( .as_deref() .unwrap_or("__sessions__") .to_string(); - let result = route_inbound_command(app, state, Arc::clone(&bridge), command); + if command_is_duplicate(&command) { + bridge + .forward_control_event( + &response_session, + command_outcome_payload(&command, "duplicate", Some("duplicate_command")), + ) + .map_err(map_bridge_error)?; + return Ok(()); + } + let result = route_inbound_command(app, state, Arc::clone(&bridge), command.clone()); if let Err(error) = &result { let _ = bridge.forward_control_event( &response_session, @@ -629,9 +642,220 @@ fn handle_inbound_command( }), ); } + if let Err(error) = &result { + let _ = bridge.forward_control_event( + &response_session, + command_outcome_payload(&command, "rejected", Some(error.code.as_str())), + ); + } result.map(|_| ()) } +#[derive(Default)] +struct CommandDedupeState { + seen: HashSet, + order: VecDeque, +} + +fn command_is_duplicate(command: &InboundCommand) -> bool { + if !is_critical_command(&command.kind) { + return false; + } + let Some(key) = command_dedupe_key(command) else { + return false; + }; + let Ok(mut state) = command_dedupe_state().lock() else { + return false; + }; + if state.seen.contains(&key) { + return true; + } + state.seen.insert(key.clone()); + 
state.order.push_back(key); + while state.order.len() > MAX_DEDUPE_COMMAND_IDS { + if let Some(expired) = state.order.pop_front() { + state.seen.remove(&expired); + } + } + false +} + +fn command_dedupe_state() -> &'static Mutex { + static STATE: OnceLock> = OnceLock::new(); + STATE.get_or_init(|| Mutex::new(CommandDedupeState::default())) +} + +fn command_dedupe_key(command: &InboundCommand) -> Option { + command + .client_command_id + .as_deref() + .or(command.dedupe_key.as_deref()) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) +} + +fn is_critical_command(kind: &InboundCommandKind) -> bool { + matches!( + kind, + InboundCommandKind::ComputerUseManualControlRequest + | InboundCommandKind::ComputerUseManualControlGrant + | InboundCommandKind::ComputerUseManualControlHeartbeat + | InboundCommandKind::ComputerUseManualControlInput + | InboundCommandKind::ComputerUseManualControlRelease + | InboundCommandKind::ComputerUseStreamOffer + | InboundCommandKind::ComputerUseStreamAnswer + | InboundCommandKind::ComputerUseStreamIceCandidate + ) +} + +fn command_outcome_payload( + command: &InboundCommand, + outcome: &str, + reason: Option<&str>, +) -> JsonValue { + let mut payload = json!({ + "schema": COMMAND_OUTCOME_SCHEMA, + "clientCommandId": command.client_command_id.as_deref(), + "clientSeq": command.client_seq, + "kind": command_kind_wire_value(&command.kind), + "outcome": outcome, + "priority": command_priority_wire_value(command), + "reason": reason, + "sentAt": command.sent_at.clone(), + "dedupeKey": command.dedupe_key.as_deref(), + "expiresAt": command.expires_at.clone(), + "receivedAt": crate::auth::now_timestamp(), + }); + attach_command_context(&mut payload, command, None); + payload +} + +fn attach_command_context( + payload: &mut JsonValue, + command: &InboundCommand, + outcome: Option<&str>, +) { + let Some(object) = payload.as_object_mut() else { + return; + }; + object.insert( + "clientCommandId".into(), + command + 
.client_command_id + .as_deref() + .map(JsonValue::from) + .unwrap_or(JsonValue::Null), + ); + object.insert("clientSeq".into(), json!(command.client_seq)); + object.insert( + "kind".into(), + JsonValue::from(command_kind_wire_value(&command.kind)), + ); + object.insert( + "priority".into(), + JsonValue::from(command_priority_wire_value(command)), + ); + object.insert( + "sentAt".into(), + command.sent_at.clone().unwrap_or(JsonValue::Null), + ); + object.insert( + "dedupeKey".into(), + command + .dedupe_key + .as_deref() + .map(JsonValue::from) + .unwrap_or(JsonValue::Null), + ); + object.insert( + "expiresAt".into(), + command.expires_at.clone().unwrap_or(JsonValue::Null), + ); + if let Some(outcome) = outcome { + object.insert("outcome".into(), JsonValue::from(outcome)); + } + if let Some(stream_id) = payload_string(&command.payload, &["streamId", "stream_id"]) { + object.insert("streamId".into(), JsonValue::from(stream_id)); + } + if let Some(manual_control_id) = + payload_string(&command.payload, &["manualControlId", "manual_control_id"]) + { + object.insert("manualControlId".into(), JsonValue::from(manual_control_id)); + } +} + +fn command_outcome_for_desktop_output( + output: Option<&AutonomousDesktopToolOutput>, +) -> &'static str { + match output.map(|output| output.status) { + Some(AutonomousDesktopToolStatus::Executed | AutonomousDesktopToolStatus::Stopped) => { + "executed" + } + Some(AutonomousDesktopToolStatus::Starting) | None => "accepted", + Some( + AutonomousDesktopToolStatus::ApprovalRequired + | AutonomousDesktopToolStatus::Denied + | AutonomousDesktopToolStatus::Unavailable + | AutonomousDesktopToolStatus::Failed, + ) => "rejected", + } +} + +fn command_kind_wire_value(kind: &InboundCommandKind) -> &'static str { + match kind { + InboundCommandKind::AuthorizeSessionJoin => "authorize_session_join", + InboundCommandKind::SendMessage => "send_message", + InboundCommandKind::ListSessions => "list_sessions", + InboundCommandKind::ListProjects => 
"list_projects", + InboundCommandKind::ArchiveSession => "archive_session", + InboundCommandKind::SessionAttached => "session_attached", + InboundCommandKind::StartSession => "start_session", + InboundCommandKind::ResolveOperatorAction => "resolve_operator_action", + InboundCommandKind::CancelRun => "cancel_run", + InboundCommandKind::ContextSnapshot => "context_snapshot", + InboundCommandKind::StageAttachment => "stage_attachment", + InboundCommandKind::DiscardAttachment => "discard_attachment", + InboundCommandKind::UpdateSessionControls => "update_session_controls", + InboundCommandKind::FetchRuntimeMediaArtifact => "fetch_runtime_media_artifact", + InboundCommandKind::ComputerUseStreamRequest => "computer_use_stream_request", + InboundCommandKind::ComputerUseStreamOffer => "computer_use_stream_offer", + InboundCommandKind::ComputerUseStreamAnswer => "computer_use_stream_answer", + InboundCommandKind::ComputerUseStreamIceCandidate => "computer_use_stream_ice_candidate", + InboundCommandKind::ComputerUseStreamStop => "computer_use_stream_stop", + InboundCommandKind::ComputerUseStreamStatus => "computer_use_stream_status", + InboundCommandKind::ComputerUseStreamSetQuality => "computer_use_stream_set_quality", + InboundCommandKind::ComputerUseStreamRequestKeyframe => { + "computer_use_stream_request_keyframe" + } + InboundCommandKind::ComputerUseManualControlRequest => { + "computer_use_manual_control_request" + } + InboundCommandKind::ComputerUseManualControlGrant => "computer_use_manual_control_grant", + InboundCommandKind::ComputerUseManualControlHeartbeat => { + "computer_use_manual_control_heartbeat" + } + InboundCommandKind::ComputerUseManualControlInput => "computer_use_manual_control_input", + InboundCommandKind::ComputerUseManualControlRelease => { + "computer_use_manual_control_release" + } + } +} + +fn command_priority_wire_value(command: &InboundCommand) -> &'static str { + match command.priority { + 
Some(xero_remote_bridge::InboundCommandPriority::CriticalReliable) => "critical_reliable", + Some(xero_remote_bridge::InboundCommandPriority::CoalescedBestEffort) => { + "coalesced_best_effort" + } + Some(xero_remote_bridge::InboundCommandPriority::ReliableIdempotent) => { + "reliable_idempotent" + } + None if is_critical_command(&command.kind) => "critical_reliable", + None => "reliable_idempotent", + } +} + fn route_inbound_command( app: &AppHandle, state: &DesktopState, @@ -1514,13 +1738,11 @@ fn route_computer_use_stream_command( bridge, &session_id, schema, - command.seq, - Some(command.device_id.as_str()), + &command, stream_id.as_deref(), None, "cloud_streaming_disabled", "Cloud desktop viewing is disabled in the local desktop app.", - command.payload, ); } Some(run_desktop_stream_command( @@ -1583,24 +1805,25 @@ fn route_computer_use_stream_command( .as_ref() .and_then(stream_signal_schema_for_output) .unwrap_or(schema); - let forwarded_payload = stream_signal_payload - .unwrap_or_else(|| remote_stream_payload_for_forward(&command.kind, command.payload)); + let forwarded_payload = stream_signal_payload.unwrap_or_else(|| { + remote_stream_payload_for_forward(&command.kind, command.payload.clone()) + }); + let response_outcome = command_outcome_for_desktop_output(desktop_output.as_ref()); + let mut response_payload = json!({ + "schema": forwarded_schema, + "ok": response_outcome != "rejected", + "commandSeq": command.seq, + "deviceId": command.device_id.as_str(), + "sessionId": session_id.as_str(), + "streamId": stream_id, + "receivedAt": crate::auth::now_timestamp(), + "payload": forwarded_payload, + "desktop": desktop_output, + "desktopFrame": desktop_frame, + }); + attach_command_context(&mut response_payload, &command, Some(response_outcome)); bridge - .forward_control_event( - &session_id, - json!({ - "schema": forwarded_schema, - "ok": true, - "commandSeq": command.seq, - "deviceId": command.device_id, - "sessionId": session_id, - "streamId": stream_id, 
- "receivedAt": crate::auth::now_timestamp(), - "payload": forwarded_payload, - "desktop": desktop_output, - "desktopFrame": desktop_frame, - }), - ) + .forward_control_event(&session_id, response_payload) .map_err(map_bridge_error)?; Ok(()) } @@ -1628,13 +1851,11 @@ fn route_computer_use_manual_control_command( bridge, &session_id, schema, - command.seq, - Some(command.device_id.as_str()), + &command, None, Some(manual_control_id.as_str()), "manual_cloud_control_disabled", "Cloud manual control is disabled in the local desktop app.", - command.payload, ); } let runtime = desktop_runtime_for_located(&located, &command)?; @@ -1649,13 +1870,11 @@ fn route_computer_use_manual_control_command( bridge, &session_id, schema, - command.seq, - Some(command.device_id.as_str()), + &command, None, Some(manual_control_id.as_str()), "manual_cloud_control_disabled", "Cloud manual control is disabled in the local desktop app.", - command.payload, ); } let runtime = desktop_runtime_for_located(&located, &command)?; @@ -1673,13 +1892,11 @@ fn route_computer_use_manual_control_command( bridge, &session_id, schema, - command.seq, - Some(command.device_id.as_str()), + &command, None, Some(manual_control_id.as_str()), "manual_cloud_control_disabled", "Cloud manual control is disabled in the local desktop app.", - command.payload, ); } let request = manual_control_input_request(&command.payload)?; @@ -1699,22 +1916,22 @@ fn route_computer_use_manual_control_command( } _ => None, }; + let response_outcome = command_outcome_for_desktop_output(desktop_output.as_ref()); + let mut response_payload = json!({ + "schema": schema, + "ok": response_outcome != "rejected", + "commandSeq": command.seq, + "deviceId": command.device_id.as_str(), + "sessionId": session_id.as_str(), + "manualControlId": manual_control_id, + "receivedAt": crate::auth::now_timestamp(), + "payload": remote_desktop_payload_for_forward(command.payload.clone()), + "desktop": desktop_output, + "brokered": true, + }); + 
attach_command_context(&mut response_payload, &command, Some(response_outcome)); bridge - .forward_control_event( - &session_id, - json!({ - "schema": schema, - "ok": true, - "commandSeq": command.seq, - "deviceId": command.device_id, - "sessionId": session_id, - "manualControlId": manual_control_id, - "receivedAt": crate::auth::now_timestamp(), - "payload": remote_desktop_payload_for_forward(command.payload), - "desktop": desktop_output, - "brokered": true, - }), - ) + .forward_control_event(&session_id, response_payload) .map_err(map_bridge_error)?; Ok(()) } @@ -2178,33 +2395,30 @@ fn forward_computer_use_desktop_rejection( bridge: &AppRemoteBridge, session_id: &str, schema: &str, - command_seq: u64, - device_id: Option<&str>, + command: &InboundCommand, stream_id: Option<&str>, manual_control_id: Option<&str>, code: &str, message: &str, - payload: JsonValue, ) -> CommandResult<()> { + let mut payload = json!({ + "schema": schema, + "ok": false, + "commandSeq": command.seq, + "deviceId": command.device_id.as_str(), + "sessionId": session_id, + "streamId": stream_id, + "manualControlId": manual_control_id, + "receivedAt": crate::auth::now_timestamp(), + "payload": remote_desktop_payload_for_forward(command.payload.clone()), + "error": { + "code": code, + "message": message, + }, + }); + attach_command_context(&mut payload, command, Some("rejected")); bridge - .forward_control_event( - session_id, - json!({ - "schema": schema, - "ok": false, - "commandSeq": command_seq, - "deviceId": device_id, - "sessionId": session_id, - "streamId": stream_id, - "manualControlId": manual_control_id, - "receivedAt": crate::auth::now_timestamp(), - "payload": remote_desktop_payload_for_forward(payload), - "error": { - "code": code, - "message": message, - }, - }), - ) + .forward_control_event(session_id, payload) .map(|_| ()) .map_err(map_bridge_error) } @@ -3393,6 +3607,7 @@ fn map_bridge_error(error: BridgeError) -> CommandError { | BridgeError::Decode(_) | BridgeError::Json(_) | 
BridgeError::MissingServerField(_) + | BridgeError::OutboundQueueFull { .. } | BridgeError::LockPoisoned => { CommandError::system_fault("remote_bridge_failed", error.to_string()) } @@ -4216,13 +4431,86 @@ mod tests { ); } + #[test] + fn command_outcome_payload_includes_envelope_and_desktop_context() { + let command = inbound_command( + InboundCommandKind::ComputerUseManualControlInput, + json!({ + "manualControlId": "manual-1", + "streamId": "stream-1", + "action": "mouse_click" + }), + ); + + let payload = command_outcome_payload(&command, "duplicate", Some("duplicate_command")); + + assert_eq!(payload["schema"], COMMAND_OUTCOME_SCHEMA); + assert_eq!(payload["clientCommandId"], "client-command-1"); + assert_eq!(payload["clientSeq"], 7); + assert_eq!(payload["kind"], "computer_use_manual_control_input"); + assert_eq!(payload["outcome"], "duplicate"); + assert_eq!(payload["priority"], "critical_reliable"); + assert_eq!(payload["manualControlId"], "manual-1"); + assert_eq!(payload["streamId"], "stream-1"); + assert_eq!(payload["dedupeKey"], "dedupe-client-command-1"); + } + + #[test] + fn command_dedupe_only_marks_repeated_critical_command_ids() { + let unique = format!( + "client-command-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time") + .as_nanos() + ); + let mut command = inbound_command( + InboundCommandKind::ComputerUseManualControlInput, + json!({"manualControlId": "manual-1", "action": "mouse_click"}), + ); + command.client_command_id = Some(unique.clone()); + command.dedupe_key = Some(format!("dedupe-{unique}")); + + assert!(!command_is_duplicate(&command)); + assert!(command_is_duplicate(&command)); + + command.kind = InboundCommandKind::ListSessions; + assert!(!command_is_duplicate(&command)); + } + + #[test] + fn desktop_output_status_maps_to_command_outcomes() { + let executed = desktop_output_with_status("executed"); + let starting = desktop_output_with_status("starting"); + let denied = 
desktop_output_with_status("denied"); + + assert_eq!( + command_outcome_for_desktop_output(Some(&executed)), + "executed" + ); + assert_eq!( + command_outcome_for_desktop_output(Some(&starting)), + "accepted" + ); + assert_eq!( + command_outcome_for_desktop_output(Some(&denied)), + "rejected" + ); + } + fn desktop_output_with_stream_signal(stream_signal: JsonValue) -> AutonomousDesktopToolOutput { + let mut output = desktop_output_with_status("starting"); + output.stream_signal = Some(serde_json::from_value(stream_signal).expect("stream signal")); + output + } + + fn desktop_output_with_status(status: &str) -> AutonomousDesktopToolOutput { serde_json::from_value(json!({ "tool": "desktop_stream", "action": "stream_start", "requestId": "desktop_request_test", "phase": "phase_computer_use_desktop_control", - "status": "starting", + "status": status, "platform": "test", "sidecar": { "schemaVersion": 1, @@ -4262,9 +4550,26 @@ mod tests { "approvalRequired": false, "userActionRequired": false }, - "streamSignal": stream_signal, "message": "ok" })) .expect("desktop output") } + + fn inbound_command(kind: InboundCommandKind, payload: JsonValue) -> InboundCommand { + InboundCommand { + v: 1, + seq: 42, + computer_id: "desktop-1".into(), + session_id: Some("session-1".into()), + kind, + device_id: "web-1".into(), + client_command_id: Some("client-command-1".into()), + client_seq: Some(7), + priority: Some(xero_remote_bridge::InboundCommandPriority::CriticalReliable), + sent_at: Some(json!(1_700_000_000_000_i64)), + dedupe_key: Some("dedupe-client-command-1".into()), + expires_at: Some(json!(1_700_000_008_000_i64)), + payload, + } + } } diff --git a/cloud/src/lib/relay/relay-client.test.ts b/cloud/src/lib/relay/relay-client.test.ts index 8b461c9a..564af730 100644 --- a/cloud/src/lib/relay/relay-client.test.ts +++ b/cloud/src/lib/relay/relay-client.test.ts @@ -88,7 +88,7 @@ describe("getRelaySocket", () => { }); describe("pushInboundCommand", () => { - it("sends the command as 
the Phoenix frame payload", () => { + it("sends the command as an enveloped Phoenix frame payload", async () => { const push = vi.fn(); const command: InboundCommand = { v: 1, @@ -100,9 +100,164 @@ describe("pushInboundCommand", () => { payload: {}, }; - pushInboundCommand({ push } as never, command); + const result = await pushInboundCommand({ push } as never, command); - expect(push).toHaveBeenCalledWith("frame", command); + expect(result.outcome).toBe("accepted"); + expect(push).toHaveBeenCalledWith( + "frame", + expect.objectContaining({ + ...command, + clientCommandId: expect.any(String), + clientSeq: expect.any(Number), + dedupeKey: expect.any(String), + expiresAt: expect.any(Number), + priority: "reliable_idempotent", + sentAt: expect.any(Number), + }), + ); + }); + + it("returns structured rate-limit acknowledgements from Phoenix errors", async () => { + let errorHandler: ((payload?: unknown) => void) | null = null; + const push = vi.fn(() => { + const pushLike = { + receive: vi.fn( + ( + status: "ok" | "error" | "timeout", + callback: (payload?: unknown) => void, + ) => { + if (status === "error") errorHandler = callback; + return pushLike; + }, + ), + }; + return pushLike; + }); + const resultPromise = requestComputerUseManualControl({ push } as never, { + computerId: "desktop-1", + sessionId: "session-1", + deviceId: "web-1", + manualControlId: "manual-web-1", + streamToken: "stream-token-1", + }); + + errorHandler?.({ + reason: "rate_limited", + rateLimit: { + bucket: "frame:computer_use:manual_critical", + class: "manual_critical", + kind: "computer_use_manual_control_request", + limit: 1, + retryAfterMs: 250, + windowMs: 60_000, + }, + }); + + await expect(resultPromise).resolves.toEqual( + expect.objectContaining({ + kind: "computer_use_manual_control_request", + outcome: "rate_limited", + retryAfterMs: 250, + rateLimit: expect.objectContaining({ + class: "manual_critical", + }), + }), + ); + }); + + it("retries timed-out critical commands and then 
fails visibly", async () => { + vi.useFakeTimers(); + try { + const push = vi.fn(() => ({ + receive: vi.fn(() => ({ + receive: vi.fn(() => ({ + receive: vi.fn(), + })), + })), + })); + const resultPromise = requestComputerUseManualControl({ push } as never, { + computerId: "desktop-1", + sessionId: "session-1", + deviceId: "web-1", + manualControlId: "manual-web-1", + streamToken: "stream-token-1", + }); + + await vi.advanceTimersByTimeAsync(16_000); + + await expect(resultPromise).resolves.toEqual( + expect.objectContaining({ + kind: "computer_use_manual_control_request", + outcome: "timed_out", + reason: "push_timeout", + }), + ); + expect(push).toHaveBeenCalledTimes(2); + } finally { + vi.useRealTimers(); + } + }); + + it("coalesces queued pointer movement instead of letting it starve critical input", async () => { + vi.useFakeTimers(); + try { + const push = vi.fn(() => ({ + receive: vi.fn(() => ({ + receive: vi.fn(() => ({ + receive: vi.fn(), + })), + })), + })); + const channel = { push } as never; + for (let index = 0; index < 4; index += 1) { + void requestComputerUseManualControl(channel, { + computerId: "desktop-1", + sessionId: "session-1", + deviceId: "web-1", + manualControlId: `manual-web-${index}`, + streamToken: "stream-token-1", + }); + } + + const staleMove = sendComputerUseManualInput(channel, { + computerId: "desktop-1", + sessionId: "session-1", + deviceId: "web-1", + manualControlId: "manual-web-1", + streamToken: "stream-token-1", + input: { + action: "mouse_move", + x: 10, + y: 10, + sourceWidth: 100, + sourceHeight: 100, + }, + }); + void sendComputerUseManualInput(channel, { + computerId: "desktop-1", + sessionId: "session-1", + deviceId: "web-1", + manualControlId: "manual-web-1", + streamToken: "stream-token-1", + input: { + action: "mouse_move", + x: 20, + y: 20, + sourceWidth: 100, + sourceHeight: 100, + }, + }); + + await expect(staleMove).resolves.toEqual( + expect.objectContaining({ + outcome: "stale", + reason: "coalesced", + }), + 
); + expect(push).toHaveBeenCalledTimes(4); + } finally { + vi.useRealTimers(); + } }); it("requests a fresh snapshot using the desktop session-attached command", () => { diff --git a/cloud/src/lib/relay/relay-client.ts b/cloud/src/lib/relay/relay-client.ts index 53dad36d..99622dad 100644 --- a/cloud/src/lib/relay/relay-client.ts +++ b/cloud/src/lib/relay/relay-client.ts @@ -2,39 +2,86 @@ import { type Channel, Socket } from "phoenix"; import { getServerUrl } from "../server-url"; +export type InboundCommandKind = + | "session_attached" + | "send_message" + | "start_session" + | "archive_session" + | "resolve_operator_action" + | "cancel_run" + | "context_snapshot" + | "list_sessions" + | "list_projects" + | "stage_attachment" + | "discard_attachment" + | "update_session_controls" + | "fetch_runtime_media_artifact" + | "computer_use_stream_request" + | "computer_use_stream_offer" + | "computer_use_stream_answer" + | "computer_use_stream_ice_candidate" + | "computer_use_stream_stop" + | "computer_use_stream_status" + | "computer_use_stream_set_quality" + | "computer_use_stream_request_keyframe" + | "computer_use_manual_control_request" + | "computer_use_manual_control_grant" + | "computer_use_manual_control_heartbeat" + | "computer_use_manual_control_input" + | "computer_use_manual_control_release"; + +export type RelayCommandPriority = + | "critical_reliable" + | "reliable_idempotent" + | "coalesced_best_effort"; + +export type RelayCommandOutcome = + | "accepted" + | "executed" + | "rejected" + | "rate_limited" + | "timed_out" + | "stale" + | "duplicate"; + +export interface RelayRateLimitMetadata { + bucket?: string; + class?: string; + kind?: string; + limit?: number; + retryAfterMs?: number; + windowMs?: number; +} + +export interface CommandAckResult { + schema: "xero.remote_command_outcome.v1"; + clientCommandId: string; + clientSeq: number; + kind: InboundCommandKind; + outcome: RelayCommandOutcome; + priority: RelayCommandPriority; + reason?: string | null; 
+ message?: string | null; + rateLimit?: RelayRateLimitMetadata | null; + retryAfterMs?: number | null; + sentAt: number; + receivedAt?: string | number | null; + acceptedAt?: string | number | null; +} + export interface InboundCommand { v: number; seq: number; computer_id: string; session_id?: string; device_id?: string; - kind: - | "session_attached" - | "send_message" - | "start_session" - | "archive_session" - | "resolve_operator_action" - | "cancel_run" - | "context_snapshot" - | "list_sessions" - | "list_projects" - | "stage_attachment" - | "discard_attachment" - | "update_session_controls" - | "fetch_runtime_media_artifact" - | "computer_use_stream_request" - | "computer_use_stream_offer" - | "computer_use_stream_answer" - | "computer_use_stream_ice_candidate" - | "computer_use_stream_stop" - | "computer_use_stream_status" - | "computer_use_stream_set_quality" - | "computer_use_stream_request_keyframe" - | "computer_use_manual_control_request" - | "computer_use_manual_control_grant" - | "computer_use_manual_control_heartbeat" - | "computer_use_manual_control_input" - | "computer_use_manual_control_release"; + kind: InboundCommandKind; + clientCommandId?: string; + clientSeq?: number; + priority?: RelayCommandPriority; + sentAt?: number; + dedupeKey?: string; + expiresAt?: number; payload: unknown; } @@ -56,12 +103,518 @@ type ComputerUseManualInputAction = | "paste_text"; let socket: Socket | null = null; +let nextClientSeq = 0; +let fallbackCloudInstanceId: string | null = null; +const commandSchedulers = new WeakMap(); +const CLOUD_INSTANCE_STORAGE_KEY = "xero.cloud.instanceId.v1"; + +const CRITICAL_COMMAND_KINDS = new Set([ + "computer_use_manual_control_request", + "computer_use_manual_control_grant", + "computer_use_manual_control_heartbeat", + "computer_use_manual_control_input", + "computer_use_manual_control_release", + "computer_use_stream_offer", + "computer_use_stream_answer", + "computer_use_stream_ice_candidate", +]); + +const 
/**
 * Per-channel outbound command queue.
 *
 * Commands are sorted into three lanes by `commandPolicy`:
 * - critical: FIFO, bounded by `RELAY_COMMAND_MAX_CRITICAL_QUEUE`;
 * - reliable: FIFO, bounded by `RELAY_COMMAND_MAX_RELIABLE_QUEUE`;
 * - coalesced: keyed by coalesce key, where a newer command replaces the
 *   queued one (which resolves as `stale`).
 *
 * At most `RELAY_COMMAND_MAX_IN_FLIGHT` commands are pushed at once and lanes
 * are drained critical → reliable → coalesced. Every promise returned by
 * `enqueue` resolves with a `CommandAckResult`; it never rejects.
 */
class RelayCommandScheduler {
  private readonly criticalQueue: QueuedRelayCommand[] = [];
  private readonly reliableQueue: QueuedRelayCommand[] = [];
  private readonly coalescedQueue = new Map<string, QueuedRelayCommand>();
  /** Active attempts keyed by `clientCommandId`, used by the outcome listener. */
  private readonly pending = new Map<
    string,
    {
      finish: (result: CommandAckResult) => void;
      timeout: ReturnType<typeof setTimeout>;
    }
  >();
  private inFlight = 0;
  private outcomeRef: number | string | null = null;

  constructor(private readonly channel: Channel) {}

  /**
   * Queues `command` and resolves once it is acknowledged, rejected, timed
   * out, coalesced away, or dropped because its lane is full.
   *
   * @param command Command to send; missing envelope fields are filled in.
   * @returns The final outcome for this command.
   */
  enqueue(command: InboundCommand): Promise<CommandAckResult> {
    this.installOutcomeListener();
    return new Promise<CommandAckResult>((resolve) => {
      const enriched = completeCommandEnvelope(command);
      const policy = commandPolicy(enriched);
      enriched.priority = policy.priority;
      if (policy.coalesceKey) {
        const previous = this.coalescedQueue.get(policy.coalesceKey);
        if (previous) {
          // The newer command supersedes the one still waiting in the queue.
          previous.resolve(
            commandResult(previous.command, "stale", "coalesced"),
          );
          emitRelayCommandMetric("coalesced", previous.command, {
            coalesceKey: policy.coalesceKey,
          });
        } else if (
          this.coalescedQueue.size >= RELAY_COMMAND_MAX_COALESCED_QUEUE
        ) {
          // Map iteration follows insertion order, so the first entry is the oldest.
          const oldest = this.coalescedQueue.entries().next().value as
            | [string, QueuedRelayCommand]
            | undefined;
          if (oldest) {
            this.coalescedQueue.delete(oldest[0]);
            oldest[1].resolve(
              commandResult(oldest[1].command, "stale", "coalesced_queue_full"),
            );
            emitRelayCommandMetric("dropped", oldest[1].command, {
              coalesceKey: oldest[0],
              reason: "coalesced_queue_full",
            });
          }
        }
        this.coalescedQueue.set(policy.coalesceKey, {
          attempt: 0,
          command: enriched,
          resolve,
        });
      } else if (policy.priority === "critical_reliable") {
        if (this.criticalQueue.length >= RELAY_COMMAND_MAX_CRITICAL_QUEUE) {
          resolve(commandResult(enriched, "rejected", "critical_queue_full"));
          emitRelayCommandMetric("dropped", enriched, {
            reason: "critical_queue_full",
          });
          return;
        }
        this.criticalQueue.push({ attempt: 0, command: enriched, resolve });
      } else {
        if (this.reliableQueue.length >= RELAY_COMMAND_MAX_RELIABLE_QUEUE) {
          resolve(commandResult(enriched, "rejected", "reliable_queue_full"));
          emitRelayCommandMetric("dropped", enriched, {
            reason: "reliable_queue_full",
          });
          return;
        }
        this.reliableQueue.push({ attempt: 0, command: enriched, resolve });
      }
      this.flush();
    });
  }

  /**
   * Subscribes once to `computer_use_command_outcome` so outcomes delivered as
   * channel events settle the matching in-flight attempt.
   */
  private installOutcomeListener() {
    if (this.outcomeRef !== null || typeof this.channel.on !== "function") {
      return;
    }
    this.outcomeRef = this.channel.on(
      "computer_use_command_outcome",
      (payload: unknown) => {
        const outcome = commandOutcomeFromPayload(payload);
        if (!outcome) return;
        const pending = this.pending.get(outcome.clientCommandId);
        if (!pending) return;
        pending.finish(outcome);
      },
    );
  }

  /** Sends queued commands until the in-flight limit is reached or the lanes are empty. */
  private flush() {
    while (this.inFlight < RELAY_COMMAND_MAX_IN_FLIGHT) {
      const next = this.nextQueuedCommand();
      if (!next) return;
      this.inFlight += 1;
      this.send(next);
    }
  }

  /** Dequeues the next command, preferring critical, then reliable, then coalesced. */
  private nextQueuedCommand(): QueuedRelayCommand | null {
    const critical = this.criticalQueue.shift();
    if (critical) return critical;
    const reliable = this.reliableQueue.shift();
    if (reliable) return reliable;
    const coalesced = this.coalescedQueue.entries().next().value as
      | [string, QueuedRelayCommand]
      | undefined;
    if (!coalesced) return null;
    this.coalescedQueue.delete(coalesced[0]);
    return coalesced[1];
  }

  /** Puts a timed-out command back at the front of its lane for another attempt. */
  private requeue(retry: QueuedRelayCommand, policy: RelayCommandPolicy) {
    if (policy.priority === "critical_reliable") {
      this.criticalQueue.unshift(retry);
      return;
    }
    if (policy.coalesceKey) {
      if (this.coalescedQueue.has(policy.coalesceKey)) {
        // A newer command already waits under this key; overwriting it would
        // leave its promise unresolved, so the older retry is the one dropped.
        retry.resolve(commandResult(retry.command, "stale", "coalesced"));
        emitRelayCommandMetric("coalesced", retry.command, {
          coalesceKey: policy.coalesceKey,
        });
        return;
      }
      this.coalescedQueue.set(policy.coalesceKey, retry);
      return;
    }
    this.reliableQueue.unshift(retry);
  }

  /**
   * Pushes one attempt of `queued` and wires its reply, error, and timeout
   * handling.
   *
   * Each attempt owns a single in-flight slot and a single timer. `release`
   * gives both back exactly once, so the local timer and the push's own
   * `"timeout"` reply cannot both requeue the same attempt. Replies that
   * arrive after an attempt has been released are ignored; the retry's own
   * reply, or a channel-level outcome event, settles the command instead.
   */
  private send(queued: QueuedRelayCommand) {
    const policy = commandPolicy(queued.command);
    const commandId = queued.command.clientCommandId ?? "";
    let settled = false;
    let timeout: ReturnType<typeof setTimeout> | undefined;

    // Ends this attempt once: stops its timer, removes its pending entry
    // (only if the entry still belongs to this attempt), frees its slot.
    const release = (): boolean => {
      if (settled) return false;
      settled = true;
      if (timeout !== undefined) clearTimeout(timeout);
      if (this.pending.get(commandId)?.finish === finish) {
        this.pending.delete(commandId);
      }
      this.inFlight = Math.max(0, this.inFlight - 1);
      return true;
    };
    const finish = (result: CommandAckResult) => {
      if (!release()) return;
      queued.resolve(result);
      this.flush();
    };
    const retryOrFinish = (result: CommandAckResult) => {
      const canRetry =
        result.outcome === "timed_out" &&
        queued.attempt + 1 < policy.maxAttempts;
      if (!canRetry) {
        finish(result);
        return;
      }
      if (!release()) return;
      const retry: QueuedRelayCommand = {
        ...queued,
        attempt: queued.attempt + 1,
      };
      this.requeue(retry, policy);
      emitRelayCommandMetric("retry", queued.command, {
        attempt: retry.attempt + 1,
      });
      this.flush();
    };

    timeout = setTimeout(() => {
      retryOrFinish(commandResult(queued.command, "timed_out", "push_timeout"));
    }, policy.timeoutMs);
    this.pending.set(commandId, { finish, timeout });

    let push: PhoenixPushLike | undefined;
    try {
      push = this.channel.push("frame", queued.command) as PhoenixPushLike;
    } catch {
      retryOrFinish(commandResult(queued.command, "rejected", "push_failed"));
      return;
    }

    if (!push || typeof push.receive !== "function") {
      // No reply hooks available, so delivery cannot be observed.
      finish(commandResult(queued.command, "accepted", "push_unobservable"));
      return;
    }

    push
      .receive("ok", (payload) => {
        finish(
          commandOutcomeFromPayload(payload) ??
            commandResult(queued.command, "accepted", null),
        );
      })
      .receive("error", (payload) => {
        const outcome = commandOutcomeFromPayload(payload);
        finish(
          outcome ??
            commandResult(
              queued.command,
              rateLimitedPayload(payload) ? "rate_limited" : "rejected",
              errorReason(payload),
              rateLimitMetadata(payload),
            ),
        );
      })
      .receive("timeout", () => {
        retryOrFinish(
          commandResult(queued.command, "timed_out", "push_timeout"),
        );
      });
  }
}
/**
 * Narrows an unknown value to a string-keyed record so its fields can be read.
 *
 * Any truthy value whose `typeof` is `"object"` is returned as-is (this
 * includes arrays); everything else, including `null`, yields `null`.
 *
 * @param value Untrusted payload received from or destined for the relay.
 * @returns The same object reference typed as a record, or `null`.
 */
function recordPayload(value: unknown): Record<string, unknown> | null {
  return value && typeof value === "object"
    ? (value as Record<string, unknown>)
    : null;
}
/**
 * Synthesizes a client-side outcome record for `command`.
 *
 * Envelope fields missing from the command fall back to defaults: an empty
 * `clientCommandId`, the command's `seq` for `clientSeq`, the policy-derived
 * priority, and the current time for `sentAt`. `acceptedAt` is stamped with
 * the current time only for the `"accepted"` outcome.
 *
 * @param command Command the outcome refers to.
 * @param outcome Outcome to report.
 * @param reason Machine-readable reason, or `null`.
 * @param rateLimit Rate-limit details, when known.
 * @returns The outcome record; `message` is always `null`.
 */
function commandResult(
  command: InboundCommand,
  outcome: RelayCommandOutcome,
  reason: string | null,
  rateLimit: RelayRateLimitMetadata | null = null,
): CommandAckResult {
  const { clientCommandId, clientSeq, kind, priority, sentAt, seq } = command;
  const ack: CommandAckResult = {
    schema: "xero.remote_command_outcome.v1",
    clientCommandId: clientCommandId ?? "",
    clientSeq: clientSeq ?? seq,
    kind,
    outcome,
    priority: priority ?? commandPolicy(command).priority,
    reason,
    message: null,
    rateLimit,
    retryAfterMs: rateLimit?.retryAfterMs ?? null,
    sentAt: sentAt ?? Date.now(),
    acceptedAt: null,
  };
  if (outcome === "accepted") {
    ack.acceptedAt = Date.now();
  }
  return ack;
}
/**
 * Type guard: reports whether `value` is one of the known relay command kinds.
 *
 * @param value Candidate kind string, typically read from an untrusted payload.
 * @returns `true` only for an exact, case-sensitive match.
 */
function isInboundCommandKind(value: string): value is InboundCommandKind {
  switch (value) {
    case "session_attached":
    case "send_message":
    case "start_session":
    case "archive_session":
    case "resolve_operator_action":
    case "cancel_run":
    case "context_snapshot":
    case "list_sessions":
    case "list_projects":
    case "stage_attachment":
    case "discard_attachment":
    case "update_session_controls":
    case "fetch_runtime_media_artifact":
    case "computer_use_stream_request":
    case "computer_use_stream_offer":
    case "computer_use_stream_answer":
    case "computer_use_stream_ice_candidate":
    case "computer_use_stream_stop":
    case "computer_use_stream_status":
    case "computer_use_stream_set_quality":
    case "computer_use_stream_request_keyframe":
    case "computer_use_manual_control_request":
    case "computer_use_manual_control_grant":
    case "computer_use_manual_control_heartbeat":
    case "computer_use_manual_control_input":
    case "computer_use_manual_control_release":
      return true;
    default:
      return false;
  }
}
/**
 * Extracts rate-limit details from a relay reply.
 *
 * Details are read from a nested `rateLimit` / `rate_limit` object when one
 * is present. Without a nested object, the payload's own fields are used, but
 * only when its error reason is `"rate_limited"`; otherwise `null` is
 * returned. Both camelCase and snake_case spellings are accepted for
 * `retryAfterMs` and `windowMs`.
 *
 * @param payload Untrusted reply payload.
 * @returns Metadata whose unavailable fields are `undefined`, or `null`.
 */
function rateLimitMetadata(payload: unknown): RelayRateLimitMetadata | null {
  const record = recordPayload(payload);
  const nested =
    recordPayload(record?.rateLimit) ?? recordPayload(record?.rate_limit);
  if (nested === null && errorReason(payload) !== "rate_limited") {
    return null;
  }

  const metadata: Record<string, unknown> = nested ?? record ?? {};
  const optionalText = (field: string): string | undefined =>
    stringValue(metadata[field]) || undefined;
  const optionalNumber = (camel: string, snake: string): number | undefined =>
    numberValue(metadata[camel]) ?? numberValue(metadata[snake]) ?? undefined;

  const retryAfterMs = optionalNumber("retryAfterMs", "retry_after_ms");
  const windowMs = optionalNumber("windowMs", "window_ms");
  return {
    bucket: optionalText("bucket"),
    class: optionalText("class"),
    kind: optionalText("kind"),
    limit: numberValue(metadata.limit) ?? undefined,
    retryAfterMs,
    windowMs,
  };
}
/**
 * Generates a fresh identifier for this cloud client instance.
 *
 * Uses `crypto.randomUUID()` when the runtime exposes it; otherwise falls
 * back to a `cloud_`-prefixed base-36 string derived from `Math.random()`.
 *
 * @returns A newly generated identifier string.
 */
function createCloudInstanceId(): string {
  const uuid = globalThis.crypto?.randomUUID?.();
  if (uuid != null) {
    return uuid;
  }
  const suffix = Math.random().toString(36).slice(2);
  return `cloud_${suffix}`;
}
0, }); const join = channel.join(); @@ -125,8 +704,13 @@ export function joinSessionChannel( export function pushInboundCommand( channel: Channel, command: InboundCommand, -): void { - channel.push("frame", command); +): Promise { + let scheduler = commandSchedulers.get(channel); + if (!scheduler) { + scheduler = new RelayCommandScheduler(channel); + commandSchedulers.set(channel, scheduler); + } + return scheduler.enqueue(command); } export function requestSessionSnapshot( @@ -227,8 +811,8 @@ export function requestComputerUseStream( quality?: "low" | "balanced" | "high"; iceServers?: RTCIceServer[]; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -279,8 +863,8 @@ export function stopComputerUseStream( deviceId: string; streamId?: string | null; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -302,8 +886,8 @@ export function requestComputerUseStreamStatus( deviceId: string; streamId?: string | null; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -326,8 +910,8 @@ export function setComputerUseStreamQuality( streamId?: string | null; quality: "low" | "balanced" | "high"; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -350,8 +934,8 @@ export function requestComputerUseStreamKeyframe( deviceId: string; streamId?: string | null; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -374,8 +958,8 @@ export function 
answerComputerUseStreamOffer( streamId?: string | null; answer: RTCSessionDescriptionInit; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -400,8 +984,8 @@ export function sendComputerUseStreamIceCandidate( streamId?: string | null; candidate: RTCIceCandidateInit; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -425,8 +1009,8 @@ export function requestComputerUseManualControl( manualControlId?: string | null; reason?: string | null; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -450,8 +1034,8 @@ export function heartbeatComputerUseManualControl( manualControlId?: string | null; reason?: string | null; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -474,8 +1058,8 @@ export function releaseComputerUseManualControl( deviceId: string; manualControlId?: string | null; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, @@ -514,8 +1098,8 @@ export function sendComputerUseManualInput( reason?: string; }; } & StreamTokenOptions, -): void { - pushInboundCommand(channel, { +): Promise { + return pushInboundCommand(channel, { v: 1, seq: Date.now(), computer_id: options.computerId, diff --git a/cloud/src/lib/relay/sessions-shell-context.tsx b/cloud/src/lib/relay/sessions-shell-context.tsx index 7c8a0832..24e2c28d 100644 --- a/cloud/src/lib/relay/sessions-shell-context.tsx +++ b/cloud/src/lib/relay/sessions-shell-context.tsx @@ -6,6 +6,7 
@@ import type { SessionKind, VisibleSessionSummary, } from "#/lib/relay/session-store"; +import type { RemoteControlJoinState } from "#/lib/relay/use-session-stream"; export interface ActiveSessionTarget { computerId: string; @@ -24,7 +25,9 @@ export interface SessionsShellContextValue { computerPresenceKnown: boolean; currentComputerOnline: boolean; currentComputerReconciled: boolean; + currentComputerRemoteControl: RemoteControlJoinState | null; isSessionDirectoryLoading: boolean; + remoteControlByComputer: Record; topBarAccessoryElement: HTMLDivElement | null; visibleSessionsVersion: number; selectSession: (computerId: string, sessionId: string) => void; diff --git a/cloud/src/lib/relay/use-session-stream.test.ts b/cloud/src/lib/relay/use-session-stream.test.ts index e8681ac1..b170bb89 100644 --- a/cloud/src/lib/relay/use-session-stream.test.ts +++ b/cloud/src/lib/relay/use-session-stream.test.ts @@ -7,6 +7,7 @@ import { } from "./session-store"; import { iceServersFromJoinPayload, + remoteControlFromJoinPayload, remoteSnapshotControlSelection, streamRunIdFromJoinPayload, streamTokenFromJoinPayload, @@ -60,6 +61,27 @@ describe("remoteVisibleSessionUpdateFromEnvelope", () => { expect(streamRunIdFromJoinPayload({ stream_run_id: "" })).toBeNull(); }); + it("normalizes remote-control availability from session joins", () => { + expect( + remoteControlFromJoinPayload({ + remote_control: { + available: false, + reason: "computer_use_connection_already_active", + message: "Stop the running connection first.", + ownerDeviceId: "web-1", + startedAt: "2026-05-29T18:29:00Z", + }, + }), + ).toEqual({ + available: false, + reason: "computer_use_connection_already_active", + message: "Stop the running connection first.", + ownerDeviceId: "web-1", + startedAt: "2026-05-29T18:29:00Z", + }); + expect(remoteControlFromJoinPayload({})).toBeNull(); + }); + it("uses durable composer settings when a snapshot has no active runtime run", () => { expect( remoteSnapshotControlSelection({ 
diff --git a/cloud/src/lib/relay/use-session-stream.ts b/cloud/src/lib/relay/use-session-stream.ts index 2454044e..f0baea88 100644 --- a/cloud/src/lib/relay/use-session-stream.ts +++ b/cloud/src/lib/relay/use-session-stream.ts @@ -1,5 +1,5 @@ import { type Channel, Presence, type Socket } from "phoenix"; -import { useEffect, useRef, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { applyRemoteThemeEnvelope } from "#/lib/theme/cloud-theme"; import type { AccountDevice } from "../auth/session"; import { decodeRelayFrame } from "./envelope"; @@ -88,6 +88,7 @@ interface UseSessionStreamOptions { export interface AccountRemoteSessionsState { sessions: VisibleSessionSummary[]; projects: RemoteProjectSummary[]; + remoteControlByComputer: Record; startSession: ( project: RemoteProjectSummary, options?: { sessionKind?: SessionKind }, @@ -102,6 +103,14 @@ type RelayIceServer = RTCIceServer & { credentialType?: "password" | "oauth"; }; +export interface RemoteControlJoinState { + available: boolean; + reason: string | null; + message: string | null; + ownerDeviceId: string | null; + startedAt: string | null; +} + /** * Connects to a remote session channel and pushes decoded snapshot/event frames * into the Zustand session store. 
Returns the underlying channel so callers can @@ -116,12 +125,15 @@ export function useSessionStream({ channel: Channel | null; iceServers: RTCIceServer[]; joinRejected: boolean; + remoteControl: RemoteControlJoinState | null; streamRunId: string | null; streamToken: string | null; } { const [channel, setChannel] = useState(null); const [iceServers, setIceServers] = useState([]); const [joinRejected, setJoinRejected] = useState(false); + const [remoteControl, setRemoteControl] = + useState(null); const [streamRunId, setStreamRunId] = useState(null); const [streamToken, setStreamToken] = useState(null); const replaceWithSnapshot = useSessionStore((s) => s.replaceWithSnapshot); @@ -137,6 +149,7 @@ export function useSessionStream({ setChannel(null); setIceServers([]); setJoinRejected(false); + setRemoteControl(null); setStreamRunId(null); setStreamToken(null); return; @@ -155,6 +168,7 @@ export function useSessionStream({ initialLastSeq, (joinedChannel, payload) => { setIceServers(iceServersFromJoinPayload(payload)); + setRemoteControl(remoteControlFromJoinPayload(payload)); setStreamRunId(streamRunIdFromJoinPayload(payload)); setStreamToken(streamTokenFromJoinPayload(payload)); if (!disposed) setChannel(joinedChannel); @@ -164,6 +178,7 @@ export function useSessionStream({ removeVisibleSession(computerId, sessionId); setLive(key, false); setIceServers([]); + setRemoteControl(null); setStreamRunId(null); setStreamToken(null); setJoinRejected(true); @@ -257,6 +272,7 @@ export function useSessionStream({ return () => { disposed = true; setLive(key, false); + setRemoteControl(null); setStreamRunId(null); setStreamToken(null); sessionChannel.leave(); @@ -275,21 +291,73 @@ export function useSessionStream({ updateControls, ]); - return { channel, iceServers, joinRejected, streamRunId, streamToken }; + return { + channel, + iceServers, + joinRejected, + remoteControl, + streamRunId, + streamToken, + }; +} + +export function remoteControlFromJoinPayload( + payload: unknown, 
/**
 * Field-by-field strict equality check for two remote-control join states.
 *
 * @param left Previously stored state, if any.
 * @param right Newly received state.
 * @returns `false` when `left` is missing; otherwise `true` only if
 *   `available`, `reason`, `message`, `ownerDeviceId`, and `startedAt` all match.
 */
function remoteControlJoinStatesEqual(
  left: RemoteControlJoinState | undefined,
  right: RemoteControlJoinState,
): boolean {
  if (!left) return false;
  const comparedFields = [
    "available",
    "reason",
    "message",
    "ownerDeviceId",
    "startedAt",
  ] as const;
  return comparedFields.every((field) => left[field] === right[field]);
}
/**
 * Reads an optional string field from a join payload.
 *
 * @param value Raw field value of unknown type.
 * @returns The trimmed string, or `null` when the value is not a string or is
 *   empty after trimming.
 */
function stringFromJoinPayload(value: unknown): string | null {
  if (typeof value !== "string") {
    return null;
  }
  const trimmed = value.trim();
  return trimmed ? trimmed : null;
}
next : current; + }); const applyUpdate = (update: RemoteVisibleSessionUpdate) => { if (update.kind === "replace") { replaceVisibleSessionsForComputer(update.computerId, update.sessions); @@ -423,8 +524,9 @@ export function useAccountRemoteSessions( computerId, "__sessions__", 0, - (joinedChannel) => { + (joinedChannel, payload) => { if (disposed || !webDeviceId) return; + applyRemoteControlJoinState(computerId, payload); stopReconciliationTimers.push( scheduleRemoteListReconciliation({ isDisposed: () => disposed, @@ -569,6 +671,7 @@ export function useAccountRemoteSessions( clearVisibleSessionsForComputers, devices, onlineComputerIds, + applyRemoteControlJoinState, relayTokenRef, removeVisibleSession, replaceRemoteProjectsForComputer, @@ -582,6 +685,9 @@ export function useAccountRemoteSessions( options: { sessionKind?: SessionKind } = {}, ): boolean => { if (!webDeviceId) return false; + if (remoteControlByComputer[project.computerId]?.available === false) { + return false; + } const channel = newSessionChannelsRef.current.get(project.computerId); if (!channel) return false; const sessionKind = options.sessionKind ?? 
"standard"; @@ -597,6 +703,9 @@ export function useAccountRemoteSessions( const archiveSession = (summary: VisibleSessionSummary): boolean => { if (!webDeviceId) return false; + if (remoteControlByComputer[summary.computerId]?.available === false) { + return false; + } const channel = sessionListChannelsRef.current.get(summary.computerId); if (!channel) return false; requestSessionArchive(channel, { @@ -619,6 +728,7 @@ export function useAccountRemoteSessions( return { sessions, projects, + remoteControlByComputer, startSession, archiveSession, }; diff --git a/cloud/src/routes/-desktop-click-ripple.test.tsx b/cloud/src/routes/-desktop-click-ripple.test.tsx index 6eb74c60..7b2bc9ad 100644 --- a/cloud/src/routes/-desktop-click-ripple.test.tsx +++ b/cloud/src/routes/-desktop-click-ripple.test.tsx @@ -95,6 +95,74 @@ describe("ComputerUseDesktopViewport click feedback", () => { ); }); + it("tells the user to stop the other cloud connection before starting here", async () => { + const push = vi.fn((_event: string, frame: Record) => { + const response = { + command: { + schema: "xero.remote_command_outcome.v1", + clientCommandId: frame.clientCommandId, + clientSeq: frame.clientSeq, + kind: frame.kind, + outcome: "rejected", + priority: frame.priority, + reason: "computer_use_connection_already_active", + message: + "Computer Use is already connected from another device or location. 
Stop the running connection first to use it here.", + sentAt: frame.sentAt, + }, + }; + + return { + receive(status: string, callback: (payload: unknown) => void) { + if (status === "error") queueMicrotask(() => callback(response)); + return this; + }, + }; + }); + const channel = { + on: vi.fn(() => "frame-ref"), + off: vi.fn(), + push, + } as unknown as Channel; + + render( + , + ); + + const desktop = screen.getByLabelText("Desktop"); + const toolbar = within(desktop).getByRole("toolbar", { + name: "Desktop stream controls", + }); + fireEvent.click(within(toolbar).getByRole("button", { name: /start/i })); + + expect(await screen.findByText("Already in use")).toBeTruthy(); + expect( + screen.getByText( + "Stop the running connection in the other cloud app before using it here.", + ), + ).toBeTruthy(); + expect( + within(toolbar).getByRole("button", { name: /start/i }), + ).toBeTruthy(); + }); + it("retries the desktop stream request when connecting stalls before media arrives", () => { vi.useFakeTimers(); try { @@ -144,6 +212,168 @@ describe("ComputerUseDesktopViewport click feedback", () => { } }); + it("keeps a healthy live WebRTC stream mounted when decoded frames are quiet", async () => { + vi.useFakeTimers(); + const peerConnections = installMockPeerConnection(); + try { + let frameHandler: ((rawFrame: unknown) => void) | null = null; + const push = vi.fn(); + const channel = { + on: vi.fn((event: string, handler: (rawFrame: unknown) => void) => { + if (event === "frame") frameHandler = handler; + return "frame-ref"; + }), + off: vi.fn(), + push, + } as unknown as Channel; + + render( + , + ); + + const desktop = screen.getByLabelText("Desktop"); + const toolbar = within(desktop).getByRole("toolbar", { + name: "Desktop stream controls", + }); + fireEvent.click(within(toolbar).getByRole("button", { name: /start/i })); + expect(streamRequestCalls(push)).toHaveLength(1); + + await act(async () => { + frameHandler?.( + relayFrame({ + schema: 
"xero.computer_use_stream_offer.v1", + streamId: "stream-1", + payload: { + type: "offer", + sdp: "v=0\r\n", + }, + desktop: { + stream: { + status: "starting", + transport: "web_rtc", + quality: "balanced", + }, + }, + }), + ); + await Promise.resolve(); + }); + expect(peerConnections.instances).toHaveLength(1); + + act(() => { + peerConnections.instances[0]?.emitTrack(); + }); + expect(desktop.querySelector("video")).toBeTruthy(); + + push.mockClear(); + act(() => { + vi.advanceTimersByTime(9_000); + }); + + expect(streamRequestCalls(push)).toHaveLength(0); + expect(desktop.querySelector("video")).toBeTruthy(); + expect(screen.queryByText("Connecting stream")).toBeNull(); + } finally { + peerConnections.restore(); + vi.useRealTimers(); + } + }); + + it("queues ICE candidates that arrive before the WebRTC offer is applied", async () => { + const peerConnections = installMockPeerConnection(); + try { + let frameHandler: ((rawFrame: unknown) => void) | null = null; + const push = vi.fn(); + const channel = { + on: vi.fn((event: string, handler: (rawFrame: unknown) => void) => { + if (event === "frame") frameHandler = handler; + return "frame-ref"; + }), + off: vi.fn(), + push, + } as unknown as Channel; + + render( + , + ); + + await act(async () => { + frameHandler?.( + relayFrame({ + schema: "xero.computer_use_stream_ice_candidate.v1", + streamId: "stream-1", + payload: { + candidate: { + candidate: "candidate:1", + sdpMid: "0", + sdpMLineIndex: 0, + }, + }, + }), + ); + frameHandler?.( + relayFrame({ + schema: "xero.computer_use_stream_offer.v1", + streamId: "stream-1", + payload: { + type: "offer", + sdp: "v=0\r\n", + }, + desktop: { + stream: { + status: "starting", + transport: "web_rtc", + quality: "balanced", + }, + }, + }), + ); + await Promise.resolve(); + }); + + expect(peerConnections.instances).toHaveLength(1); + expect(peerConnections.instances[0]?.addedIceCandidates).toEqual([ + expect.objectContaining({ candidate: "candidate:1" }), + ]); + } finally 
{ + peerConnections.restore(); + } + }); + it("shows a click ripple where manual input lands on the streamed desktop", async () => { const { desktop, image, push } = await renderManualDesktopViewport(); image.getBoundingClientRect = () => domRect(0, 0, 640, 360); @@ -187,6 +417,84 @@ describe("ComputerUseDesktopViewport click feedback", () => { ); }); + it("does not send manual input before the desktop grants control", async () => { + const { desktop, image, push, toolbar } = await renderManualDesktopViewport( + { + grantManual: false, + }, + ); + image.getBoundingClientRect = () => domRect(0, 0, 640, 360); + desktop.getBoundingClientRect = () => domRect(0, 0, 640, 360); + push.mockClear(); + + expect( + within(toolbar) + .getByRole("button", { name: /requesting/i }) + .hasAttribute("disabled"), + ).toBe(true); + + fireEvent.pointerDown(desktop, { + button: 0, + clientX: 160, + clientY: 90, + detail: 1, + pointerId: 71, + }); + fireEvent.pointerUp(desktop, { + button: 0, + clientX: 160, + clientY: 90, + detail: 1, + pointerId: 71, + }); + + expect(push).not.toHaveBeenCalled(); + expect(desktop.querySelector(".desktop-click-ripple")).toBeNull(); + }); + + it("shows a visible manual-control denial and keeps input disabled", async () => { + const { desktop, frameHandler, image, manualControlId, push, toolbar } = + await renderManualDesktopViewport({ grantManual: false }); + image.getBoundingClientRect = () => domRect(0, 0, 640, 360); + desktop.getBoundingClientRect = () => domRect(0, 0, 640, 360); + + act(() => { + frameHandler?.( + relayFrame({ + schema: "xero.computer_use_manual_control_request.v1", + ok: false, + outcome: "rejected", + manualControlId, + streamId: "stream-1", + }), + ); + }); + + await waitFor(() => { + expect( + within(toolbar).getByRole("button", { name: /retry/i }), + ).toBeTruthy(); + }); + + push.mockClear(); + fireEvent.pointerDown(desktop, { + button: 0, + clientX: 160, + clientY: 90, + detail: 1, + pointerId: 72, + }); + 
fireEvent.pointerUp(desktop, { + button: 0, + clientX: 160, + clientY: 90, + detail: 1, + pointerId: 72, + }); + + expect(push).not.toHaveBeenCalled(); + }); + it("maps manual input against the painted stream area when object-contain letterboxes the media", async () => { const { desktop, image, push } = await renderManualDesktopViewport(); image.getBoundingClientRect = () => domRect(0, 0, 640, 640); @@ -461,7 +769,7 @@ describe("ComputerUseDesktopViewport click feedback", () => { await waitFor(() => { expect(mediaLayer.style.transform).toContain("scale(2)"); }); - expect(push).not.toHaveBeenCalled(); + expect(manualInputCalls(push)).toHaveLength(0); fireEvent.pointerUp(desktop, { clientX: 120, @@ -793,12 +1101,14 @@ async function armManualKeyboardCapture() { } async function renderManualDesktopViewport({ + grantManual = true, presentation = { isMobile: false, override: "desktop", rotateDesktop: false, }, }: { + grantManual?: boolean; presentation?: { isMobile: boolean; override: "auto" | "desktop" | "mobile"; @@ -882,13 +1192,48 @@ async function renderManualDesktopViewport({ ).toBe(false); }); fireEvent.click(within(toolbar).getByRole("button", { name: /manual/i })); + const manualRequest = push.mock.calls + .map( + ([, frame]) => + frame as { kind?: string; payload?: { manualControlId?: string } }, + ) + .find((frame) => frame.kind === "computer_use_manual_control_request"); + const manualControlId = manualRequest?.payload?.manualControlId; + expect(manualControlId).toBeTruthy(); + if (grantManual) { + act(() => { + frameHandler?.( + relayFrame({ + schema: "xero.computer_use_manual_control_request.v1", + ok: true, + outcome: "executed", + manualControlId, + streamId: "stream-1", + }), + ); + }); + } await waitFor(() => { - expect( - within(toolbar).getByRole("button", { name: /release/i }), - ).toBeTruthy(); + if (grantManual) { + expect( + within(toolbar).getByRole("button", { name: /release/i }), + ).toBeTruthy(); + } else { + expect( + 
within(toolbar).getByRole("button", { name: /requesting/i }), + ).toBeTruthy(); + } }); - return { desktop, image, keyboard, push, toolbar }; + return { + desktop, + frameHandler, + image, + keyboard, + manualControlId, + push, + toolbar, + }; } function fireBeforeInput( @@ -913,6 +1258,81 @@ function fireTextInput(element: HTMLTextAreaElement, text: string) { }); } +function installMockPeerConnection() { + const previous = globalThis.RTCPeerConnection; + const instances: MockPeerConnection[] = []; + + class MockPeerConnection { + addedIceCandidates: RTCIceCandidateInit[] = []; + connectionState: RTCPeerConnectionState = "connected"; + iceConnectionState: RTCIceConnectionState = "connected"; + localDescription: RTCSessionDescriptionInit | null = null; + onconnectionstatechange: ((event: Event) => void) | null = null; + ondatachannel: ((event: RTCDataChannelEvent) => void) | null = null; + onicecandidate: ((event: RTCPeerConnectionIceEvent) => void) | null = null; + ontrack: ((event: RTCTrackEvent) => void) | null = null; + remoteDescription: RTCSessionDescriptionInit | null = null; + + constructor() { + instances.push(this); + } + + async addIceCandidate(candidate: RTCIceCandidateInit) { + this.addedIceCandidates.push(candidate); + return undefined; + } + + close() { + this.connectionState = "closed"; + this.iceConnectionState = "closed"; + this.onconnectionstatechange?.(new Event("connectionstatechange")); + } + + async createAnswer(): Promise { + return { type: "answer", sdp: "v=0\r\nmock-answer" }; + } + + emitTrack() { + this.ontrack?.({ + streams: [{} as MediaStream], + } as unknown as RTCTrackEvent); + } + + getReceivers() { + return [ + { + track: { stop: vi.fn() }, + }, + ]; + } + + async setLocalDescription(description: RTCSessionDescriptionInit) { + this.localDescription = description; + } + + async setRemoteDescription(description: RTCSessionDescriptionInit) { + this.remoteDescription = description; + } + } + + Object.defineProperty(globalThis, 
"RTCPeerConnection", { + configurable: true, + writable: true, + value: MockPeerConnection, + }); + + return { + instances, + restore: () => { + Object.defineProperty(globalThis, "RTCPeerConnection", { + configurable: true, + writable: true, + value: previous, + }); + }, + }; +} + function relayFrame(payload: unknown) { const bytes = msgpackEncode({ v: 1, @@ -942,6 +1362,13 @@ function streamRequestCalls(push: ReturnType) { ); } +function manualInputCalls(push: ReturnType) { + return push.mock.calls.filter( + ([, frame]) => + (frame as { kind?: string }).kind === "computer_use_manual_control_input", + ); +} + function domRect(left: number, top: number, width: number, height: number) { return { bottom: top + height, diff --git a/cloud/src/routes/-sessions-shell.test.tsx b/cloud/src/routes/-sessions-shell.test.tsx index 5d3e2115..454b5fe2 100644 --- a/cloud/src/routes/-sessions-shell.test.tsx +++ b/cloud/src/routes/-sessions-shell.test.tsx @@ -39,6 +39,23 @@ const streamMock = vi.hoisted(() => ({ on: ReturnType; off: ReturnType; }, + remoteControl: null as null | { + available: boolean; + reason: string | null; + message: string | null; + ownerDeviceId: string | null; + startedAt: string | null; + }, + remoteControlByComputer: {} as Record< + string, + { + available: boolean; + reason: string | null; + message: string | null; + ownerDeviceId: string | null; + startedAt: string | null; + } + >, })); const DESKTOP_CONTROL_PRESENTATION_STORAGE_KEY = @@ -104,6 +121,7 @@ vi.mock("#/lib/relay/use-session-stream", async () => { return { sessions: streamMock.sessions, projects: streamMock.projects, + remoteControlByComputer: streamMock.remoteControlByComputer, startSession: streamMock.startSession, archiveSession: streamMock.archiveSession, }; @@ -112,6 +130,7 @@ vi.mock("#/lib/relay/use-session-stream", async () => { channel: streamMock.channel, iceServers: [], joinRejected: false, + remoteControl: streamMock.remoteControl, streamRunId: null, streamToken: null, }), @@ 
-254,6 +273,8 @@ beforeEach(() => { streamMock.accountHookMounts = 0; streamMock.accountHookUnmounts = 0; streamMock.channel = null; + streamMock.remoteControl = null; + streamMock.remoteControlByComputer = {}; window.localStorage.removeItem(DESKTOP_CONTROL_PRESENTATION_STORAGE_KEY); useSessionStore.setState({ transcripts: {}, @@ -689,6 +710,64 @@ describe.sequential("cloud sessions shell", () => { }); }); + it("disables Computer Use when another cloud app owns the connection", async () => { + setupComputerUseSession({ withSnapshot: false }); + streamMock.remoteControlByComputer = { + "desktop-1": { + available: false, + reason: "computer_use_connection_already_active", + message: + "Stop the running connection in the other cloud app before using it here.", + ownerDeviceId: "web-other", + startedAt: "2026-05-29T18:29:00Z", + }, + }; + + renderCloudRoute(`/sessions/desktop-1/${REMOTE_COMPUTER_USE_SESSION_ID}`); + + const desktopButton = await screen.findByRole("button", { + name: "Open desktop controls", + }); + expect((desktopButton as HTMLButtonElement).disabled).toBe(true); + expect( + screen.getByRole("heading", { name: "Computer Use is already in use" }), + ).toBeTruthy(); + expect( + screen.getByText( + "Stop the running connection in the other cloud app before using it here.", + ), + ).toBeTruthy(); + expect(screen.queryByText("Use an app")).toBeNull(); + + expect(screen.queryByTestId("composer")).toBeNull(); + expect(streamMock.composerProps).toHaveLength(0); + }); + + it("disables standard sessions when another cloud app owns the desktop", async () => { + streamMock.remoteControlByComputer = { + "desktop-1": { + available: false, + reason: "computer_use_connection_already_active", + message: + "Stop the running connection in the other cloud app before using it here.", + ownerDeviceId: "web-other", + startedAt: "2026-05-29T18:29:00Z", + }, + }; + + renderCloudRoute("/sessions/desktop-1/session-1"); + + expect( + await screen.findByRole("heading", { + name: 
"Xero Cloud is already connected elsewhere", + }), + ).toBeTruthy(); + expect(screen.queryByLabelText("Loading")).toBeNull(); + + expect(screen.queryByTestId("composer")).toBeNull(); + expect(streamMock.composerProps).toHaveLength(0); + }); + it("renders Computer Use after reload before the synthetic session has a transcript snapshot", async () => { setupComputerUseSession({ withSnapshot: false }); diff --git a/cloud/src/routes/sessions.$computerId.$sessionId.tsx b/cloud/src/routes/sessions.$computerId.$sessionId.tsx index 064d3b6f..fec9597a 100644 --- a/cloud/src/routes/sessions.$computerId.$sessionId.tsx +++ b/cloud/src/routes/sessions.$computerId.$sessionId.tsx @@ -70,6 +70,7 @@ import { LoadingScreen } from "#/components/loading-screen"; import { decodeRelayFrame } from "#/lib/relay/envelope"; import { answerComputerUseStreamOffer, + type CommandAckResult, heartbeatComputerUseManualControl, type InboundCommand, pushInboundCommand, @@ -155,13 +156,27 @@ function SessionView() { const contextSnapshot = transcript?.contextSnapshot ?? null; const contextSnapshotError = transcript?.contextSnapshotError ?? null; const isLive = transcript?.isLive ?? false; - const { channel, iceServers, joinRejected, streamRunId, streamToken } = - useSessionStream({ - computerId, - enabled: currentComputerOnline && currentSessionAvailable, - sessionId, - relayToken: session.relayToken, - }); + const { + channel, + iceServers, + joinRejected, + remoteControl, + streamRunId, + streamToken, + } = useSessionStream({ + computerId, + enabled: currentComputerOnline && currentSessionAvailable, + sessionId, + relayToken: session.relayToken, + }); + const hostRemoteControl = shell.currentComputerRemoteControl ?? remoteControl; + const hostConnectionBlocked = hostRemoteControl?.available === false; + const hostConnectionBlockedTitle = isComputerUseSession + ? 
"Computer Use is already in use" + : "Xero Cloud is already connected elsewhere"; + const hostConnectionBlockedMessage = + hostRemoteControl?.message ?? + "Stop the running connection in the other cloud app before using it here."; const resolvedTurns = useResolvedRemoteMedia({ channel, computerId, @@ -357,6 +372,7 @@ function SessionView() { useEffect(() => { if (!channel || !session.deviceId || !currentSessionAvailable) return; + if (hostConnectionBlocked) return; if (storedTranscript) return; const requestSnapshot = () => { if (useSessionStore.getState().transcripts[key]) return; @@ -373,6 +389,7 @@ function SessionView() { channel, computerId, currentSessionAvailable, + hostConnectionBlocked, key, session.deviceId, sessionId, @@ -381,6 +398,7 @@ function SessionView() { useEffect(() => { if (!channel || !session.deviceId || !transcript || isLive) return; + if (hostConnectionBlocked) return; if (isComputerUseSession) return; if (contextRequestSettled) { setPendingContextRequestKey((current) => @@ -405,6 +423,7 @@ function SessionView() { contextRequestKey, contextRequestSettled, debouncedDraftPrompt, + hostConnectionBlocked, isLive, isComputerUseSession, pendingContextRequestKey, @@ -424,6 +443,7 @@ function SessionView() { const pushControlUpdate = useCallback( (overrides: ControlUpdateOverrides = {}) => { + if (hostConnectionBlocked) return; if (!channel || !session.deviceId) return; const nextAgentId = isComputerUseSession ? "computer_use" @@ -472,6 +492,7 @@ function SessionView() { autoCompactEnabled, channel, computerId, + hostConnectionBlocked, resolvedAgentId, resolvedApprovalMode, resolvedModelId, @@ -486,6 +507,7 @@ function SessionView() { const dispatchSend = useCallback( (submittedPrompt?: string) => { const message = (submittedPrompt ?? 
draftPrompt).trim(); + if (hostConnectionBlocked) return; if (!channel || !message || !session.deviceId) return; const readyAttachments = attachmentsHook.getReadyAttachments(); const payload: Record = { @@ -536,6 +558,7 @@ function SessionView() { autoCompactEnabled, channel, computerId, + hostConnectionBlocked, draftPrompt, followLatestConversation, key, @@ -662,6 +685,9 @@ function SessionView() { const [computerUseSidebarDensity, setComputerUseSidebarDensity] = useState("comfortable"); const mobileTextKeyboardActive = useMobileTextKeyboardActive(); + useEffect(() => { + if (hostConnectionBlocked) setDesktopControlsOpen(false); + }, [hostConnectionBlocked]); const renderConversationPane = ( surface: "main" | "sidebar" = "main", density: ComputerUseSidebarDensity = "comfortable", @@ -690,7 +716,19 @@ function SessionView() { : "max-w-3xl gap-4 pb-24 lg:max-w-[47rem]", )} > - {transcript ? ( + {hostConnectionBlocked && !transcript ? ( + shouldCollapseEmptyState ? null : ( +
+ +
+ ) + ) : transcript ? ( turns.length === 0 ? ( shouldCollapseEmptyState ? null : (
@@ -700,7 +738,19 @@ function SessionView() { isComputerUseSession ? "computer-use" : "default" } variant={isCompactSidebar ? "dense" : "default"} - onSelectSuggestion={setDraftPrompt} + disabledTitle={ + hostConnectionBlocked + ? hostConnectionBlockedTitle + : undefined + } + disabledDescription={ + hostConnectionBlocked + ? hostConnectionBlockedMessage + : undefined + } + onSelectSuggestion={ + hostConnectionBlocked ? undefined : setDraftPrompt + } />
) @@ -729,62 +779,64 @@ function SessionView() { isSidebarSurface ? "from-sidebar" : "from-background", )} /> -
+ {hostConnectionBlocked ? null : (
- +
+ +
-
+ )} ); }; @@ -795,6 +847,8 @@ function SessionView() { ([ "degraded", "manual", ]); - function createManualControlId(deviceId: string, sessionId: string): string { const nonce = Math.random().toString(36).slice(2, 10); const issuedAt = Date.now().toString(36); @@ -1055,6 +1119,8 @@ const MANUAL_KEYBOARD_COMPOSITION_DUPLICATE_MS = 80; const DESKTOP_MOBILE_ZOOM_MIN = 1; const DESKTOP_MOBILE_ZOOM_MAX = 4; const DESKTOP_POINTER_TAP_SLOP_PX = 8; +const REMOTE_CONTROL_ALREADY_ACTIVE_REASON = + "computer_use_connection_already_active"; type ManualKeyboardCaptureState = "inactive" | "armed" | "composing"; @@ -1431,6 +1497,8 @@ interface ComputerUseDesktopViewportProps { interface ComputerUseDesktopControlsProps extends ComputerUseDesktopViewportProps { agentSidebar: ReactNode; + disabled?: boolean; + disabledReason?: string; onAgentSidebarDensityChange: (density: ComputerUseSidebarDensity) => void; onOpenChange: (open: boolean) => void; open: boolean; @@ -1438,6 +1506,8 @@ interface ComputerUseDesktopControlsProps function ComputerUseDesktopDialog({ agentSidebar, + disabled = false, + disabledReason, onAgentSidebarDensityChange, onOpenChange, open, @@ -1451,10 +1521,14 @@ function ComputerUseDesktopDialog({ size="sm" aria-label="Open desktop controls" className="h-8 gap-2 px-2.5" + disabled={disabled} + title={disabled ? disabledReason : undefined} onClick={ - presentation.isMobile - ? () => requestNativeDesktopOrientationLock() - : () => onOpenChange(true) + disabled + ? undefined + : presentation.isMobile + ? () => requestNativeDesktopOrientationLock() + : () => onOpenChange(true) } >