From 1253292c8f8bf7a890651b7a1465768af3243425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20LIARD?= Date: Tue, 28 Apr 2026 22:40:57 +0200 Subject: [PATCH] fix(dispatch): hot-reload race + per-provider retry + RateLimitHandler trait MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three related fixes around dispatch retry semantics and the hot-reload race: 1. Block hot-reload until validation completes. Both `/api/config/reload` (HTTP) and the `grob/server/reload_config` JSON-RPC endpoint awaited `validate_config()` *after* the atomic swap, so an invalid config could serve traffic for several seconds. They now validate against the candidate provider registry before swapping; failure returns 422 with a list of broken router models and leaves the live snapshot intact, so in-flight requests continue on the old config. 2. Per-provider `max_retries`. Add `max_retries: Option` to `ProviderConfig` and a `provider_max_retries()` resolver that reads the per-provider value or falls back to the global `MAX_RETRIES = 2`. The dispatch retry loop in `src/server/dispatch/retry.rs` now consumes this resolved budget so Anthropic can stay at 2 while OpenAI and OpenRouter / DeepSeek can opt into 3. 3. `RateLimitHandler` trait. Centralise the 429/529/Anthropic-401 logic that was duplicated across three sites in `retry.rs`. The trait is implemented for `ProviderError`, exposes `is_rate_limit()` and a future-facing `retry_after_ms()` hook, and replaces the inline `matches!(e, ProviderError::ApiError { status: 429, .. })` checks. Tests cover per-provider retry resolution (Anthropic = 2, OpenAI = 3, OpenRouter = 3, missing → default, explicit 0), the rate-limit handler across upstream variants, and the validation gate (empty / all-ok / any-ok passes; broken-model detail surfacing). Full nextest workspace run is green (1289 tests). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cli/config/providers.rs | 11 ++ src/providers/registry.rs | 3 + src/server/budget.rs | 115 ++++++++++++++++ src/server/config_api.rs | 219 ++++++++++++++++++++++++++++-- src/server/dispatch/mod.rs | 1 + src/server/dispatch/rate_limit.rs | 150 ++++++++++++++++++++ src/server/dispatch/retry.rs | 46 +++---- src/server/mod.rs | 6 +- src/server/rpc/server_ns.rs | 35 +++-- src/storage/secrets.rs | 1 + tests/helpers/fixtures.rs | 1 + tests/unit/provider_test.rs | 2 + 12 files changed, 539 insertions(+), 51 deletions(-) create mode 100644 src/server/dispatch/rate_limit.rs diff --git a/src/cli/config/providers.rs b/src/cli/config/providers.rs index 87c9ab77..ee7b65e8 100644 --- a/src/cli/config/providers.rs +++ b/src/cli/config/providers.rs @@ -116,6 +116,17 @@ pub struct ProviderConfig { /// signals agree. #[serde(default, skip_serializing_if = "Option::is_none")] pub health_check: Option, + + /// Per-provider retry budget before falling back to the next mapping. + /// + /// Different providers benefit from different retry counts: Anthropic + /// (smaller scale, frequent 429) defaults globally to 2; OpenAI and + /// OpenRouter / DeepSeek tolerate 3 thanks to better queueing / + /// occasional 5xx. Absent → use the global [`MAX_RETRIES`] default. + /// + /// [`MAX_RETRIES`]: crate::server::MAX_RETRIES + #[serde(default, skip_serializing_if = "Option::is_none")] + pub max_retries: Option, } impl ProviderConfig { diff --git a/src/providers/registry.rs b/src/providers/registry.rs index d20545a4..32d0afaa 100644 --- a/src/providers/registry.rs +++ b/src/providers/registry.rs @@ -622,6 +622,7 @@ mod tests { circuit_breaker: None, health_check: None, + max_retries: None, }, ProviderConfig { name: "provider-b".to_string(), @@ -645,6 +646,7 @@ mod tests { circuit_breaker: None, health_check: None, + max_retries: None, }, ]; @@ -756,6 +758,7 @@ mod tests { pool: None, circuit_breaker: None, health_check: None, + max_retries: None, }]; let registry = ProviderRegistry::from_configs_with_models( diff --git a/src/server/budget.rs b/src/server/budget.rs index ef919045..164e5864 100644 --- a/src/server/budget.rs +++ b/src/server/budget.rs @@ -10,8 +10,38 @@ use super::{AppError, AppState, ReloadableState}; /// NOTE: 2 retries (3 total attempts) balances latency vs resilience — most /// transient 429/5xx errors resolve within 2 exponential-backoff cycles (~1-4s), /// while more retries would unacceptably delay user-facing LLM responses. +/// +/// Acts as the **global default**. Individual providers can override the +/// budget via `[[providers]] max_retries = N` — see +/// [`provider_max_retries`] for the per-provider lookup helper. pub(crate) const MAX_RETRIES: u32 = 2; +/// Resolves the retry budget for a named provider. +/// +/// Returns the value of `[[providers]] max_retries = N` when set, or the +/// global [`MAX_RETRIES`] default when the provider is absent or did not +/// override the budget. Used by the dispatch retry loop so per-provider +/// tuning (Anthropic = 2, OpenAI / OpenRouter = 3, DeepSeek = 3) applies +/// without hard-coding provider names in the dispatch path. +pub(crate) fn provider_max_retries(inner: &Arc, provider_name: &str) -> u32 { + resolve_max_retries(&inner.config.providers, provider_name) +} + +/// Pure lookup helper for [`provider_max_retries`]. +/// +/// Decoupled from `ReloadableState` so unit tests can pass a literal +/// `[ProviderConfig]` slice without standing up the full app state graph. +pub(crate) fn resolve_max_retries( + providers: &[crate::cli::ProviderConfig], + provider_name: &str, +) -> u32 { + providers + .iter() + .find(|p| p.name == provider_name) + .and_then(|p| p.max_retries) + .unwrap_or(MAX_RETRIES) +} + /// Data needed to record Prometheus metrics for a completed request. pub(crate) struct RequestMetrics<'a> { pub model: &'a str, @@ -324,4 +354,89 @@ mod tests { cost_usd: 0.001, }); } + + // ── per-provider max_retries resolution ──────────────────────────────── + + /// Builds a stub provider config — only the two fields the lookup reads. + fn provider_with_retries(name: &str, max_retries: Option) -> crate::cli::ProviderConfig { + crate::cli::ProviderConfig { + name: name.into(), + provider_type: "stub".into(), + auth_type: crate::cli::AuthType::ApiKey, + api_key: None, + oauth_provider: None, + project_id: None, + location: None, + base_url: None, + headers: None, + models: vec![], + enabled: Some(true), + budget_usd: None, + region: None, + pass_through: None, + tls_cert: None, + tls_key: None, + tls_ca: None, + pool: None, + circuit_breaker: None, + health_check: None, + max_retries, + } + } + + #[test] + fn resolve_max_retries_falls_back_to_default_for_unknown_provider() { + let providers = vec![provider_with_retries("anthropic", Some(5))]; + assert_eq!(resolve_max_retries(&providers, "openai"), MAX_RETRIES); + } + + #[test] + fn resolve_max_retries_falls_back_to_default_when_unset() { + let providers = vec![provider_with_retries("anthropic", None)]; + assert_eq!(resolve_max_retries(&providers, "anthropic"), MAX_RETRIES); + } + + #[test] + fn resolve_max_retries_honors_anthropic_override_at_two() { + // Anthropic: smaller scale, frequent 429 — keep budget tight at 2. + let providers = vec![provider_with_retries("anthropic", Some(2))]; + assert_eq!(resolve_max_retries(&providers, "anthropic"), 2); + } + + #[test] + fn resolve_max_retries_honors_openai_override_at_three() { + // OpenAI: better queueing — 3 retries amortise transient 429s. + let providers = vec![provider_with_retries("openai", Some(3))]; + assert_eq!(resolve_max_retries(&providers, "openai"), 3); + } + + #[test] + fn resolve_max_retries_honors_openrouter_override_at_three() { + // DeepSeek / OpenRouter: sporadic 5xx — 3 retries. + let providers = vec![provider_with_retries("openrouter", Some(3))]; + assert_eq!(resolve_max_retries(&providers, "openrouter"), 3); + } + + #[test] + fn resolve_max_retries_honors_zero_override() { + // Explicit `max_retries = 0` disables retries (no fallback). + let providers = vec![provider_with_retries("flaky", Some(0))]; + assert_eq!(resolve_max_retries(&providers, "flaky"), 0); + } + + #[test] + fn resolve_max_retries_isolates_per_provider_overrides() { + // Two providers with different budgets — neither should leak. + let providers = vec![ + provider_with_retries("anthropic", Some(2)), + provider_with_retries("openai", Some(3)), + provider_with_retries("default-provider", None), + ]; + assert_eq!(resolve_max_retries(&providers, "anthropic"), 2); + assert_eq!(resolve_max_retries(&providers, "openai"), 3); + assert_eq!( + resolve_max_retries(&providers, "default-provider"), + MAX_RETRIES + ); + } } diff --git a/src/server/config_api.rs b/src/server/config_api.rs index eb1cd5cd..b568c140 100644 --- a/src/server/config_api.rs +++ b/src/server/config_api.rs @@ -207,8 +207,16 @@ pub(crate) async fn update_config_json( }))) } -/// Reload configuration without restarting the server +/// Reload configuration without restarting the server. +/// +/// The handler **awaits** validation against the candidate provider registry +/// before swapping the live config. A failure in any router model surfaces as +/// a 4xx response and the in-flight `inner` snapshot is left untouched, so +/// in-flight requests never see a half-validated config and a misconfigured +/// reload cannot temporarily serve traffic. pub(crate) async fn reload_config(State(state): State>) -> Response { + use axum::http::StatusCode; + info!("🔄 Configuration reload requested via UI"); // 1. Read and parse new config from source @@ -216,7 +224,14 @@ pub(crate) async fn reload_config(State(state): State>) -> Respons Ok(c) => c, Err(e) => { error!("Failed to reload config: {}", e); - return Json(serde_json::json!({"status": "error", "message": format!("Failed to reload config: {}", e)})).into_response(); + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "status": "error", + "message": format!("Failed to reload config: {}", e), + })), + ) + .into_response(); } }; @@ -239,18 +254,53 @@ pub(crate) async fn reload_config(State(state): State>) -> Respons Ok(r) => Arc::new(r), Err(e) => { error!("Failed to init providers: {}", e); - return Json(serde_json::json!({"status": "error", "message": format!("Failed to init providers: {}", e)})).into_response(); + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "status": "error", + "message": format!("Failed to init providers: {}", e), + })), + ) + .into_response(); } }; - // 4. Create new reloadable state + // 4. Validate the candidate config BEFORE swapping. Awaiting here is + // intentional — if validation fails we must surface the error and keep + // the live snapshot intact, so in-flight requests never observe a + // half-validated config. + info!("🔍 Validating reloaded config..."); + let validation = crate::preset::validate_config(&new_config, &new_registry).await; + crate::preset::log_validation_results(&validation); + + if let Some(rejection) = reject_if_validation_broken(&validation) { + error!( + "Configuration reload rejected: validation failed for {}", + rejection.detail + ); + return ( + StatusCode::UNPROCESSABLE_ENTITY, + Json(serde_json::json!({ + "status": "error", + "message": format!( + "Validation failed — config not reloaded. Models with no healthy provider: {}", + rejection.detail, + ), + "broken_models": rejection.broken_models, + })), + ) + .into_response(); + } + + // 5. Create new reloadable state and atomically swap (write lock held + // for microseconds). In-flight requests continue on the old snapshot + // because they hold an `Arc` from before the swap. let new_inner = Arc::new(ReloadableState::new(new_config, new_router, new_registry)); - // 5. Atomic swap (write lock held for microseconds) let active = state .active_requests .load(std::sync::atomic::Ordering::Relaxed); - *state.inner.write().unwrap_or_else(|e| e.into_inner()) = new_inner.clone(); + *state.inner.write().unwrap_or_else(|e| e.into_inner()) = new_inner; if active > 0 { info!( @@ -261,13 +311,154 @@ pub(crate) async fn reload_config(State(state): State>) -> Respons info!("✅ Configuration reloaded successfully"); } - // 6. Validate new config in background (non-blocking) - tokio::spawn(async move { - info!("🔍 Validating reloaded config..."); - let results = - crate::preset::validate_config(&new_inner.config, &new_inner.provider_registry).await; - crate::preset::log_validation_results(&results); - }); + Json(serde_json::json!({ + "status": "success", + "message": "Configuration reloaded", + "active_requests": active, + })) + .into_response() +} + +/// Internal carrier for a validation rejection — feeds the 4xx response body. +/// +/// Extracted so the rejection logic can be unit-tested without standing up an +/// `AppState` or a full `Router`. +struct ValidationRejection { + detail: String, + broken_models: Vec, +} + +/// Returns `Some(rejection)` when at least one router model has zero healthy +/// providers in the candidate registry; otherwise `None`. +/// +/// The reload handler short-circuits on `Some(_)` and leaves the live config +/// untouched, satisfying the "in-flight requests keep using the old snapshot" +/// invariant. +fn reject_if_validation_broken( + validation: &[crate::preset::ModelValidation], +) -> Option { + let broken: Vec<&crate::preset::ModelValidation> = + validation.iter().filter(|m| !m.any_ok()).collect(); + if broken.is_empty() { + return None; + } + let detail = broken + .iter() + .map(|m| format!("{} [{}]", m.model_name, m.role)) + .collect::>() + .join(", "); + let broken_models = broken + .iter() + .map(|m| { + serde_json::json!({ + "model": m.model_name, + "role": m.role, + }) + }) + .collect(); + Some(ValidationRejection { + detail, + broken_models, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::preset::{MappingResult, ModelValidation}; + + fn ok_mapping() -> MappingResult { + MappingResult { + priority: 1, + provider: "p".into(), + actual_model: "m".into(), + ok: true, + detail: "OK (12ms)".into(), + } + } + + fn broken_mapping(detail: &str) -> MappingResult { + MappingResult { + priority: 1, + provider: "p".into(), + actual_model: "m".into(), + ok: false, + detail: detail.into(), + } + } - Json(serde_json::json!({"status": "success", "message": "Configuration reloaded", "active_requests": active})).into_response() + #[test] + fn empty_validation_passes() { + // No router models declared → nothing to validate → no rejection. + // Preserves the prior "soft" reload contract for minimal configs. + assert!(reject_if_validation_broken(&[]).is_none()); + } + + #[test] + fn all_ok_validation_passes() { + let results = vec![ModelValidation { + model_name: "default".into(), + role: "default".into(), + mappings: vec![ok_mapping()], + }]; + assert!(reject_if_validation_broken(&results).is_none()); + } + + #[test] + fn at_least_one_healthy_mapping_passes() { + // any_ok() — a single healthy mapping is enough; a fallback being + // rate-limited at reload time should not block the reload. + let results = vec![ModelValidation { + model_name: "default".into(), + role: "default".into(), + mappings: vec![ok_mapping(), broken_mapping("429 - rate limited")], + }]; + assert!(reject_if_validation_broken(&results).is_none()); + } + + #[test] + fn rejects_when_any_router_model_has_zero_healthy_mappings() { + // Regression guard for the race the parent fix targets: a config where + // a router slot points at a model with no working provider must NOT + // be swapped in. + let results = vec![ + ModelValidation { + model_name: "default".into(), + role: "default".into(), + mappings: vec![ok_mapping()], + }, + ModelValidation { + model_name: "missing-think-model".into(), + role: "think".into(), + mappings: vec![broken_mapping("Model not found in [[models]]")], + }, + ]; + let rejection = reject_if_validation_broken(&results) + .expect("expected rejection for the broken router model"); + assert!(rejection.detail.contains("missing-think-model [think]")); + assert_eq!(rejection.broken_models.len(), 1); + assert_eq!(rejection.broken_models[0]["model"], "missing-think-model"); + assert_eq!(rejection.broken_models[0]["role"], "think"); + } + + #[test] + fn rejects_with_multiple_broken_models_in_detail() { + let results = vec![ + ModelValidation { + model_name: "default".into(), + role: "default".into(), + mappings: vec![broken_mapping("connection refused")], + }, + ModelValidation { + model_name: "think".into(), + role: "think".into(), + mappings: vec![broken_mapping("connection refused")], + }, + ]; + let rejection = + reject_if_validation_broken(&results).expect("expected rejection for broken models"); + assert!(rejection.detail.contains("default [default]")); + assert!(rejection.detail.contains("think [think]")); + assert_eq!(rejection.broken_models.len(), 2); + } } diff --git a/src/server/dispatch/mod.rs b/src/server/dispatch/mod.rs index ffb8c64e..9d81f426 100644 --- a/src/server/dispatch/mod.rs +++ b/src/server/dispatch/mod.rs @@ -5,6 +5,7 @@ //! DLP scanning → cache lookup → routing → provider loop with fallback → audit → response. mod provider_loop; +mod rate_limit; mod resolver; mod retry; mod telemetry; diff --git a/src/server/dispatch/rate_limit.rs b/src/server/dispatch/rate_limit.rs new file mode 100644 index 00000000..706e3516 --- /dev/null +++ b/src/server/dispatch/rate_limit.rs @@ -0,0 +1,150 @@ +//! Rate-limit detection and decision helpers. +//! +//! Centralises the 429-handling logic that was previously duplicated across +//! three call sites in [`super::retry`]. Provider error variants (429, 529, +//! Anthropic-specific 401-with-`rate_limit_error` payload) now flow through a +//! single [`RateLimitHandler`] implementation, so adding a new upstream +//! variant means touching one match arm instead of three. +//! +//! # Examples +//! +//! ```ignore +//! use crate::providers::error::ProviderError; +//! use crate::server::dispatch::rate_limit::RateLimitHandler; +//! +//! let err = ProviderError::ApiError { status: 429, message: "slow".into() }; +//! assert!(err.is_rate_limit()); +//! ``` +//! +//! See `src/server/budget.rs::is_rate_limit_payload` for the 401-with-rate-limit +//! payload heuristic this trait reuses. + +use crate::providers::error::ProviderError; + +/// Upstream-agnostic rate-limit decision surface for a single provider attempt. +/// +/// Implemented for [`ProviderError`] in this module. Replaces three inline +/// `matches!(e, ProviderError::ApiError { status: 429, .. })` checks in the +/// retry path with a single `err.is_rate_limit()` call so future provider +/// variants (e.g. 529, Anthropic 401-`rate_limit_error`) can be wired in +/// one place. +pub(crate) trait RateLimitHandler { + /// Returns `true` when this error should be treated as a rate-limit signal. + /// + /// Recognised variants: + /// + /// - HTTP 429 (canonical "Too Many Requests") + /// - HTTP 529 (Anthropic-specific "Overloaded") + /// - HTTP 401 carrying an Anthropic `rate_limit_error` payload + fn is_rate_limit(&self) -> bool; + + /// Returns the upstream-suggested cool-down in milliseconds, when + /// available. + /// + /// The current `ProviderError` shape does not retain the upstream + /// `Retry-After` header, so the default is `None`. The retry loop falls + /// back to its own exponential-backoff schedule. The hook exists so that + /// when [`ProviderError::ApiError`] grows headers (post unified-error + /// refactor) the retry loop can consume them through the same trait. + #[allow(dead_code)] + fn retry_after_ms(&self) -> Option { + None + } +} + +impl RateLimitHandler for ProviderError { + fn is_rate_limit(&self) -> bool { + match self { + ProviderError::ApiError { status, message } => match status { + 429 | 529 => true, + 401 => super::super::budget::is_rate_limit_payload(message), + _ => false, + }, + _ => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn anthropic_429_is_rate_limit() { + let err = ProviderError::ApiError { + status: 429, + message: "Too Many Requests".into(), + }; + assert!(err.is_rate_limit()); + } + + #[test] + fn anthropic_529_overloaded_is_rate_limit() { + let err = ProviderError::ApiError { + status: 529, + message: "overloaded_error".into(), + }; + assert!(err.is_rate_limit()); + } + + #[test] + fn anthropic_401_with_rate_limit_payload_is_rate_limit() { + let err = ProviderError::ApiError { + status: 401, + message: r#"{"type":"error","error":{"type":"rate_limit_error","message":"slow"}}"# + .into(), + }; + assert!(err.is_rate_limit()); + } + + #[test] + fn openai_429_is_rate_limit() { + let err = ProviderError::ApiError { + status: 429, + message: r#"{"error":{"code":"rate_limit_exceeded"}}"#.into(), + }; + assert!(err.is_rate_limit()); + } + + #[test] + fn deepseek_429_is_rate_limit() { + let err = ProviderError::ApiError { + status: 429, + message: "Rate limit reached for deepseek-chat".into(), + }; + assert!(err.is_rate_limit()); + } + + #[test] + fn openrouter_5xx_is_not_rate_limit() { + let err = ProviderError::ApiError { + status: 503, + message: "service unavailable".into(), + }; + assert!(!err.is_rate_limit()); + } + + #[test] + fn auth_401_is_not_rate_limit() { + let err = ProviderError::ApiError { + status: 401, + message: r#"{"type":"error","error":{"type":"authentication_error"}}"#.into(), + }; + assert!(!err.is_rate_limit()); + } + + #[test] + fn http_error_is_not_rate_limit() { + let err = ProviderError::AuthError("token expired".into()); + assert!(!err.is_rate_limit()); + } + + #[test] + fn retry_after_ms_default_is_none() { + let err = ProviderError::ApiError { + status: 429, + message: "rate limited".into(), + }; + assert_eq!(err.retry_after_ms(), None); + } +} diff --git a/src/server/dispatch/retry.rs b/src/server/dispatch/retry.rs index aa7fd690..62839f7f 100644 --- a/src/server/dispatch/retry.rs +++ b/src/server/dispatch/retry.rs @@ -15,7 +15,8 @@ use std::pin::Pin; use std::sync::Arc; use tracing::{info, warn}; -use super::super::{is_auth_revoked_error, is_retryable, retry_delay, MAX_RETRIES}; +use super::super::{is_auth_revoked_error, is_retryable, provider_max_retries, retry_delay}; +use super::rate_limit::RateLimitHandler; use super::telemetry::{ calculate_and_record_metrics, record_success_telemetry, store_response_cache, }; @@ -48,11 +49,7 @@ fn emit_provider_error_metrics( mapping: &crate::cli::ModelMapping, e: &crate::providers::error::ProviderError, ) { - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit { + if e.is_rate_limit() { warn!("Provider {} rate limited", mapping.provider); metrics::counter!( "grob_ratelimit_hits_total", @@ -74,6 +71,7 @@ fn classify_and_handle_error( mapping: &crate::cli::ModelMapping, e: &crate::providers::error::ProviderError, attempt: u32, + max_retries: u32, ) -> bool { if let Some(ref trace_id) = ctx.trace_id { ctx.state @@ -82,7 +80,7 @@ fn classify_and_handle_error( .trace_error(trace_id, &e.to_string()); } emit_provider_error_metrics(mapping, e); - is_retryable(e) && attempt < MAX_RETRIES + is_retryable(e) && attempt < max_retries } /// Log provider error metrics for the streaming path. @@ -177,11 +175,7 @@ pub(super) async fn dispatch_streaming( if is_auth_revoked_error(&e) { return Err(ProviderLoopAction::AuthRevoked(e.to_string())); } - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit { + if e.is_rate_limit() { Err(ProviderLoopAction::RateLimited) } else { Err(ProviderLoopAction::Continue) @@ -199,21 +193,25 @@ pub(super) async fn dispatch_non_streaming( ) -> Result { // Wrap in Option so we can move (not clone) on the final attempt. let mut owned_request = Some(provider_request); - for retry in 0..=MAX_RETRIES { + // Resolve per-provider retry budget (`[[providers]] max_retries = N`) + // with fallback to the global default. Looked up once per provider + // dispatch — config is static for the lifetime of the loop. + let max_retries = provider_max_retries(ctx.inner, &attempt.mapping.provider); + for retry in 0..=max_retries { if retry > 0 { let delay = retry_delay(retry - 1); warn!( "Retrying provider {} (attempt {}/{}), backoff {}ms", attempt.mapping.provider, retry + 1, - MAX_RETRIES + 1, + max_retries + 1, delay.as_millis() ); tokio::time::sleep(delay).await; } // Clone for earlier attempts; move on the last to avoid an extra allocation. - let req = if retry < MAX_RETRIES { + let req = if retry < max_retries { owned_request.as_ref().expect("set before loop").clone() } else { owned_request.take().expect("set before loop") @@ -295,13 +293,9 @@ pub(super) async fn dispatch_non_streaming( return Err(ProviderLoopAction::AuthRevoked(e.to_string())); } - if classify_and_handle_error(ctx, attempt.mapping, &e, retry) { - // On 429, try rotating to next pooled key before retrying. - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit && provider.rotate_key_pool() { + if classify_and_handle_error(ctx, attempt.mapping, &e, retry, max_retries) { + // On rate-limit, try rotating to next pooled key before retrying. + if e.is_rate_limit() && provider.rotate_key_pool() { info!( "Provider {} rate-limited, rotated to next pooled key", attempt.mapping.provider @@ -314,12 +308,8 @@ pub(super) async fn dispatch_non_streaming( continue; } - // Before giving up on this provider, try key rotation for 429. - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit && provider.rotate_key_pool() { + // Before giving up on this provider, try key rotation on rate-limit. + if e.is_rate_limit() && provider.rotate_key_pool() { info!( "Provider {} exhausted retries but rotated to next pooled key", attempt.mapping.provider diff --git a/src/server/mod.rs b/src/server/mod.rs index e815fa3d..fb69b516 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -35,8 +35,12 @@ pub use audit::AuditEntryBuilder; pub(crate) use audit::{log_audit, AuditCompliance, AuditParams}; pub(crate) use budget::{ calculate_cost, check_budget, is_auth_revoked_error, is_provider_subscription, is_retryable, - record_request_metrics, record_spend, retry_delay, RequestMetrics, MAX_RETRIES, + provider_max_retries, record_request_metrics, record_spend, retry_delay, RequestMetrics, }; +// Re-export the global default so intra-doc links from +// `cli::config::providers::ProviderConfig::max_retries` resolve. +#[allow(unused_imports)] +pub(crate) use budget::MAX_RETRIES; pub use error::AppError; pub(crate) use helpers::{ format_route_type, inject_continuation_text, resolve_provider_mappings, diff --git a/src/server/rpc/server_ns.rs b/src/server/rpc/server_ns.rs index dfe88266..7569f1b2 100644 --- a/src/server/rpc/server_ns.rs +++ b/src/server/rpc/server_ns.rs @@ -38,6 +38,11 @@ pub async fn status( } /// Triggers an atomic configuration reload. +/// +/// Awaits validation against the candidate registry **before** swapping +/// the live snapshot. A failure surfaces as a JSON-RPC error and the +/// in-flight `inner` snapshot stays untouched — the same contract the +/// HTTP `/api/config/reload` endpoint enforces. pub async fn reload_config( state: &Arc, caller: &CallerIdentity, @@ -75,19 +80,33 @@ pub async fn reload_config( .map(Arc::new) .map_err(|e| rpc_err(ERR_INTERNAL, format!("Failed to init providers: {e}")))?; + // Awaited validation BEFORE swap so a misconfigured reload cannot + // briefly serve traffic. In-flight requests continue on the old + // snapshot via their cached `Arc`. + let validation = crate::preset::validate_config(&new_config, &new_registry).await; + crate::preset::log_validation_results(&validation); + let broken: Vec<&crate::preset::ModelValidation> = + validation.iter().filter(|m| !m.any_ok()).collect(); + if !broken.is_empty() { + let detail = broken + .iter() + .map(|m| format!("{} [{}]", m.model_name, m.role)) + .collect::>() + .join(", "); + return Err(rpc_err( + ERR_INTERNAL, + format!( + "Validation failed — config not reloaded. Models with no healthy provider: {detail}" + ), + )); + } + let new_inner = Arc::new(ReloadableState::new(new_config, new_router, new_registry)); let active = state .active_requests .load(std::sync::atomic::Ordering::Relaxed); - *state.inner.write().unwrap_or_else(|e| e.into_inner()) = new_inner.clone(); - - // Background validation (non-blocking) - tokio::spawn(async move { - let results = - crate::preset::validate_config(&new_inner.config, &new_inner.provider_registry).await; - crate::preset::log_validation_results(&results); - }); + *state.inner.write().unwrap_or_else(|e| e.into_inner()) = new_inner; Ok(StatusResponse { status: "ok".into(), diff --git a/src/storage/secrets.rs b/src/storage/secrets.rs index b95f3e4a..8e1b8d75 100644 --- a/src/storage/secrets.rs +++ b/src/storage/secrets.rs @@ -233,6 +233,7 @@ mod tests { pool: None, circuit_breaker: None, health_check: None, + max_retries: None, } } diff --git a/tests/helpers/fixtures.rs b/tests/helpers/fixtures.rs index 1fc70bb8..91ecac00 100644 --- a/tests/helpers/fixtures.rs +++ b/tests/helpers/fixtures.rs @@ -126,6 +126,7 @@ pub fn base_provider_config(name: &str) -> grob::providers::ProviderConfig { circuit_breaker: None, health_check: None, + max_retries: None, } } diff --git a/tests/unit/provider_test.rs b/tests/unit/provider_test.rs index 5ad2f06c..f3e28a2e 100644 --- a/tests/unit/provider_test.rs +++ b/tests/unit/provider_test.rs @@ -32,6 +32,7 @@ mod tests { circuit_breaker: None, health_check: None, + max_retries: None, }; assert!(config.is_enabled()); @@ -62,6 +63,7 @@ mod tests { circuit_breaker: None, health_check: None, + max_retries: None, }; assert!(!config.is_enabled());