Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions crates/jp_cli/src/cmd/query/stream/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//!
//! [`StreamError::is_retryable`]: jp_llm::StreamError::is_retryable

use std::sync::Arc;
use std::{fmt::Write as _, sync::Arc};

use jp_config::assistant::request::RequestConfig;
use jp_conversation::{ConversationStream, event::ChatResponse};
Expand All @@ -40,33 +40,61 @@ use crate::{
/// Tracks retry state for stream errors within a single turn.
///
/// Counts consecutive stream failures and enforces retry limits from
/// [`RequestConfig`]. The counter resets when a streaming cycle completes
/// successfully (i.e., `Event::Finished` is received).
/// [`RequestConfig`]. The counter resets when a new streaming cycle produces
/// its first successful event.
pub struct StreamRetryState {
/// Retry configuration (max retries, backoff parameters).
///
/// `max_retries` is the upper bound that `consecutive_failures` is compared
/// against when deciding whether another retry is allowed.
config: RequestConfig,

/// Number of consecutive stream failures without a successful cycle.
///
/// Cleared to zero by `reset`, and shown as the attempt number in the retry
/// notification.
// NOTE(review): the increment site is not visible in this view — presumably
// in the stream error handler; confirm.
consecutive_failures: u32,

/// Whether a temporary retry notification line is currently displayed.
///
/// When `true`, the next retry or successful event should overwrite the
/// line using `\r\x1b[K` rather than printing a new one.
///
/// Set after a retry notification is written in TTY mode; cleared by
/// `clear_line`.
line_active: bool,

/// Whether output is a TTY (enables temp-line rewriting).
///
/// When `false`, retry notifications are printed as ordinary lines and no
/// escape sequences are written.
is_tty: bool,
}

impl StreamRetryState {
/// Create a new retry state from the given configuration.
pub fn new(config: RequestConfig) -> Self {
pub fn new(config: RequestConfig, is_tty: bool) -> Self {
Self {
config,
consecutive_failures: 0,
line_active: false,
is_tty,
}
}

/// Reset the failure counter after a successful streaming cycle.
/// Reset the failure counter.
///
/// Call this when `Event::Finished` is received, indicating the stream
/// completed without error.
/// Call this when the first successful LLM event arrives in a new streaming
/// cycle. This ensures that partially successful streams (e.g. rate-limited
/// mid-response) don't permanently consume the retry budget.
pub fn reset(&mut self) {
self.consecutive_failures = 0;
}

/// Erase the temporary retry notification, if one is on screen.
///
/// Intended to run when the first successful event of a cycle arrives, before
/// any LLM content is rendered, and before a final error is reported. Does
/// nothing when no retry line is active.
pub fn clear_line(&mut self, printer: &Printer) {
    // `take` reads the flag and leaves it `false` in a single step, so the
    // flag is always cleared whenever it was set, TTY or not.
    let was_active = std::mem::take(&mut self.line_active);

    if was_active && self.is_tty {
        // Carriage return + "erase to end of line" wipes the temp line.
        // Best-effort: a failed write is deliberately ignored.
        let _ = write!(printer.out_writer(), "\r\x1b[K");
    }
}

/// Check whether we should retry the given error.
fn can_retry(&self, error: &StreamError) -> bool {
error.is_retryable() && self.consecutive_failures < self.config.max_retries
Expand All @@ -93,6 +121,22 @@ impl StreamRetryState {
),
}
}

/// Emit the "retrying" notice for the current attempt.
///
/// Without a TTY the notice is printed as a regular, permanent line. On a TTY
/// it is written in place over any earlier retry notice, and the line is
/// marked active so it can later be overwritten or cleared.
fn notify(&mut self, kind: &str, printer: &Printer) {
    let (attempt, max) = (self.consecutive_failures, self.config.max_retries);
    let msg = format!("⚠ {kind}, retrying ({attempt}/{max})…");

    if !self.is_tty {
        printer.println(msg);
        return;
    }

    // Return to column 0 and erase the line before writing, so successive
    // retry notices replace each other instead of stacking up.
    let _ = write!(printer.out_writer(), "\r\x1b[K{msg}");
    self.line_active = true;
}
}

/// Single source of truth for handling stream errors during LLM streaming.
Expand All @@ -116,6 +160,10 @@ pub async fn handle_stream_error(
printer: &Arc<Printer>,
) -> LoopAction<Result<(), Error>> {
if !retry_state.can_retry(&error) {
// Clear the temp line before printing the final error so it doesn't
// linger on screen.
retry_state.clear_line(printer);

error!("Stream error (not retryable or max retries exceeded): {error}");
return LoopAction::Return(Err(jp_llm::Error::Stream(error).into()));
}
Expand Down Expand Up @@ -145,7 +193,7 @@ pub async fn handle_stream_error(
let kind = error.kind.as_str();

warn!(attempt, max, kind, "{error}");
printer.println(format!("⚠ {kind}, retrying ({attempt}/{max})…"));
retry_state.notify(kind, printer);

// 5. Backoff.
let delay = retry_state.backoff_duration(&error);
Expand Down
4 changes: 2 additions & 2 deletions crates/jp_cli/src/cmd/query/stream/retry_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn make_retry_state(max_retries: u32) -> StreamRetryState {
max_backoff_secs: 1,
cache: CachePolicy::default(),
};
StreamRetryState::new(config)
StreamRetryState::new(config, false)
}

fn make_turn_coordinator() -> TurnCoordinator {
Expand Down Expand Up @@ -68,7 +68,7 @@ fn backoff_uses_retry_after_when_present() {
max_backoff_secs: 120,
cache: CachePolicy::default(),
};
let state = StreamRetryState::new(config);
let state = StreamRetryState::new(config, false);
let err = StreamError::rate_limit(Some(Duration::from_secs(42)));
assert_eq!(state.backoff_duration(&err), Duration::from_secs(42));
}
Expand Down
14 changes: 12 additions & 2 deletions crates/jp_cli/src/cmd/query/turn_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub(super) async fn run_turn_loop(
chat_request: ChatRequest,
) -> Result<(), Error> {
let mut turn_state = TurnState::default();
let mut stream_retry = StreamRetryState::new(cfg.assistant.request);
let mut stream_retry = StreamRetryState::new(cfg.assistant.request, is_tty);
let mut turn_coordinator = TurnCoordinator::new(printer.clone(), cfg.style.clone());
let mut tool_renderer = ToolRenderer::new(
if cfg.style.tool_call.show && !printer.format().is_json() {
Expand Down Expand Up @@ -281,6 +281,7 @@ pub(super) async fn run_turn_loop(
let mut perm_skipped = vec![];
let mut perm_unavailable = vec![];
let mut perm_tool_index: usize = 0;
let mut received_provider_event = false;

let mut streams: SelectAll<_> =
SelectAll::from_iter([sig_stream, llm_stream, tick_stream]);
Expand Down Expand Up @@ -349,6 +350,16 @@ pub(super) async fn run_turn_loop(
}
};

// Reset the retry counter on the first successful
// event in this cycle. This ensures that partially
// successful streams (rate-limited mid-response)
// don't permanently consume the retry budget.
if !received_provider_event {
received_provider_event = true;
stream_retry.clear_line(&printer);
stream_retry.reset();
}

// Register preparing tool calls. Flush the markdown
// buffer first so buffered text appears before the
// "Calling tool" line (fixes Issue 1).
Expand Down Expand Up @@ -476,7 +487,6 @@ pub(super) async fn run_turn_loop(
}

if is_finished {
stream_retry.reset();
tool_renderer.cancel_all();
}
}
Expand Down
Loading
Loading