diff --git a/PolyPilot.Tests/BridgeDisconnectTests.cs b/PolyPilot.Tests/BridgeDisconnectTests.cs index ad56ffb4d..fdca1b24a 100644 --- a/PolyPilot.Tests/BridgeDisconnectTests.cs +++ b/PolyPilot.Tests/BridgeDisconnectTests.cs @@ -348,4 +348,32 @@ public async Task SyncRemoteSessions_AllowsSessionsListToClearProcessing() Assert.False(session.IsProcessing); } + + [Fact] + public async Task ForceSync_ClearsIsProcessing_EvenWithStreamingGuard() + { + // Scenario: Streaming guard is stuck (TurnStart received but TurnEnd lost). + // SyncRemoteSessions skips the session. But ForceRefreshRemoteAsync should + // always apply the server's authoritative IsProcessing state. + var svc = CreateRemoteService(); + await AddRemoteSession(svc, "stuck-session"); + var session = svc.GetSession("stuck-session")!; + + // Session appears processing with a stuck streaming guard + session.IsProcessing = true; + svc.SetRemoteStreamingGuardForTesting("stuck-session", true); + + // SyncRemoteSessions should skip (streaming guard active) + _bridgeClient.Sessions = new() { new SessionSummary { Name = "stuck-session", IsProcessing = false } }; + svc.SyncRemoteSessions(); + Assert.True(session.IsProcessing); // Guard blocks the update + + // Force sync should override the streaming guard + _bridgeClient.SessionHistories["stuck-session"] = new List(); + var result = await svc.ForceRefreshRemoteAsync("stuck-session"); + + Assert.True(result.Success); + Assert.False(session.IsProcessing); + Assert.False(svc.IsRemoteStreamingGuardActive("stuck-session")); + } } diff --git a/PolyPilot/Services/CopilotService.Bridge.cs b/PolyPilot/Services/CopilotService.Bridge.cs index a3ff0fbbd..068b01f8c 100644 --- a/PolyPilot/Services/CopilotService.Bridge.cs +++ b/PolyPilot/Services/CopilotService.Bridge.cs @@ -294,6 +294,7 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati { Debug($"[BRIDGE-SESSION-COMPLETE] '{session.Name}' clearing stale IsProcessing"); session.IsProcessing = false; + session.IsResumed = false; session.ProcessingStartedAt = null; session.ToolCallCount = 0; session.ProcessingPhase = 0; @@ -528,13 +529,23 @@ internal void SyncRemoteSessions() if (!turnEndGuardActive) { + if (state.Info.IsProcessing != rs.IsProcessing) + Debug($"SyncRemoteSessions: '{rs.Name}' IsProcessing {state.Info.IsProcessing} -> {rs.IsProcessing}"); state.Info.IsProcessing = rs.IsProcessing; state.Info.ProcessingStartedAt = rs.ProcessingStartedAt; state.Info.ToolCallCount = rs.ToolCallCount; state.Info.ProcessingPhase = rs.ProcessingPhase; } + else + { + Debug($"SyncRemoteSessions: '{rs.Name}' TurnEnd guard blocked IsProcessing=true"); + } state.Info.MessageCount = rs.MessageCount; } + else + { + Debug($"SyncRemoteSessions: '{rs.Name}' skipped — streaming guard active"); + } if (!string.IsNullOrEmpty(rs.Model)) state.Info.Model = rs.Model; } @@ -730,6 +741,27 @@ public async Task ForceRefreshRemoteAsync(string? activeSessionName } } + // Force-sync processing state for ALL sessions from the server snapshot. + // SyncRemoteSessions skips sessions in _remoteStreamingSessions, but a user-initiated + // force sync should always apply the server's authoritative IsProcessing state. + // Also clear stuck streaming guards — if the server says a session is idle, + // any lingering guard from a dropped connection should be cleared. + foreach (var rs in _bridgeClient.Sessions) + { + if (_sessions.TryGetValue(rs.Name, out var syncState)) + { + if (syncState.Info.IsProcessing != rs.IsProcessing) + Debug($"[SYNC] '{rs.Name}' IsProcessing {syncState.Info.IsProcessing} -> {rs.IsProcessing}"); + syncState.Info.IsProcessing = rs.IsProcessing; + syncState.Info.ProcessingStartedAt = rs.ProcessingStartedAt; + syncState.Info.ToolCallCount = rs.ToolCallCount; + syncState.Info.ProcessingPhase = rs.ProcessingPhase; + // Clear stuck streaming guard if server says session is idle + if (!rs.IsProcessing) + _remoteStreamingSessions.TryRemove(rs.Name, out _); + } + } + // Snapshot post-sync state var postSyncSessionCount = _sessions.Count; var postSyncMessageCount = 0; diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 9e8246f45..b4b6cf9fd 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -643,6 +643,7 @@ void Invoke(Action action) // Do NOT treat this as terminal — flush text and wait for the real idle. if (HasActiveBackgroundTasks(idle)) { + state.HasDeferredIdle = true; // Track for watchdog freshness window Debug($"[IDLE-DEFER] '{sessionName}' session.idle received with active background tasks — " + $"deferring completion (IsProcessing={state.Info.IsProcessing}, " + $"response={state.CurrentResponse.Length}+{state.FlushedResponse.Length} chars)"); @@ -781,6 +782,7 @@ await notifService.SendNotificationAsync( CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Interlocked.Exchange(ref state.EventCountThisTurn, 0); @@ -1027,6 +1029,7 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; state.IsReconnectedSend = false; // Clear reconnect flag on turn completion (defense-in-depth) state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -1775,6 +1778,7 @@ private void TriggerToolHealthRecovery(SessionState state, string sessionName, s // Full cleanup mirroring CompleteResponse — missing fields here caused stuck sessions Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.WatchdogCaseAResets, 0); @@ -2173,7 +2177,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session // - "after turn start" alone stays true forever once any event is written // - "recent" alone could match stale files from a previous turn var caseBEventsActive = false; - var freshnessSeconds = isMultiAgentSession + var freshnessSeconds = (isMultiAgentSession || state.HasDeferredIdle) ? WatchdogMultiAgentCaseBFreshnessSeconds : WatchdogCaseBFreshnessSeconds; try @@ -2348,6 +2352,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Interlocked.Exchange(ref state.EventCountThisTurn, 0); @@ -2442,6 +2447,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session Interlocked.Exchange(ref state.SendingFlag, 0); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Interlocked.Exchange(ref state.EventCountThisTurn, 0); @@ -2497,6 +2503,7 @@ private void ClearProcessingStateForRecoveryFailure(SessionState state, string s state.Info.IsProcessing = false; state.Info.IsResumed = false; state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); @@ -2595,6 +2602,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN Interlocked.Exchange(ref state.SendingFlag, 0); // Clear stale tool flag so watchdog uses normal timeout if resend is skipped newState.HasUsedToolsThisTurn = false; + newState.HasDeferredIdle = false; // Replace in sessions dictionary BEFORE registering event handler // so HandleSessionEvent's isCurrentState check passes for the new state. @@ -2632,6 +2640,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN state.Info.IsProcessing = false; state.Info.IsResumed = false; state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 394f2d144..31c410505 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -2084,6 +2084,7 @@ private async Task ForceCompleteProcessingAsync(string sessionName, SessionState Interlocked.Exchange(ref state.WatchdogCaseBLastFileSize, 0); Interlocked.Exchange(ref state.WatchdogCaseBStaleCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; state.FallbackCanceledByTurnStart = false; state.Info.IsResumed = false; state.Info.ProcessingStartedAt = null; diff --git a/PolyPilot/Services/CopilotService.Persistence.cs b/PolyPilot/Services/CopilotService.Persistence.cs index 3d73b18e3..432c342a2 100644 --- a/PolyPilot/Services/CopilotService.Persistence.cs +++ b/PolyPilot/Services/CopilotService.Persistence.cs @@ -407,7 +407,12 @@ private async Task EnsureSessionConnectedAsync(string sessionName, SessionState // waiting for tool results that will never arrive. It silently queues/ignores // new SendAsync calls until the pending tools are resolved. An explicit abort // clears this state and allows new messages to flow. - if (wasResumed && HasInterruptedToolExecution(sessionId)) + // + // IMPORTANT: Only abort if the CLI has actually stopped working. In persistent + // mode, the headless server keeps running tools even while PolyPilot is down. + // If IsSessionStillProcessing() says the CLI is active, the tool results WILL + // arrive — aborting would kill legitimate in-progress work. + if (wasResumed && HasInterruptedToolExecution(sessionId) && !IsSessionStillProcessing(sessionId)) { Debug($"[RESUME-ABORT] '{sessionName}' has interrupted tool execution — sending abort to clear pending state"); try @@ -420,6 +425,23 @@ private async Task EnsureSessionConnectedAsync(string sessionName, SessionState Debug($"[RESUME-ABORT] '{sessionName}' abort failed (non-fatal): {abortEx.Message}"); } } + else if (wasResumed && HasInterruptedToolExecution(sessionId)) + { + Debug($"[RESUME-SKIP-ABORT] '{sessionName}' has unmatched tool starts but CLI is still active — NOT aborting"); + // The CLI is still running tools — mark the session as processing so the UI + // shows it as busy. Set watchdog flags so it gets the longer tool timeout. + // INV-2: marshal to UI thread — EnsureSessionConnectedAsync runs from Task.Run. + InvokeOnUI(() => + { + state.Info.IsProcessing = true; + state.Info.IsResumed = true; + state.HasUsedToolsThisTurn = true; + state.Info.ProcessingPhase = 3; // Working + state.Info.ProcessingStartedAt = DateTime.UtcNow; + StartProcessingWatchdog(state, sessionName); + NotifyStateChanged(); + }); + } Debug($"Lazy-resume complete: '{sessionName}'"); } @@ -601,10 +623,16 @@ public async Task RestorePreviousSessionsAsync(CancellationToken cancellationTok _sessions[entry.DisplayName] = lazyState; _activeSessionName ??= entry.DisplayName; RestoreUsageStats(entry); - if (!string.IsNullOrWhiteSpace(entry.LastPrompt)) + // Eagerly resume sessions that are still actively processing on the + // headless server. Check events.jsonl (authoritative) first, then fall + // back to LastPrompt (saved when IsProcessing=true at debounce time). + // Without this, actively-running sessions appear idle after app restart + // because they're only loaded as lazy placeholders with no SDK connection. + var isStillActive = IsSessionStillProcessing(entry.SessionId); + if (isStillActive || !string.IsNullOrWhiteSpace(entry.LastPrompt)) { eagerResumeCandidates.Add((entry.DisplayName, lazyState)); - Debug($"Queued eager resume for interrupted session: {entry.DisplayName}"); + Debug($"Queued eager resume for interrupted session: {entry.DisplayName} (active={isStillActive}, hasLastPrompt={!string.IsNullOrWhiteSpace(entry.LastPrompt)})"); } Debug($"Loaded session placeholder: {entry.DisplayName} ({lazyHistory.Count} messages)"); } diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index 7dc691855..3747ce3a4 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -534,6 +534,11 @@ private class SessionState /// When this reaches WatchdogCaseBMaxStaleChecks, deferral is stopped even if the file /// modification time is within the freshness window (dead connection detected). public int WatchdogCaseBStaleCount; + /// True when an IDLE-DEFER has been observed for this session — the CLI reported + /// active background tasks (subagents/shells). The watchdog uses this to apply the longer + /// multi-agent freshness window even for non-multi-agent-group sessions, because the CLI + /// has confirmed it's running background work that won't produce events.jsonl writes. + public volatile bool HasDeferredIdle; /// True if the TurnEnd→Idle fallback was canceled by an AssistantTurnStartEvent. /// Used for diagnostic logging: when the next TurnEnd re-arms the fallback, the log shows /// the self-healing loop in action (TurnEnd → TurnStart cancel → TurnEnd re-arm). @@ -1087,6 +1092,7 @@ public async Task ReconnectAsync(ConnectionSettings settings, CancellationToken _sessions.Clear(); _closedSessionIds.Clear(); _closedSessionNames.Clear(); + _recentTurnEndSessions.Clear(); lock (_imageQueueLock) { _queuedImagePaths.Clear(); @@ -1300,6 +1306,7 @@ public async Task RestartServerAsync(CancellationToken cancellationToken = defau _sessions.Clear(); _closedSessionIds.Clear(); _closedSessionNames.Clear(); + _recentTurnEndSessions.Clear(); // 2. Dispose old client if (_client != null) @@ -3049,6 +3056,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis state.Info.ClearPermissionDenials(); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); // Reset stale tool count from previous turn state.HasUsedToolsThisTurn = false; // Reset stale tool flag from previous turn + state.HasDeferredIdle = false; // Reset deferred idle flag from previous turn state.IsReconnectedSend = false; // Clear reconnect flag — new turn starts fresh (see watchdog reconnect timeout) state.PrematureIdleSignal.Reset(); // Clear premature idle detection from previous turn state.FallbackCanceledByTurnStart = false; @@ -3337,6 +3345,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis }; // Mirror primary reconnect: reset tool tracking for new connection siblingState.HasUsedToolsThisTurn = false; + siblingState.HasDeferredIdle = false; Interlocked.Exchange(ref siblingState.ActiveToolCallCount, 0); Interlocked.Exchange(ref siblingState.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref siblingState.ToolHealthStaleChecks, 0); @@ -3528,6 +3537,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // inflates the watchdog timeout from 120s to 600s, making stuck // sessions wait 5x longer than necessary to recover. newState.HasUsedToolsThisTurn = false; + newState.HasDeferredIdle = false; Interlocked.Exchange(ref newState.ActiveToolCallCount, 0); Interlocked.Exchange(ref newState.SuccessfulToolCountThisTurn, 0); newState.IsMultiAgentSession = state.IsMultiAgentSession; @@ -3563,6 +3573,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // Reset HasUsedToolsThisTurn so the retried turn starts with the default // 120s watchdog tier instead of the inflated 600s from stale tool state. state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; // Schedule persistence of the new session ID so it survives app restart. // Without this, the debounced save captures the pre-reconnect snapshot @@ -3624,6 +3635,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis Debug($"[ERROR] '{sessionName}' reconnect+retry failed, clearing IsProcessing"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; state.Info.IsProcessing = false; @@ -3646,6 +3658,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis Debug($"[ERROR] '{sessionName}' SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; state.Info.IsProcessing = false; @@ -3796,6 +3809,7 @@ public async Task AbortSessionAsync(string sessionName, bool markAsInterrupted = state.Info.ProcessingPhase = 0; Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; state.IsReconnectedSend = false; // INV-1: clear all per-turn flags on abort Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); // Release send lock — allows a subsequent SteerSessionAsync to acquire it immediately @@ -3903,6 +3917,7 @@ await InvokeOnUIAsync(() => Debug($"[STEER-ERROR] '{sessionName}' soft steer SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; Interlocked.Exchange(ref state.SendingFlag, 0); @@ -4188,6 +4203,7 @@ await InvokeOnUIAsync(() => state.Info.IsProcessing = false; state.Info.IsResumed = false; state.HasUsedToolsThisTurn = false; + state.HasDeferredIdle = false; Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.SendingFlag, 0); state.Info.ProcessingStartedAt = null; diff --git a/PolyPilot/Services/WsBridgeServer.cs b/PolyPilot/Services/WsBridgeServer.cs index 8c751d793..b26dc6c4d 100644 --- a/PolyPilot/Services/WsBridgeServer.cs +++ b/PolyPilot/Services/WsBridgeServer.cs @@ -55,33 +55,44 @@ public void Start(int bridgePort, int targetPort) _bridgePort = bridgePort; _cts = new CancellationTokenSource(); - _listener = new HttpListener(); - _listener.Prefixes.Add($"http://+:{bridgePort}/"); - - try + if (TryBindListener(bridgePort)) { - _listener.Start(); - Console.WriteLine($"[WsBridge] Listening on port {bridgePort} (state-sync mode)"); _acceptTask = AcceptLoopAsync(_cts.Token); OnStateChanged?.Invoke(); } - catch (Exception ex) + else + { + // Port likely in TIME_WAIT from a previous instance (relaunch). + // Start the accept loop anyway — it will retry via TryRestartListenerAsync + // with exponential backoff until the port is released (typically 5-15s). + Console.WriteLine($"[WsBridge] Port {bridgePort} busy — will retry in accept loop"); + _acceptTask = AcceptLoopAsync(_cts.Token); + } + } + + /// + /// Try to bind the HttpListener on the given port. Tries wildcard first (LAN access), + /// falls back to localhost. Returns true if the listener is now listening. + /// + private bool TryBindListener(int port) + { + foreach (var prefix in new[] { $"http://+:{port}/", $"http://localhost:{port}/" }) { - Console.WriteLine($"[WsBridge] Failed to start on wildcard: {ex.Message}"); try { - _listener = new HttpListener(); - _listener.Prefixes.Add($"http://localhost:{bridgePort}/"); - _listener.Start(); - Console.WriteLine($"[WsBridge] Listening on localhost:{bridgePort} (state-sync mode)"); - _acceptTask = AcceptLoopAsync(_cts.Token); - OnStateChanged?.Invoke(); + var listener = new HttpListener(); + listener.Prefixes.Add(prefix); + listener.Start(); + _listener = listener; + Console.WriteLine($"[WsBridge] Listening on port {port} (state-sync mode)"); + return true; } - catch (Exception ex2) + catch (Exception ex) { - Console.WriteLine($"[WsBridge] Failed to start on localhost: {ex2.Message}"); + Console.WriteLine($"[WsBridge] Bind on {prefix} failed: {ex.Message}"); } } + return false; } /// @@ -293,26 +304,15 @@ private async Task TryRestartListenerAsync(CancellationToken ct) try { _listener?.Stop(); } catch { } _listener = null; - // Brief pause so the OS has time to release the port after a crash. - try { await Task.Delay(500, ct); } catch (OperationCanceledException) { return false; } + // Wait for the OS to release the port after the old process died. + // macOS TIME_WAIT can hold the port for several seconds after kill. + try { await Task.Delay(2000, ct); } catch (OperationCanceledException) { return false; } - // Try wildcard binding first (allows LAN / Tailscale access). - foreach (var prefix in new[] { $"http://+:{_bridgePort}/", $"http://localhost:{_bridgePort}/" }) + if (TryBindListener(_bridgePort)) { - try - { - var listener = new HttpListener(); - listener.Prefixes.Add(prefix); - listener.Start(); - _listener = listener; - Console.WriteLine($"[WsBridge] Restarted listening on {prefix}"); - OnStateChanged?.Invoke(); - return true; - } - catch (Exception ex) - { - Console.WriteLine($"[WsBridge] Restart on {prefix} failed: {ex.Message}"); - } + Console.WriteLine($"[WsBridge] Restarted listening on port {_bridgePort}"); + OnStateChanged?.Invoke(); + return true; } return false; }