diff --git a/crates/jp_cli/src/cmd/query/stream/retry.rs b/crates/jp_cli/src/cmd/query/stream/retry.rs index 3f4d83e3..c02b59b8 100644 --- a/crates/jp_cli/src/cmd/query/stream/retry.rs +++ b/crates/jp_cli/src/cmd/query/stream/retry.rs @@ -24,7 +24,7 @@ //! //! [`StreamError::is_retryable`]: jp_llm::StreamError::is_retryable -use std::sync::Arc; +use std::{fmt::Write as _, sync::Arc}; use jp_config::assistant::request::RequestConfig; use jp_conversation::{ConversationStream, event::ChatResponse}; @@ -40,33 +40,61 @@ use crate::{ /// Tracks retry state for stream errors within a single turn. /// /// Counts consecutive stream failures and enforces retry limits from -/// [`RequestConfig`]. The counter resets when a streaming cycle completes -/// successfully (i.e., `Event::Finished` is received). +/// [`RequestConfig`]. The counter resets when a new streaming cycle produces +/// its first successful event. pub struct StreamRetryState { /// Retry configuration (max retries, backoff parameters). config: RequestConfig, /// Number of consecutive stream failures without a successful cycle. consecutive_failures: u32, + + /// Whether a temporary retry notification line is currently displayed. + /// + /// When `true`, the next retry or successful event should overwrite the + /// line using `\r\x1b[K` rather than printing a new one. + line_active: bool, + + /// Whether output is a TTY (enables temp-line rewriting). + is_tty: bool, } impl StreamRetryState { /// Create a new retry state from the given configuration. - pub fn new(config: RequestConfig) -> Self { + pub fn new(config: RequestConfig, is_tty: bool) -> Self { Self { config, consecutive_failures: 0, + line_active: false, + is_tty, } } - /// Reset the failure counter after a successful streaming cycle. + /// Reset the failure counter. /// - /// Call this when `Event::Finished` is received, indicating the stream - /// completed without error. + /// Call this when the first successful LLM event arrives in a new streaming + /// cycle. This ensures that partially successful streams (e.g. rate-limited + /// mid-response) don't permanently consume the retry budget. pub fn reset(&mut self) { self.consecutive_failures = 0; } + /// Clear the retry notification line if one is currently displayed. + /// + /// Call this when the first successful event arrives, before rendering any + /// LLM content. + pub fn clear_line(&mut self, printer: &Printer) { + if !self.line_active { + return; + } + + if self.is_tty { + let _ = write!(printer.out_writer(), "\r\x1b[K"); + } + + self.line_active = false; + } + /// Check whether we should retry the given error. fn can_retry(&self, error: &StreamError) -> bool { error.is_retryable() && self.consecutive_failures < self.config.max_retries @@ -93,6 +121,22 @@ impl StreamRetryState { ), } } + + /// Write the retry notification, overwriting any previous retry line on TTY + /// or printing a new permanent line otherwise. + fn notify(&mut self, kind: &str, printer: &Printer) { + let attempt = self.consecutive_failures; + let max = self.config.max_retries; + let msg = format!("⚠ {kind}, retrying ({attempt}/{max})…"); + + if self.is_tty { + // Overwrite any previous retry line in-place. + let _ = write!(printer.out_writer(), "\r\x1b[K{msg}"); + self.line_active = true; + } else { + printer.println(msg); + } + } } /// Single source of truth for handling stream errors during LLM streaming. @@ -116,6 +160,10 @@ pub async fn handle_stream_error( printer: &Arc, ) -> LoopAction> { if !retry_state.can_retry(&error) { + // Clear the temp line before printing the final error so it doesn't + // linger on screen. + retry_state.clear_line(printer); + error!("Stream error (not retryable or max retries exceeded): {error}"); return LoopAction::Return(Err(jp_llm::Error::Stream(error).into())); } @@ -145,7 +193,7 @@ pub async fn handle_stream_error( let kind = error.kind.as_str(); warn!(attempt, max, kind, "{error}"); - printer.println(format!("⚠ {kind}, retrying ({attempt}/{max})…")); + retry_state.notify(kind, printer); // 5. Backoff. let delay = retry_state.backoff_duration(&error); diff --git a/crates/jp_cli/src/cmd/query/stream/retry_tests.rs b/crates/jp_cli/src/cmd/query/stream/retry_tests.rs index 90879226..c8f0168d 100644 --- a/crates/jp_cli/src/cmd/query/stream/retry_tests.rs +++ b/crates/jp_cli/src/cmd/query/stream/retry_tests.rs @@ -21,7 +21,7 @@ fn make_retry_state(max_retries: u32) -> StreamRetryState { max_backoff_secs: 1, cache: CachePolicy::default(), }; - StreamRetryState::new(config) + StreamRetryState::new(config, false) } fn make_turn_coordinator() -> TurnCoordinator { @@ -68,7 +68,7 @@ fn backoff_uses_retry_after_when_present() { max_backoff_secs: 120, cache: CachePolicy::default(), }; - let state = StreamRetryState::new(config); + let state = StreamRetryState::new(config, false); let err = StreamError::rate_limit(Some(Duration::from_secs(42))); assert_eq!(state.backoff_duration(&err), Duration::from_secs(42)); } diff --git a/crates/jp_cli/src/cmd/query/turn_loop.rs b/crates/jp_cli/src/cmd/query/turn_loop.rs index 091a3f64..00716d96 100644 --- a/crates/jp_cli/src/cmd/query/turn_loop.rs +++ b/crates/jp_cli/src/cmd/query/turn_loop.rs @@ -153,7 +153,7 @@ pub(super) async fn run_turn_loop( chat_request: ChatRequest, ) -> Result<(), Error> { let mut turn_state = TurnState::default(); - let mut stream_retry = StreamRetryState::new(cfg.assistant.request); + let mut stream_retry = StreamRetryState::new(cfg.assistant.request, is_tty); let mut turn_coordinator = TurnCoordinator::new(printer.clone(), cfg.style.clone()); let mut tool_renderer = ToolRenderer::new( if cfg.style.tool_call.show && !printer.format().is_json() { @@ -281,6 +281,7 @@ pub(super) async fn run_turn_loop( let mut perm_skipped = vec![]; let mut perm_unavailable = vec![]; let mut perm_tool_index: usize = 0; + let mut received_provider_event = false; let mut streams: SelectAll<_> = SelectAll::from_iter([sig_stream, llm_stream, tick_stream]); @@ -349,6 +350,16 @@ pub(super) async fn run_turn_loop( } }; + // Reset the retry counter on the first successful + // event in this cycle. This ensures that partially + // successful streams (rate-limited mid-response) + // don't permanently consume the retry budget. + if !received_provider_event { + received_provider_event = true; + stream_retry.clear_line(&printer); + stream_retry.reset(); + } + // Register preparing tool calls. Flush the markdown // buffer first so buffered text appears before the // "Calling tool" line (fixes Issue 1). @@ -476,7 +487,6 @@ pub(super) async fn run_turn_loop( } if is_finished { - stream_retry.reset(); tool_renderer.cancel_all(); } } diff --git a/crates/jp_cli/src/cmd/query/turn_loop_tests.rs b/crates/jp_cli/src/cmd/query/turn_loop_tests.rs index 6c2f3681..d2f70239 100644 --- a/crates/jp_cli/src/cmd/query/turn_loop_tests.rs +++ b/crates/jp_cli/src/cmd/query/turn_loop_tests.rs @@ -1,4 +1,5 @@ use std::{ + fmt, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -12,29 +13,39 @@ use chrono::Utc; use futures::stream; use indexmap::IndexMap; use jp_config::{ - AppConfig, + AppConfig, PartialAppConfig, conversation::tool::{ - CommandConfigOrString, QuestionConfig, QuestionTarget, RunMode, ToolConfig, ToolSource, + CommandConfigOrString, PartialCommandConfigOrString, PartialToolConfig, QuestionConfig, + QuestionTarget, RunMode, ToolConfig, ToolSource, + style::{DisplayStyleConfig, InlineResults, LinkStyle, ParametersStyle}, }, + model::id::{self, Name, PartialModelIdConfig, ProviderId}, }; use jp_conversation::{ - Conversation, - event::{ChatRequest, InquirySource}, + Conversation, ConversationEvent, + event::{ChatRequest, ChatResponse, InquirySource, TurnStart}, }; -use jp_inquire::prompt::{MockPromptBackend, TerminalPromptBackend}; +use jp_inquire::prompt::MockPromptBackend; use jp_llm::{ Error as LlmError, EventStream, Provider, event::{Event, FinishReason}, model::ModelDetails, provider::mock::MockProvider, query::ChatQuery, - tool::executor::{Executor, ExecutorResult, MockExecutor, PermissionInfo, TestExecutorSource}, + tool::{ + builtin::BuiltinExecutors, + executor::{ + Executor, ExecutorResult, ExecutorSource, MockExecutor, PermissionInfo, + TestExecutorSource, + }, + }, }; use jp_printer::{OutputFormat, Printer}; use jp_storage::Storage; use jp_tool::{AnswerType, Question}; use jp_workspace::Workspace; -use serde_json::json; +use schematic::Config as _; +use serde_json::{Map, Value, json}; use tokio::{sync::broadcast, time::timeout}; use super::*; @@ -43,11 +54,8 @@ use crate::{ signals::SignalTo, }; -fn empty_executor_source() -> Box { - Box::new(TerminalExecutorSource::new( - jp_llm::tool::builtin::BuiltinExecutors::new(), - &[], - )) +fn empty_executor_source() -> Box { + Box::new(TerminalExecutorSource::new(BuiltinExecutors::new(), &[])) } /// A mock provider that returns different responses on each call. @@ -69,8 +77,6 @@ struct SequentialMockProvider { impl SequentialMockProvider { /// Create a provider that returns tool calls first, then a message. fn with_tool_then_message(tool_id: &str, tool_name: &str, final_message: &str) -> Self { - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - // First response: tool call let tool_call_events = vec![ Event::Part { @@ -78,7 +84,7 @@ impl SequentialMockProvider { event: ConversationEvent::now(ToolCallRequest { id: tool_id.to_string(), name: tool_name.to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(0), @@ -98,8 +104,8 @@ impl SequentialMockProvider { Self { responses: vec![tool_call_events, message_events], call_index: AtomicUsize::new(0), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "sequential-mock".parse().expect("valid name"), }), } @@ -108,10 +114,7 @@ impl SequentialMockProvider { #[async_trait] impl Provider for SequentialMockProvider { - async fn model_details( - &self, - name: &jp_config::model::id::Name, - ) -> Result { + async fn model_details(&self, name: &id::Name) -> Result { let mut model = self.model.clone(); model.id.name = name.clone(); Ok(model) @@ -126,8 +129,6 @@ impl Provider for SequentialMockProvider { _model: &ModelDetails, _query: ChatQuery, ) -> Result { - use std::sync::atomic::Ordering; - let index = self.call_index.fetch_add(1, Ordering::SeqCst); let events = self .responses @@ -192,7 +193,7 @@ async fn test_quit_during_streaming_persists_content() { &[], // tools conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -265,7 +266,7 @@ async fn test_normal_completion_persists_content() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -351,7 +352,7 @@ async fn test_tool_call_cycle_completes_with_followup() { &[], // No tool definitions - tests the "tool not found" path conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -452,7 +453,7 @@ async fn test_quit_during_tool_execution_persists() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -486,8 +487,6 @@ async fn test_multiple_tool_calls_in_sequence() { // Tests that multiple tool calls are handled correctly. // The executing phase should process all pending calls. - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - let tmp = tempdir().unwrap(); let root = tmp.path(); let storage = root.join(".jp"); @@ -512,7 +511,7 @@ async fn test_multiple_tool_calls_in_sequence() { event: ConversationEvent::now(ToolCallRequest { id: "call_1".to_string(), name: "tool_a".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::Part { @@ -520,7 +519,7 @@ async fn test_multiple_tool_calls_in_sequence() { event: ConversationEvent::now(ToolCallRequest { id: "call_2".to_string(), name: "tool_b".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(0), @@ -540,8 +539,8 @@ async fn test_multiple_tool_calls_in_sequence() { SequentialMockProvider { responses: vec![tool_call_events, message_events], call_index: AtomicUsize::new(0), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "multi-tool-mock".parse().expect("valid name"), }), } @@ -571,7 +570,7 @@ async fn test_multiple_tool_calls_in_sequence() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -662,7 +661,7 @@ async fn test_empty_tool_response_continues_cycle() { &[], // No tools configured - tool_coordinator.prepare will fail conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -703,15 +702,6 @@ async fn test_empty_tool_response_continues_cycle() { async fn test_tool_restart_on_shutdown_signal() { // Wrap the entire test in a timeout to prevent infinite hangs let test_result = Box::pin(timeout(Duration::from_secs(10), async { - use indexmap::IndexMap; - use jp_config::{ - Config, PartialAppConfig, - conversation::tool::{ - PartialCommandConfigOrString, PartialToolConfig, RunMode, ToolSource, - }, - model::id::{Name, PartialModelIdConfig, ProviderId}, - }; - let tmp = tempdir().unwrap(); let root = tmp.path(); let storage = root.join(".jp"); @@ -851,7 +841,7 @@ async fn test_merged_stream_exits_after_tool_response() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -898,7 +888,7 @@ async fn test_merged_stream_exits_after_tool_response() { &[], // Tool definitions come from config, not this param conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -962,7 +952,7 @@ async fn test_tool_call_with_run_mode_ask_approves() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -1004,7 +994,7 @@ async fn test_tool_call_with_run_mode_ask_approves() { tool_name: req.name.clone(), tool_source: ToolSource::Local { tool: None }, run_mode: RunMode::Ask, - arguments: serde_json::Value::Object(req.arguments.clone()), + arguments: Value::Object(req.arguments.clone()), }, ), ) @@ -1107,7 +1097,7 @@ async fn test_tool_call_with_run_mode_ask_skips() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -1149,7 +1139,7 @@ async fn test_tool_call_with_run_mode_ask_skips() { tool_name: req.name.clone(), tool_source: ToolSource::Local { tool: None }, run_mode: RunMode::Ask, - arguments: serde_json::Value::Object(req.arguments.clone()), + arguments: Value::Object(req.arguments.clone()), }), ) }); @@ -1259,7 +1249,7 @@ async fn test_tool_call_with_run_mode_unattended() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -1399,7 +1389,7 @@ async fn test_tool_call_with_run_mode_skip() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -1446,7 +1436,7 @@ async fn test_tool_call_with_run_mode_skip() { tool_name: req.name.clone(), tool_source: ToolSource::Local { tool: None }, run_mode: RunMode::Skip, - arguments: serde_json::Value::Object(req.arguments.clone()), + arguments: Value::Object(req.arguments.clone()), }), ) }); @@ -1533,8 +1523,6 @@ async fn test_tool_call_with_run_mode_skip() { async fn test_multiple_tools_with_different_run_modes() { // Tests: LLM returns 2 tool calls → one Ask (approved), one Unattended // Both should complete successfully with proper handling. - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(5), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -1557,7 +1545,7 @@ async fn test_multiple_tools_with_different_run_modes() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); // tool_unattended runs automatically config @@ -1575,7 +1563,7 @@ async fn test_multiple_tools_with_different_run_modes() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -1598,7 +1586,7 @@ async fn test_multiple_tools_with_different_run_modes() { event: ConversationEvent::now(ToolCallRequest { id: "call_ask".to_string(), name: "tool_ask".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::Part { @@ -1606,7 +1594,7 @@ async fn test_multiple_tools_with_different_run_modes() { event: ConversationEvent::now(ToolCallRequest { id: "call_unattended".to_string(), name: "tool_unattended".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(0), @@ -1626,8 +1614,8 @@ async fn test_multiple_tools_with_different_run_modes() { SequentialMockProvider { responses: vec![tool_call_events, message_events], call_index: AtomicUsize::new(0), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "multi-mode-mock".parse().expect("valid name"), }), } @@ -1655,7 +1643,7 @@ async fn test_multiple_tools_with_different_run_modes() { tool_name: req.name.clone(), tool_source: ToolSource::Local { tool: None }, run_mode: RunMode::Ask, - arguments: serde_json::Value::Object(req.arguments.clone()), + arguments: Value::Object(req.arguments.clone()), }), ) }) @@ -1777,7 +1765,7 @@ async fn test_tool_call_returns_error() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -1905,8 +1893,8 @@ impl DelayedMockProvider { Self { delay, response: response.to_string(), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "delayed-mock".parse().expect("valid name"), }), } @@ -1915,10 +1903,7 @@ impl DelayedMockProvider { #[async_trait] impl Provider for DelayedMockProvider { - async fn model_details( - &self, - name: &jp_config::model::id::Name, - ) -> Result { + async fn model_details(&self, name: &id::Name) -> Result { let mut model = self.model.clone(); model.id.name = name.clone(); Ok(model) @@ -1933,8 +1918,6 @@ impl Provider for DelayedMockProvider { _model: &ModelDetails, _query: ChatQuery, ) -> Result { - use jp_conversation::event::{ChatResponse, ConversationEvent}; - tokio::time::sleep(self.delay).await; let events = vec![ @@ -2008,7 +1991,7 @@ async fn test_waiting_indicator_shows_during_delay() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2092,7 +2075,7 @@ async fn test_waiting_indicator_not_shown_when_disabled() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2170,7 +2153,7 @@ async fn test_waiting_indicator_not_shown_for_non_tty() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2201,9 +2184,6 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { // 3. Flush completes the tool call, spinner is cleared // 4. Tool executes (not found), follow-up LLM returns message // 5. Verify the spinner text appeared in the output - - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(10), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -2232,7 +2212,7 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { // Build a provider that simulates multi-part tool call streaming // with a delay between the initial Part and the final Part, giving // the spawned indicator task time to tick. - let mut args = serde_json::Map::new(); + let mut args = Map::new(); args.insert("path".into(), "test.rs".into()); args.insert("content".into(), "fn main() {}".into()); @@ -2243,7 +2223,7 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { event: ConversationEvent::now(ToolCallRequest { id: "call_multi".to_string(), name: "fs_create_file".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }), ]; @@ -2276,8 +2256,6 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { let rest_stream = futures::stream::iter(delayed_events); // Chain: initial Part → delay → final Part + Flush + Finished - use futures::StreamExt as _; - let combined_first_cycle: jp_llm::EventStream = Box::pin(first_stream.chain(delay_stream).chain(rest_stream)); @@ -2298,18 +2276,15 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { model: ModelDetails, } - impl std::fmt::Debug for DelayedToolCallProvider { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + impl fmt::Debug for DelayedToolCallProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("DelayedToolCallProvider").finish() } } #[async_trait] impl Provider for DelayedToolCallProvider { - async fn model_details( - &self, - name: &jp_config::model::id::Name, - ) -> Result { + async fn model_details(&self, name: &id::Name) -> Result { let mut m = self.model.clone(); m.id.name = name.clone(); Ok(m) @@ -2339,8 +2314,8 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { let provider: Arc = Arc::new(DelayedToolCallProvider { first_cycle: std::sync::Mutex::new(Some(combined_first_cycle)), second_cycle: std::sync::Mutex::new(Some(message_events)), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "delayed-tool-mock".parse().expect("valid name"), }), }); @@ -2369,7 +2344,7 @@ async fn test_multi_part_tool_call_shows_preparing_spinner() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2417,8 +2392,6 @@ async fn test_turn_start_event_is_emitted() { // A single run_turn_loop call should inject a TurnStart { index: 0 } // event at the beginning of the conversation stream. - use jp_conversation::event::TurnStart; - let tmp = tempdir().unwrap(); let root = tmp.path(); let storage = root.join(".jp"); @@ -2460,7 +2433,7 @@ async fn test_turn_start_event_is_emitted() { &[], conv_id, printer, - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2481,8 +2454,6 @@ async fn test_turn_start_index_increments_across_turns() { // Two consecutive run_turn_loop calls should produce TurnStart events // with indices 0 and 1. - use jp_conversation::event::TurnStart; - let tmp = tempdir().unwrap(); let root = tmp.path(); let storage = root.join(".jp"); @@ -2526,7 +2497,7 @@ async fn test_turn_start_index_increments_across_turns() { &[], conv_id, printer, - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2560,7 +2531,7 @@ async fn test_turn_start_index_increments_across_turns() { &[], conv_id, printer, - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2581,8 +2552,6 @@ async fn test_turn_start_index_increments_across_turns() { /// tool" header appears in the output (Issue 1 fix). #[tokio::test] async fn test_markdown_flushed_before_tool_header() { - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(5), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -2617,7 +2586,7 @@ async fn test_markdown_flushed_before_tool_header() { event: ConversationEvent::now(ToolCallRequest { id: "call_1".to_string(), name: "fs_read_file".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(1), @@ -2636,8 +2605,8 @@ async fn test_markdown_flushed_before_tool_header() { SequentialMockProvider { responses: vec![events, followup], call_index: AtomicUsize::new(0), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "md-flush-mock".parse().expect("valid name"), }), } @@ -2667,7 +2636,7 @@ async fn test_markdown_flushed_before_tool_header() { &[], conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), chat_request.clone(), ) @@ -2704,9 +2673,6 @@ async fn test_markdown_flushed_before_tool_header() { #[tokio::test] #[allow(clippy::too_many_lines)] async fn test_parallel_tool_calls_rendered_atomically() { - use jp_config::conversation::tool::style::{DisplayStyleConfig, ParametersStyle as PS}; - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(5), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -2717,9 +2683,9 @@ async fn test_parallel_tool_calls_rendered_atomically() { let fn_call_style = Some(DisplayStyleConfig { hidden: false, - inline_results: jp_config::conversation::tool::style::InlineResults::Off, - results_file_link: jp_config::conversation::tool::style::LinkStyle::Off, - parameters: PS::FunctionCall, + inline_results: InlineResults::Off, + results_file_link: LinkStyle::Off, + parameters: ParametersStyle::FunctionCall, }); // Configure tools with FunctionCall style for readable output. @@ -2739,7 +2705,7 @@ async fn test_parallel_tool_calls_rendered_atomically() { result: None, style: fn_call_style.clone(), questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); config .conversation @@ -2756,7 +2722,7 @@ async fn test_parallel_tool_calls_rendered_atomically() { result: None, style: fn_call_style, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -2772,9 +2738,9 @@ async fn test_parallel_tool_calls_rendered_atomically() { let chat_request = ChatRequest::from("Use both tools"); // Two tool calls with actual arguments. - let mut args_a = serde_json::Map::new(); + let mut args_a = Map::new(); args_a.insert("package".into(), "jp_cli".into()); - let mut args_b = serde_json::Map::new(); + let mut args_b = Map::new(); args_b.insert("path".into(), "/tmp/test.rs".into()); let provider: Arc = Arc::new({ @@ -2812,8 +2778,8 @@ async fn test_parallel_tool_calls_rendered_atomically() { SequentialMockProvider { responses: vec![tool_events, followup], call_index: AtomicUsize::new(0), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "parallel-tools-mock".parse().expect("valid name"), }), } @@ -2858,7 +2824,7 @@ async fn test_parallel_tool_calls_rendered_atomically() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request.clone(), ) @@ -2919,8 +2885,6 @@ async fn test_parallel_tool_calls_rendered_atomically() { #[tokio::test] #[expect(clippy::too_many_lines)] async fn test_single_tool_call_rendered_with_args() { - use jp_conversation::event::{ChatResponse, ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(5), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -2944,7 +2908,7 @@ async fn test_single_tool_call_rendered_with_args() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -2959,7 +2923,7 @@ async fn test_single_tool_call_rendered_with_args() { let chat_request = ChatRequest::from("Read a file"); - let mut args = serde_json::Map::new(); + let mut args = Map::new(); args.insert("path".into(), "/etc/hosts".into()); let provider: Arc = Arc::new({ @@ -2988,8 +2952,8 @@ async fn test_single_tool_call_rendered_with_args() { SequentialMockProvider { responses: vec![events, followup], call_index: AtomicUsize::new(0), - model: ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "single-tool-mock".parse().expect("valid name"), }), } @@ -3027,7 +2991,7 @@ async fn test_single_tool_call_rendered_with_args() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request.clone(), ) @@ -3069,7 +3033,7 @@ async fn test_single_tool_call_rendered_with_args() { struct InquiryMockExecutor { tool_id: String, tool_name: String, - arguments: serde_json::Map, + arguments: Map, questions: Vec, output: String, } @@ -3079,7 +3043,7 @@ impl InquiryMockExecutor { Self { tool_id: tool_id.to_string(), tool_name: tool_name.to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), questions, output: output.to_string(), } @@ -3094,17 +3058,17 @@ impl Executor for InquiryMockExecutor { fn tool_name(&self) -> &str { &self.tool_name } - fn arguments(&self) -> &serde_json::Map { + fn arguments(&self) -> &Map { &self.arguments } fn permission_info(&self) -> Option { None } - fn set_arguments(&mut self, _args: serde_json::Value) {} + fn set_arguments(&mut self, _args: Value) {} async fn execute( &self, - answers: &IndexMap, + answers: &IndexMap, _mcp_client: &jp_mcp::Client, _root: &camino::Utf8Path, _cancellation_token: tokio_util::sync::CancellationToken, @@ -3130,10 +3094,7 @@ impl Executor for InquiryMockExecutor { /// /// Emits as `Value::String` to match real provider streaming behavior /// (the `EventBuilder` parses the JSON string on flush). -fn structured_inquiry_events(inquiry_id: &str, answer: &serde_json::Value) -> Vec { - use jp_conversation::event::{ChatResponse, ConversationEvent}; - use serde_json::Value; - +fn structured_inquiry_events(inquiry_id: &str, answer: &Value) -> Vec { let data = json!({ "inquiry_id": inquiry_id, "answer": answer, @@ -3142,9 +3103,7 @@ fn structured_inquiry_events(inquiry_id: &str, answer: &serde_json::Value) -> Ve vec![ Event::Part { index: 0, - event: ConversationEvent::now(ChatResponse::structured(Value::String( - data.to_string(), - ))), + event: ChatResponse::structured(Value::String(data.to_string())).into(), }, Event::flush(0), Event::Finished(FinishReason::Completed), @@ -3153,10 +3112,7 @@ fn structured_inquiry_events(inquiry_id: &str, answer: &serde_json::Value) -> Ve /// Build provider events for a structured response without `inquiry_id`. /// Used for parallel inquiry tests where call ordering is non-deterministic. -fn unkeyed_structured_events(answer: &serde_json::Value) -> Vec { - use jp_conversation::event::{ChatResponse, ConversationEvent}; - use serde_json::Value; - +fn unkeyed_structured_events(answer: &Value) -> Vec { let data = json!({ "answer": answer, }); @@ -3164,9 +3120,7 @@ fn unkeyed_structured_events(answer: &serde_json::Value) -> Vec { vec![ Event::Part { index: 0, - event: ConversationEvent::now(ChatResponse::structured(Value::String( - data.to_string(), - ))), + event: ChatResponse::structured(Value::String(data.to_string())).into(), }, Event::flush(0), Event::Finished(FinishReason::Completed), @@ -3174,15 +3128,13 @@ fn unkeyed_structured_events(answer: &serde_json::Value) -> Vec { } fn single_tool_call_events(id: &str, name: &str) -> Vec { - use jp_conversation::event::{ConversationEvent, ToolCallRequest}; - vec![ Event::Part { index: 0, event: ConversationEvent::now(ToolCallRequest { id: id.to_string(), name: name.to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(0), @@ -3191,8 +3143,6 @@ fn single_tool_call_events(id: &str, name: &str) -> Vec { } fn final_message_events(content: &str) -> Vec { - use jp_conversation::event::{ChatResponse, ConversationEvent}; - vec![ Event::Part { index: 0, @@ -3225,13 +3175,13 @@ fn inquiry_tool_config(questions: &[&str]) -> ToolConfig { }) }) .collect(), - options: serde_json::Map::new(), + options: Map::new(), } } fn inquiry_mock_model() -> ModelDetails { - ModelDetails::empty(jp_config::model::id::ModelIdConfig { - provider: jp_config::model::id::ProviderId::Test, + ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, name: "inquiry-mock".parse().expect("valid name"), }) } @@ -3317,7 +3267,7 @@ async fn test_tool_with_single_inquiry() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request, ) @@ -3471,7 +3421,7 @@ async fn test_tool_with_multiple_inquiries() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request, ) @@ -3525,8 +3475,6 @@ async fn test_tool_with_multiple_inquiries() { #[tokio::test] #[expect(clippy::too_many_lines)] async fn test_parallel_tools_one_with_inquiry() { - use jp_conversation::event::{ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(5), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -3553,7 +3501,7 @@ async fn test_parallel_tools_one_with_inquiry() { result: None, style: None, questions: IndexMap::new(), - options: serde_json::Map::new(), + options: Map::new(), }); let mut workspace = Workspace::new(root) @@ -3578,7 +3526,7 @@ async fn test_parallel_tools_one_with_inquiry() { event: ConversationEvent::now(ToolCallRequest { id: "call_inq".to_string(), name: "inquiry_tool".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::Part { @@ -3586,7 +3534,7 @@ async fn test_parallel_tools_one_with_inquiry() { event: ConversationEvent::now(ToolCallRequest { id: "call_norm".to_string(), name: "normal_tool".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(0), @@ -3646,7 +3594,7 @@ async fn test_parallel_tools_one_with_inquiry() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request, ) @@ -3694,8 +3642,6 @@ async fn test_parallel_tools_one_with_inquiry() { #[tokio::test] #[expect(clippy::too_many_lines)] async fn test_parallel_tools_both_with_inquiries() { - use jp_conversation::event::{ConversationEvent, ToolCallRequest}; - let test_result = Box::pin(timeout(Duration::from_secs(5), async { let tmp = tempdir().unwrap(); let root = tmp.path(); @@ -3735,7 +3681,7 @@ async fn test_parallel_tools_both_with_inquiries() { event: ConversationEvent::now(ToolCallRequest { id: "call_a".to_string(), name: "tool_a".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::Part { @@ -3743,7 +3689,7 @@ async fn test_parallel_tools_both_with_inquiries() { event: ConversationEvent::now(ToolCallRequest { id: "call_b".to_string(), name: "tool_b".to_string(), - arguments: serde_json::Map::new(), + arguments: Map::new(), }), }, Event::flush(0), @@ -3814,7 +3760,7 @@ async fn test_parallel_tools_both_with_inquiries() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request, ) @@ -3855,6 +3801,152 @@ async fn test_parallel_tools_both_with_inquiries() { assert!(test_result.is_ok(), "Test timed out"); } +/// Verifies that the retry counter resets after a successful event arrives in a +/// new streaming cycle. +/// +/// Scenario with `max_retries=1`: +/// 1. Stream produces content, then rate-limits mid-stream (no Finished) +/// 2. Retry: stream produces content (counter resets here), then rate-limits +/// again mid-stream +/// 3. Retry: stream completes successfully +/// +/// Without the fix, the counter would reach 2 after step 2, exceeding the +/// budget of 1 and causing a hard failure. +#[tokio::test] +#[expect(clippy::too_many_lines)] +async fn test_retry_counter_resets_on_successful_event() { + struct MidStreamRateLimitProvider { + call_index: Arc, + model: ModelDetails, + } + + #[async_trait] + impl Provider for MidStreamRateLimitProvider { + async fn model_details(&self, name: &id::Name) -> Result { + let mut m = self.model.clone(); + m.id.name = name.clone(); + Ok(m) + } + + async fn models(&self) -> Result, LlmError> { + Ok(vec![self.model.clone()]) + } + + async fn chat_completion_stream( + &self, + _model: &ModelDetails, + _query: ChatQuery, + ) -> Result { + let idx = self.call_index.fetch_add(1, Ordering::SeqCst); + + let events: Vec> = if idx < 2 { + // Calls 0 and 1: partial content then rate limit error. + vec![ + Ok(Event::Part { + index: 0, + event: ChatResponse::message("partial ").into(), + }), + Err(StreamError::rate_limit(None)), + ] + } else { + // Call 2: complete successfully. + vec![ + Ok(Event::Part { + index: 0, + event: ChatResponse::message("done.").into(), + }), + Ok(Event::flush(0)), + Ok(Event::Finished(FinishReason::Completed)), + ] + }; + + Ok(Box::pin(stream::iter(events))) + } + } + + let test_result = Box::pin(timeout(Duration::from_secs(5), async { + let tmp = tempdir().unwrap(); + let root = tmp.path(); + let storage = root.join(".jp"); + + let mut config = AppConfig::new_test(); + config.assistant.request.max_retries = 1; + config.assistant.request.base_backoff_ms = 1; + config.assistant.request.max_backoff_secs = 1; + + let mut workspace = Workspace::new(root) + .persisted_at(&storage) + .expect("failed to enable persistence"); + + let conv_id = + workspace.create_conversation(Conversation::default(), Arc::new(config.clone())); + workspace + .set_active_conversation_id(conv_id, Utc::now()) + .unwrap(); + + let chat_request = ChatRequest::from("Hello"); + let call_index = Arc::new(AtomicUsize::new(0)); + let call_index_clone = Arc::clone(&call_index); + + // Provider that returns partial content + rate limit error on the first + // two calls, then succeeds on the third. + let provider: Arc = Arc::new(MidStreamRateLimitProvider { + call_index: call_index_clone, + model: ModelDetails::empty(id::ModelIdConfig { + provider: ProviderId::Test, + name: "rate-limit-mock".parse().expect("valid name"), + }), + }); + let model = provider + .model_details(&"test-model".parse().unwrap()) + .await + .unwrap(); + + let (printer, _out, _err) = Printer::memory(OutputFormat::TextPretty); + let printer = Arc::new(printer); + let mcp_client = jp_mcp::Client::default(); + let (_signal_tx, signal_rx) = broadcast::channel(16); + + let result = run_turn_loop( + Arc::clone(&provider), + &model, + &config, + &signal_rx, + &mcp_client, + root, + false, + &[], + &mut workspace, + ToolChoice::Auto, + &[], + conv_id, + printer, + Arc::new(MockPromptBackend::new()), + ToolCoordinator::new(config.conversation.tools.clone(), empty_executor_source()), + chat_request, + ) + .await; + + // With the fix, this succeeds (counter resets between retries). Without + // the fix, this would fail with a rate limit error after the second + // stream failure exhausts the budget. + assert!(result.is_ok(), "Turn loop should complete: {result:?}"); + + // Provider should have been called 3 times: + // call 0: partial + rate limit + // call 1: partial + rate limit (budget restored by reset) + // call 2: success + let total_calls = call_index.load(Ordering::SeqCst); + assert_eq!( + total_calls, 3, + "Expected 3 provider calls (2 partial + 1 success), got {total_calls}" + ); + })) + .await; + + assert!(test_result.is_ok(), "Test timed out"); +} + /// When the inquiry provider returns a non-structured response, the inquiry /// fails and the tool is marked as completed with an error. #[tokio::test] @@ -3935,7 +4027,7 @@ async fn test_inquiry_failure_marks_tool_as_error() { &tool_defs, conv_id, printer.clone(), - Arc::new(TerminalPromptBackend), + Arc::new(MockPromptBackend::new()), ToolCoordinator::new(config.conversation.tools.clone(), Box::new(executor_source)), chat_request, ) diff --git a/crates/jp_llm/src/provider.rs b/crates/jp_llm/src/provider.rs index afa4d6ad..e5962cce 100644 --- a/crates/jp_llm/src/provider.rs +++ b/crates/jp_llm/src/provider.rs @@ -26,7 +26,7 @@ use crate::{ }; #[async_trait] -pub trait Provider: std::fmt::Debug + Send + Sync { +pub trait Provider: Send + Sync { /// Get details of a model. async fn model_details(&self, name: &Name) -> Result;