Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/features/token_pricing/spend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub struct BudgetLimits {
pub struct BudgetError {
/// Human-readable budget exceeded message.
pub message: String,
/// Configured budget limit in USD (whichever scope tripped first).
pub limit_usd: f64,
/// Recorded spend in USD at check time.
pub actual_usd: f64,
}

impl std::fmt::Display for BudgetError {
Expand Down Expand Up @@ -216,6 +220,8 @@ impl SpendTracker {
"Monthly budget for model '{}' reached: ${:.2}/${:.2}",
model, spend, limit
),
limit_usd: limit,
actual_usd: spend,
});
}
}
Expand All @@ -228,6 +234,8 @@ impl SpendTracker {
"Monthly budget for provider '{}' reached: ${:.2}/${:.2}",
provider, spend, limit
),
limit_usd: limit,
actual_usd: spend,
});
}
}
Expand All @@ -240,6 +248,8 @@ impl SpendTracker {
"Monthly global budget reached: ${:.2}/${:.2}",
total, global_limit
),
limit_usd: global_limit,
actual_usd: total,
});
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/security/audit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ pub enum AuditEvent {
HitApproval,
/// TEE attestation report generated at startup.
TeeAttestation,
/// HTTP request fully processed (emitted by the audit middleware once a
/// response has been produced — covers the entire request lifecycle from
/// authentication through dispatch and error handling).
RequestProcessed,
}

/// Immutable audit log entry.
Expand Down
11 changes: 7 additions & 4 deletions src/server/budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::providers::AuthType;
use std::sync::Arc;
use tracing::warn;

use super::{AppError, AppState, ReloadableState};
use super::{AppState, ReloadableState, RequestError};

/// Maximum retries per provider before falling back to the next mapping.
/// NOTE: 2 retries (3 total attempts) balances latency vs resilience — most
Expand Down Expand Up @@ -62,13 +62,13 @@ pub(crate) fn record_request_metrics(m: &RequestMetrics<'_>) {
}
}

/// Check budget before a request. Returns Err(AppError::BudgetExceeded) if any limit is hit.
/// Check budget before a request. Returns `Err(RequestError::BudgetExceeded)` if any limit is hit.
pub(crate) async fn check_budget(
state: &Arc<AppState>,
inner: &Arc<ReloadableState>,
provider_name: &str,
model_name: &str,
) -> Result<(), AppError> {
) -> Result<(), RequestError> {
let budget_config = &inner.config.budget;
let global_limit = budget_config.monthly_limit_usd.value();

Expand All @@ -92,7 +92,10 @@ pub(crate) async fn check_budget(
provider_limit,
model_limit,
) {
return Err(AppError::BudgetExceeded(e.message));
return Err(RequestError::BudgetExceeded {
limit_usd: e.limit_usd,
actual_usd: e.actual_usd,
});
}

if let Some(warning) = tracker.check_warnings(
Expand Down
22 changes: 11 additions & 11 deletions src/server/config_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;
use tracing::{error, info, warn};

use super::config_guard::is_section_or_key_denied;
use super::{AppError, AppState, ReloadableState};
use super::{AppState, ReloadableState, RequestError};

/// Redact an API key for safe display (show first 4 + last 4 chars)
pub(crate) fn redact_api_key(key: &str) -> String {
Expand Down Expand Up @@ -89,7 +89,7 @@ pub(crate) async fn get_config_json(State(state): State<Arc<AppState>>) -> impl
pub(crate) async fn update_config_json(
State(state): State<Arc<AppState>>,
Json(mut new_config): Json<serde_json::Value>,
) -> Result<Json<serde_json::Value>, AppError> {
) -> Result<Json<serde_json::Value>, RequestError> {
// Remove null values (TOML doesn't support null)
remove_null_values(&mut new_config);

Expand All @@ -99,7 +99,7 @@ pub(crate) async fn update_config_json(
// Whole-section deny check (providers, dlp).
if is_section_or_key_denied(section, "") {
warn!(section = %section, "config API: denied write to protected section");
return Err(AppError::ParseError(format!(
return Err(RequestError::Forbidden(format!(
"denied: section '{}' cannot be modified via the config API",
section
)));
Expand All @@ -109,7 +109,7 @@ pub(crate) async fn update_config_json(
for key in inner.keys() {
if is_section_or_key_denied(section, key) {
warn!(section = %section, key = %key, "config API: denied write to protected key");
return Err(AppError::ParseError(format!(
return Err(RequestError::Forbidden(format!(
"denied: {}.{} cannot be modified via the config API",
section, key
)));
Expand All @@ -123,7 +123,7 @@ pub(crate) async fn update_config_json(
let config_path = match &state.config_source {
crate::cli::ConfigSource::File(p) => p,
crate::cli::ConfigSource::Url(_) => {
return Err(AppError::ParseError(
return Err(RequestError::BadRequest(
"Cannot save config: loaded from remote URL (read-only)".to_string(),
));
}
Expand All @@ -132,15 +132,15 @@ pub(crate) async fn update_config_json(
// Read current config and merge the incoming JSON updates into it.
let config_str = tokio::fs::read_to_string(config_path)
.await
.map_err(|e| AppError::ParseError(format!("Failed to read config: {e}")))?;
.map_err(|e| RequestError::Internal(anyhow::anyhow!("Failed to read config: {e}")))?;

let mut config: toml::Value = toml::from_str(&config_str)
.map_err(|e| AppError::ParseError(format!("Failed to parse config: {e}")))?;
.map_err(|e| RequestError::ParseError(format!("Failed to parse config: {e}")))?;

// Update providers section
if let Some(providers) = new_config.get("providers") {
let providers_toml: toml::Value = serde_json::from_str(&providers.to_string())
.map_err(|e| AppError::ParseError(format!("Failed to convert providers: {e}")))?;
.map_err(|e| RequestError::ParseError(format!("Failed to convert providers: {e}")))?;

if let Some(table) = config.as_table_mut() {
table.insert("providers".to_string(), providers_toml);
Expand All @@ -150,7 +150,7 @@ pub(crate) async fn update_config_json(
// Update models section
if let Some(models) = new_config.get("models") {
let models_toml: toml::Value = serde_json::from_str(&models.to_string())
.map_err(|e| AppError::ParseError(format!("Failed to convert models: {e}")))?;
.map_err(|e| RequestError::ParseError(format!("Failed to convert models: {e}")))?;

if let Some(table) = config.as_table_mut() {
table.insert("models".to_string(), models_toml);
Expand Down Expand Up @@ -192,9 +192,9 @@ pub(crate) async fn update_config_json(

// Deserialise the merged TOML into AppConfig so we can validate and reload.
let merged_toml_str = toml::to_string_pretty(&config)
.map_err(|e| AppError::ParseError(format!("Failed to serialize config: {e}")))?;
.map_err(|e| RequestError::Internal(anyhow::anyhow!("Failed to serialize config: {e}")))?;
let merged_config: crate::models::config::AppConfig = toml::from_str(&merged_toml_str)
.map_err(|e| AppError::ParseError(format!("Invalid config after merge: {e}")))?;
.map_err(|e| RequestError::ParseError(format!("Invalid config after merge: {e}")))?;

// Backup, write, and hot-reload via the shared pipeline.
super::config_guard::persist_and_reload(&state, &merged_config).await?;
Expand Down
23 changes: 13 additions & 10 deletions src/server/config_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ pub fn is_key_denied(section: &ConfigSection, key: &str) -> bool {
pub async fn persist_and_reload(
state: &Arc<super::AppState>,
config: &crate::models::config::AppConfig,
) -> Result<(), super::AppError> {
) -> Result<(), super::RequestError> {
let config_path = match &state.config_source {
crate::cli::ConfigSource::File(p) => p,
crate::cli::ConfigSource::Url(_) => {
return Err(super::AppError::ParseError(
return Err(super::RequestError::BadRequest(
"Cannot save config: loaded from remote URL (read-only)".to_string(),
));
}
Expand All @@ -81,15 +81,18 @@ pub async fn persist_and_reload(
let backup_path = config_path.with_extension("toml.backup");
tokio::fs::copy(config_path, &backup_path)
.await
.map_err(|e| super::AppError::ParseError(format!("Failed to create backup: {e}")))?;
.map_err(|e| {
super::RequestError::Internal(anyhow::anyhow!("Failed to create backup: {e}"))
})?;

// 2. Serialise and write
let toml_str = toml::to_string_pretty(config)
.map_err(|e| super::AppError::ParseError(format!("Failed to serialize config: {e}")))?;
let toml_str = toml::to_string_pretty(config).map_err(|e| {
super::RequestError::Internal(anyhow::anyhow!("Failed to serialize config: {e}"))
})?;

tokio::fs::write(config_path, toml_str)
.await
.map_err(|e| super::AppError::ParseError(format!("Failed to write config: {e}")))?;
tokio::fs::write(config_path, toml_str).await.map_err(|e| {
super::RequestError::Internal(anyhow::anyhow!("Failed to write config: {e}"))
})?;

// 3. Hot-reload: rebuild router + provider registry from the new config
reload_state(state, config.clone(), config_path)?;
Expand All @@ -109,7 +112,7 @@ fn reload_state(
state: &Arc<super::AppState>,
config: crate::models::config::AppConfig,
_config_path: &Path,
) -> Result<(), super::AppError> {
) -> Result<(), super::RequestError> {
let new_router = crate::routing::classify::Router::new(config.clone());

let secret_backend =
Expand All @@ -123,7 +126,7 @@ fn reload_state(
&config.server.timeouts,
)
.map_err(|e| {
super::AppError::ProviderError(format!("Failed to rebuild provider registry: {e}"))
super::RequestError::Internal(anyhow::anyhow!("Failed to rebuild provider registry: {e}"))
})?;

let new_inner = Arc::new(super::ReloadableState::new(
Expand Down
28 changes: 19 additions & 9 deletions src/server/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::Arc;

use super::{
calculate_cost, is_provider_subscription, log_audit, record_request_metrics,
resolve_provider_mappings, sanitize_provider_response_reported, AppError, AppState,
AuditCompliance, AuditParams, ReloadableState, RequestMetrics,
resolve_provider_mappings, sanitize_provider_response_reported, AppState, AuditCompliance,
AuditParams, ReloadableState, RequestError, RequestMetrics,
};
use crate::features::watch::events::{DlpDirection, WatchEvent};

Expand All @@ -46,6 +46,9 @@ pub(crate) struct DispatchContext<'a> {
pub headers: &'a HeaderMap,
/// Message tracer context. None for OpenAI compat endpoint.
pub trace_id: Option<String>,
/// Audit-emitted flag — flipped by `log_audit_if_enabled` so the
/// outer audit middleware can skip writing a duplicate entry.
pub audited: std::sync::Arc<std::sync::atomic::AtomicBool>,
/// Resolved policy for this request (when policies feature is enabled).
#[cfg(feature = "policies")]
#[allow(dead_code)]
Expand Down Expand Up @@ -169,6 +172,9 @@ impl DispatchContext<'_> {
dlp_had_pii: entry.dlp_had_pii,
dlp_had_redact_or_warn: entry.dlp_had_redact_or_warn,
});
// Flag so the outer audit middleware skips a duplicate entry.
self.audited
.store(true, std::sync::atomic::Ordering::Release);
}
}
}
Expand Down Expand Up @@ -244,7 +250,7 @@ pub(crate) fn resolve_grob_hint(
pub(crate) async fn dispatch(
ctx: &DispatchContext<'_>,
request: &mut CanonicalRequest,
) -> Result<DispatchResult, AppError> {
) -> Result<DispatchResult, RequestError> {
// ── Step 0: Resolve complexity hint ──
// Resolved up-front (borrows `request` immutably) but applied post-routing
// so the client-declared tier overrides the algorithmic scorer.
Expand Down Expand Up @@ -291,7 +297,7 @@ pub(crate) async fn dispatch(
.inner
.router
.route(request)
.map_err(|e| AppError::RoutingError(e.to_string()))?;
.map_err(|e| RequestError::RoutingError(e.to_string()))?;

// ── Step 3.5: Apply client-declared complexity hint ──
// The hint (header / body metadata / MCP one-shot) overrides whatever tier
Expand Down Expand Up @@ -398,7 +404,7 @@ async fn check_cache(
fn scan_dlp_input(
ctx: &DispatchContext<'_>,
request: &mut CanonicalRequest,
) -> Result<(), AppError> {
) -> Result<(), RequestError> {
let Some(ref dlp_engine) = ctx.dlp else {
return Ok(());
};
Expand Down Expand Up @@ -476,7 +482,7 @@ fn scan_dlp_input(
dlp_had_pii: false,
dlp_had_redact_or_warn: false,
});
Err(AppError::DlpBlocked(format!("{}", block_err)))
Err(RequestError::DlpBlocked(format!("{}", block_err)))
}
}
}
Expand All @@ -488,7 +494,7 @@ async fn dispatch_fan_out(
sorted_mappings: &[crate::cli::ModelMapping],
fan_out_config: &crate::cli::FanOutConfig,
decision: &crate::models::RouteDecision,
) -> Result<DispatchResult, AppError> {
) -> Result<DispatchResult, RequestError> {
let mut fan_request = request.clone();
ctx.sanitize_input(&mut fan_request);

Expand All @@ -503,7 +509,11 @@ async fn dispatch_fan_out(
Ok((response, provider_info)) => {
handle_fan_out_success(ctx, response, &provider_info, decision).await
}
Err(e) => Err(AppError::ProviderError(format!("Fan-out failed: {}", e))),
Err(e) => Err(RequestError::ProviderUpstream {
provider: "fan_out".to_string(),
status: 502,
body: Some(format!("Fan-out failed: {}", e)),
}),
}
}

Expand All @@ -513,7 +523,7 @@ async fn handle_fan_out_success(
mut response: ProviderResponse,
provider_info: &[(String, String)],
decision: &crate::models::RouteDecision,
) -> Result<DispatchResult, AppError> {
) -> Result<DispatchResult, RequestError> {
ctx.sanitize_output(&mut response);

let latency_ms = ctx.start_time.elapsed().as_millis() as u64;
Expand Down
22 changes: 13 additions & 9 deletions src/server/dispatch/provider_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
//!
//! After the loop exhausts the list, [`resolver::try_direct_provider_lookup`]
//! offers a backward-compat path for unmapped models. A final audit entry
//! is written before returning `AppError::ProviderError`.
//! is written before returning `RequestError::ProviderUpstream`.

use super::super::{
check_budget, format_route_type, inject_continuation_text, is_provider_subscription,
should_inject_continuation, AppError,
should_inject_continuation, RequestError,
};
use super::resolver::{resolve_provider, try_direct_provider_lookup};
use super::retry::{
Expand All @@ -39,7 +39,7 @@ pub(super) async fn dispatch_provider_loop(
sorted_mappings: &[crate::cli::ModelMapping],
decision: &crate::models::RouteDecision,
cache_key: &Option<String>,
) -> Result<DispatchResult, AppError> {
) -> Result<DispatchResult, RequestError> {
// Re-sort mappings by adaptive score when scorer is enabled
let rescored;
let effective_mappings: &[crate::cli::ModelMapping] =
Expand Down Expand Up @@ -156,7 +156,7 @@ pub(super) async fn dispatch_provider_loop(
);
// Abort the fallback cascade: this is a user-actionable error,
// not a transient provider failure.
return Err(AppError::AuthenticationError(format!(
return Err(RequestError::AuthRevoked(format!(
"OAuth token for provider '{}' revoked. Run: grob connect --force-reauth. Details: {}",
mapping.provider, msg
)));
Expand Down Expand Up @@ -189,11 +189,15 @@ pub(super) async fn dispatch_provider_loop(
"All provider mappings failed for model: {}",
decision.model_name
);
Err(AppError::ProviderError(format!(
"All {} provider mappings failed for model: {}",
effective_mappings.len(),
decision.model_name
)))
Err(RequestError::ProviderUpstream {
provider: "all".to_string(),
status: 502,
body: Some(format!(
"All {} provider mappings failed for model: {}",
effective_mappings.len(),
decision.model_name
)),
})
}

/// Log the dispatch attempt info line (route type, stream mode, model -> provider).
Expand Down
Loading
Loading