Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions PolyPilot.Tests/BridgeDisconnectTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,4 +348,32 @@ public async Task SyncRemoteSessions_AllowsSessionsListToClearProcessing()

Assert.False(session.IsProcessing);
}

[Fact]
public async Task ForceSync_ClearsIsProcessing_EvenWithStreamingGuard()
{
    // Regression scenario: the streaming guard is left set for a session
    // (TurnStart arrived, TurnEnd never did). The periodic SyncRemoteSessions
    // pass skips guarded sessions, so the stale IsProcessing flag survives it.
    // A user-initiated ForceRefreshRemoteAsync must still apply the server's
    // IsProcessing value and drop the guard when the server reports idle.
    const string stuckName = "stuck-session";

    // Arrange: a remote session that looks busy locally and has its guard set.
    var service = CreateRemoteService();
    await AddRemoteSession(service, stuckName);
    var stuck = service.GetSession(stuckName)!;
    stuck.IsProcessing = true;
    service.SetRemoteStreamingGuardForTesting(stuckName, true);

    // The server snapshot says the session is idle.
    var idleSummary = new SessionSummary { Name = stuckName, IsProcessing = false };
    _bridgeClient.Sessions = new() { idleSummary };

    // Act 1: the regular sync must leave the guarded session untouched.
    service.SyncRemoteSessions();
    Assert.True(stuck.IsProcessing); // guard blocks the update

    // Act 2: the force sync overrides the guard.
    _bridgeClient.SessionHistories[stuckName] = new List<ChatMessage>();
    var outcome = await service.ForceRefreshRemoteAsync(stuckName);

    // Assert: server state applied and the stuck guard cleared.
    Assert.True(outcome.Success);
    Assert.False(stuck.IsProcessing);
    Assert.False(service.IsRemoteStreamingGuardActive(stuckName));
}
}
32 changes: 32 additions & 0 deletions PolyPilot/Services/CopilotService.Bridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
{
Debug($"[BRIDGE-SESSION-COMPLETE] '{session.Name}' clearing stale IsProcessing");
session.IsProcessing = false;
session.IsResumed = false;
session.ProcessingStartedAt = null;
session.ToolCallCount = 0;
session.ProcessingPhase = 0;
Expand Down Expand Up @@ -528,13 +529,23 @@ internal void SyncRemoteSessions()

if (!turnEndGuardActive)
{
if (state.Info.IsProcessing != rs.IsProcessing)
Debug($"SyncRemoteSessions: '{rs.Name}' IsProcessing {state.Info.IsProcessing} -> {rs.IsProcessing}");
state.Info.IsProcessing = rs.IsProcessing;
state.Info.ProcessingStartedAt = rs.ProcessingStartedAt;
state.Info.ToolCallCount = rs.ToolCallCount;
state.Info.ProcessingPhase = rs.ProcessingPhase;
}
else
{
Debug($"SyncRemoteSessions: '{rs.Name}' TurnEnd guard blocked IsProcessing=true");
}
state.Info.MessageCount = rs.MessageCount;
}
else
{
Debug($"SyncRemoteSessions: '{rs.Name}' skipped — streaming guard active");
}
if (!string.IsNullOrEmpty(rs.Model))
state.Info.Model = rs.Model;
}
Expand Down Expand Up @@ -730,6 +741,27 @@ public async Task<SyncResult> ForceRefreshRemoteAsync(string? activeSessionName
}
}

// Force-sync processing state for ALL sessions from the server snapshot.
// SyncRemoteSessions skips sessions in _remoteStreamingSessions, but a user-initiated
// force sync should always apply the server's authoritative IsProcessing state.
// Also clear stuck streaming guards — if the server says a session is idle,
// any lingering guard from a dropped connection should be cleared.
foreach (var rs in _bridgeClient.Sessions)
{
if (_sessions.TryGetValue(rs.Name, out var syncState))
{
if (syncState.Info.IsProcessing != rs.IsProcessing)
Debug($"[SYNC] '{rs.Name}' IsProcessing {syncState.Info.IsProcessing} -> {rs.IsProcessing}");
syncState.Info.IsProcessing = rs.IsProcessing;
syncState.Info.ProcessingStartedAt = rs.ProcessingStartedAt;
syncState.Info.ToolCallCount = rs.ToolCallCount;
syncState.Info.ProcessingPhase = rs.ProcessingPhase;
// Clear stuck streaming guard if server says session is idle
if (!rs.IsProcessing)
_remoteStreamingSessions.TryRemove(rs.Name, out _);
}
}

// Snapshot post-sync state
var postSyncSessionCount = _sessions.Count;
var postSyncMessageCount = 0;
Expand Down
11 changes: 10 additions & 1 deletion PolyPilot/Services/CopilotService.Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ void Invoke(Action action)
// Do NOT treat this as terminal — flush text and wait for the real idle.
if (HasActiveBackgroundTasks(idle))
{
state.HasDeferredIdle = true; // Track for watchdog freshness window
Debug($"[IDLE-DEFER] '{sessionName}' session.idle received with active background tasks — " +
$"deferring completion (IsProcessing={state.Info.IsProcessing}, " +
$"response={state.CurrentResponse.Length}+{state.FlushedResponse.Length} chars)");
Expand Down Expand Up @@ -781,6 +782,7 @@ await notifService.SendNotificationAsync(
CancelToolHealthCheck(state);
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0);
Interlocked.Exchange(ref state.EventCountThisTurn, 0);
Expand Down Expand Up @@ -1027,6 +1029,7 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul
CancelToolHealthCheck(state);
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
state.IsReconnectedSend = false; // Clear reconnect flag on turn completion (defense-in-depth)
state.FallbackCanceledByTurnStart = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Expand Down Expand Up @@ -1775,6 +1778,7 @@ private void TriggerToolHealthRecovery(SessionState state, string sessionName, s
// Full cleanup mirroring CompleteResponse — missing fields here caused stuck sessions
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
state.FallbackCanceledByTurnStart = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Interlocked.Exchange(ref state.WatchdogCaseAResets, 0);
Expand Down Expand Up @@ -2173,7 +2177,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session
// - "after turn start" alone stays true forever once any event is written
// - "recent" alone could match stale files from a previous turn
var caseBEventsActive = false;
var freshnessSeconds = isMultiAgentSession
var freshnessSeconds = (isMultiAgentSession || state.HasDeferredIdle)
? WatchdogMultiAgentCaseBFreshnessSeconds
: WatchdogCaseBFreshnessSeconds;
try
Expand Down Expand Up @@ -2348,6 +2352,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session
CancelToolHealthCheck(state);
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0);
Interlocked.Exchange(ref state.EventCountThisTurn, 0);
Expand Down Expand Up @@ -2442,6 +2447,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session
Interlocked.Exchange(ref state.SendingFlag, 0);
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0);
Interlocked.Exchange(ref state.EventCountThisTurn, 0);
Expand Down Expand Up @@ -2497,6 +2503,7 @@ private void ClearProcessingStateForRecoveryFailure(SessionState state, string s
state.Info.IsProcessing = false;
state.Info.IsResumed = false;
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0);
Expand Down Expand Up @@ -2595,6 +2602,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN
Interlocked.Exchange(ref state.SendingFlag, 0);
// Clear stale tool flag so watchdog uses normal timeout if resend is skipped
newState.HasUsedToolsThisTurn = false;
newState.HasDeferredIdle = false;

// Replace in sessions dictionary BEFORE registering event handler
// so HandleSessionEvent's isCurrentState check passes for the new state.
Expand Down Expand Up @@ -2632,6 +2640,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN
state.Info.IsProcessing = false;
state.Info.IsResumed = false;
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0);
Interlocked.Exchange(ref state.ActiveToolCallCount, 0);
Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0);
Expand Down
1 change: 1 addition & 0 deletions PolyPilot/Services/CopilotService.Organization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,7 @@ private async Task ForceCompleteProcessingAsync(string sessionName, SessionState
Interlocked.Exchange(ref state.WatchdogCaseBLastFileSize, 0);
Interlocked.Exchange(ref state.WatchdogCaseBStaleCount, 0);
state.HasUsedToolsThisTurn = false;
state.HasDeferredIdle = false;
state.FallbackCanceledByTurnStart = false;
state.Info.IsResumed = false;
state.Info.ProcessingStartedAt = null;
Expand Down
34 changes: 31 additions & 3 deletions PolyPilot/Services/CopilotService.Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,12 @@ private async Task EnsureSessionConnectedAsync(string sessionName, SessionState
// waiting for tool results that will never arrive. It silently queues/ignores
// new SendAsync calls until the pending tools are resolved. An explicit abort
// clears this state and allows new messages to flow.
if (wasResumed && HasInterruptedToolExecution(sessionId))
//
// IMPORTANT: Only abort if the CLI has actually stopped working. In persistent
// mode, the headless server keeps running tools even while PolyPilot is down.
// If IsSessionStillProcessing() says the CLI is active, the tool results WILL
// arrive — aborting would kill legitimate in-progress work.
if (wasResumed && HasInterruptedToolExecution(sessionId) && !IsSessionStillProcessing(sessionId))
{
Debug($"[RESUME-ABORT] '{sessionName}' has interrupted tool execution — sending abort to clear pending state");
try
Expand All @@ -420,6 +425,23 @@ private async Task EnsureSessionConnectedAsync(string sessionName, SessionState
Debug($"[RESUME-ABORT] '{sessionName}' abort failed (non-fatal): {abortEx.Message}");
}
}
else if (wasResumed && HasInterruptedToolExecution(sessionId))
{
Debug($"[RESUME-SKIP-ABORT] '{sessionName}' has unmatched tool starts but CLI is still active — NOT aborting");
// The CLI is still running tools — mark the session as processing so the UI
// shows it as busy. Set watchdog flags so it gets the longer tool timeout.
// INV-2: marshal to UI thread — EnsureSessionConnectedAsync runs from Task.Run.
InvokeOnUI(() =>
{
state.Info.IsProcessing = true;
state.Info.IsResumed = true;
state.HasUsedToolsThisTurn = true;
state.Info.ProcessingPhase = 3; // Working
state.Info.ProcessingStartedAt = DateTime.UtcNow;
StartProcessingWatchdog(state, sessionName);
NotifyStateChanged();
});
}

Debug($"Lazy-resume complete: '{sessionName}'");
}
Expand Down Expand Up @@ -601,10 +623,16 @@ public async Task RestorePreviousSessionsAsync(CancellationToken cancellationTok
_sessions[entry.DisplayName] = lazyState;
_activeSessionName ??= entry.DisplayName;
RestoreUsageStats(entry);
if (!string.IsNullOrWhiteSpace(entry.LastPrompt))
// Eagerly resume sessions that are still actively processing on the
// headless server. Check events.jsonl (authoritative) first, then fall
// back to LastPrompt (saved when IsProcessing=true at debounce time).
// Without this, actively-running sessions appear idle after app restart
// because they're only loaded as lazy placeholders with no SDK connection.
var isStillActive = IsSessionStillProcessing(entry.SessionId);
if (isStillActive || !string.IsNullOrWhiteSpace(entry.LastPrompt))
{
eagerResumeCandidates.Add((entry.DisplayName, lazyState));
Debug($"Queued eager resume for interrupted session: {entry.DisplayName}");
Debug($"Queued eager resume for interrupted session: {entry.DisplayName} (active={isStillActive}, hasLastPrompt={!string.IsNullOrWhiteSpace(entry.LastPrompt)})");
}
Debug($"Loaded session placeholder: {entry.DisplayName} ({lazyHistory.Count} messages)");
}
Expand Down
Loading