Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/executor-retry-consistency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@googleworkspace/cli": patch
---

Route generic executor HTTP sends through shared retry behavior for 429 responses to match resilient helper paths and improve throttling robustness.
121 changes: 114 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ pub fn build_client() -> Result<reqwest::Client, crate::error::GwsError> {
}

const MAX_RETRIES: u32 = 3;
const MAX_RETRY_AFTER_SECS: u64 = 30;

fn retry_delay_secs(headers: &reqwest::header::HeaderMap, attempt: u32) -> u64 {
let from_header = headers
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok());

let backoff = 1u64 << attempt; // 1, 2, 4 seconds
from_header.unwrap_or(backoff).min(MAX_RETRY_AFTER_SECS)
}

/// Send an HTTP request with automatic retry on 429 (rate limit) responses.
/// Respects the `Retry-After` header; falls back to exponential backoff (1s, 2s, 4s).
Expand All @@ -33,13 +44,9 @@ pub async fn send_with_retry(
return Ok(resp);
}

// Parse Retry-After header (seconds), fall back to exponential backoff
let retry_after = resp
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(1 << attempt); // 1, 2, 4 seconds
// Parse Retry-After (seconds), fall back to exponential backoff.
// Clamp to avoid unbounded server-controlled sleep.
let retry_after = retry_delay_secs(resp.headers(), attempt);

tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
}
Expand All @@ -48,12 +55,112 @@ pub async fn send_with_retry(
build_request().send().await
}

/// Send an already-built request with retry on 429 when the request can be
/// safely cloned for subsequent attempts.
///
/// If the request cannot be cloned (e.g. streaming body), this falls back to a
/// single send.
pub async fn send_builder_with_retry(
request: reqwest::RequestBuilder,
) -> Result<reqwest::Response, reqwest::Error> {
let Some(template) = request.try_clone() else {
return request.send().await;
};

for attempt in 0..MAX_RETRIES {
// `template` came from `try_clone()`, so this should remain cloneable.
// If clone unexpectedly fails, degrade safely to a single send.
let attempt_request = match template.try_clone() {
Some(r) => r,
None => return template.send().await,
};

let resp = attempt_request.send().await?;

if resp.status() != reqwest::StatusCode::TOO_MANY_REQUESTS {
return Ok(resp);
}

let retry_after = retry_delay_secs(resp.headers(), attempt);

tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
}

template.send().await
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;

#[test]
fn build_client_succeeds() {
assert!(build_client().is_ok());
}

#[test]
fn retry_delay_secs_clamps_large_retry_after() {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("retry-after", HeaderValue::from_static("9999"));
assert_eq!(retry_delay_secs(&headers, 0), MAX_RETRY_AFTER_SECS);
}

#[test]
fn retry_delay_secs_uses_exponential_fallback() {
let headers = reqwest::header::HeaderMap::new();
assert_eq!(retry_delay_secs(&headers, 0), 1);
assert_eq!(retry_delay_secs(&headers, 1), 2);
assert_eq!(retry_delay_secs(&headers, 2), 4);
}

#[tokio::test]
async fn send_with_retry_retries_on_429() {
let (url, hits, handle) = crate::test_utils::spawn_response_server(vec![
crate::test_utils::mock_http_response(429, Some(0), "{}"),
crate::test_utils::mock_http_response(200, None, "{}"),
])
.await;
let client = reqwest::Client::new();

let resp = send_with_retry(|| client.get(&url)).await.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
assert_eq!(hits.load(Ordering::SeqCst), 2);

handle.await.unwrap().unwrap();
}

#[tokio::test]
async fn send_builder_with_retry_retries_on_429() {
let (url, hits, handle) = crate::test_utils::spawn_response_server(vec![
crate::test_utils::mock_http_response(429, Some(0), "{}"),
crate::test_utils::mock_http_response(200, None, "{}"),
])
.await;
let client = reqwest::Client::new();
let request = client.get(&url).header("x-test", "1");

let resp = send_builder_with_retry(request).await.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
assert_eq!(hits.load(Ordering::SeqCst), 2);

handle.await.unwrap().unwrap();
}

#[tokio::test]
async fn send_builder_with_retry_does_not_retry_non_429() {
let (url, hits, handle) =
crate::test_utils::spawn_response_server(vec![crate::test_utils::mock_http_response(
500, None, "{}",
)])
.await;
let client = reqwest::Client::new();
let request = client.get(&url);

let resp = send_builder_with_retry(request).await.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(hits.load(Ordering::SeqCst), 1);

handle.await.unwrap().unwrap();
}
}
100 changes: 99 additions & 1 deletion src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ async fn handle_binary_response(
Ok(None)
}

fn is_retry_safe_method(http_method: &str) -> bool {
matches!(http_method, "GET" | "HEAD" | "OPTIONS" | "PUT" | "DELETE")
}

/// Executes an API method call.
///
/// This is the core function of the CLI that handles:
Expand Down Expand Up @@ -413,7 +417,13 @@ pub async fn execute_method(
)
.await?;

let response = request.send().await.context("HTTP request failed")?;
let response = if is_retry_safe_method(method.http_method.as_str()) {
crate::client::send_builder_with_retry(request)
.await
.context("HTTP request failed")?
} else {
request.send().await.context("HTTP request failed")?
};

let status = response.status();
let content_type = response
Expand Down Expand Up @@ -1727,6 +1737,94 @@ async fn test_execute_method_missing_path_param() {
.contains("Required path parameter"));
}

#[tokio::test]
async fn test_execute_method_retries_on_429_in_generic_path() {
let (base, hits, handle) = crate::test_utils::spawn_response_server(vec![
crate::test_utils::mock_http_response(429, Some(0), "{}"),
crate::test_utils::mock_http_response(200, None, "{\"ok\":true}"),
])
.await;

let doc = RestDescription {
base_url: Some(base),
..Default::default()
};
let method = RestMethod {
http_method: "GET".to_string(),
path: "files".to_string(),
flat_path: Some("files".to_string()),
..Default::default()
};

let result = execute_method(
&doc,
&method,
None,
None,
None,
AuthMethod::None,
None,
None,
false,
&PaginationConfig::default(),
None,
&crate::helpers::modelarmor::SanitizeMode::Warn,
&crate::formatter::OutputFormat::default(),
true,
)
.await
.unwrap()
.unwrap();

assert_eq!(hits.load(std::sync::atomic::Ordering::SeqCst), 2);
assert_eq!(result.get("ok").and_then(|v| v.as_bool()), Some(true));
handle.await.unwrap().unwrap();
}

#[tokio::test]
async fn test_execute_method_post_does_not_retry_on_429() {
let (base, hits, handle) =
crate::test_utils::spawn_response_server(vec![crate::test_utils::mock_http_response(
429,
Some(0),
"{}",
)])
.await;

let doc = RestDescription {
base_url: Some(base),
..Default::default()
};
let method = RestMethod {
http_method: "POST".to_string(),
path: "files".to_string(),
flat_path: Some("files".to_string()),
..Default::default()
};

let result = execute_method(
&doc,
&method,
None,
None,
None,
AuthMethod::None,
None,
None,
false,
&PaginationConfig::default(),
None,
&crate::helpers::modelarmor::SanitizeMode::Warn,
&crate::formatter::OutputFormat::default(),
true,
)
.await;

assert!(result.is_err());
assert_eq!(hits.load(std::sync::atomic::Ordering::SeqCst), 1);
handle.await.unwrap().unwrap();
}

#[test]
fn test_handle_error_response_non_json() {
let err = handle_error_response::<()>(
Expand Down
15 changes: 8 additions & 7 deletions src/helpers/gmail/triage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ pub async fn handle_triage(matches: &ArgMatches) -> Result<(), GwsError> {
// 1. List message IDs
let list_url = "https://gmail.googleapis.com/gmail/v1/users/me/messages";

let list_resp = client
.get(list_url)
.query(&[("q", query), ("maxResults", &max.to_string())])
.bearer_auth(&token)
.send()
.await
.map_err(|e| GwsError::Other(anyhow::anyhow!("Failed to list messages: {e}")))?;
let list_resp = crate::client::send_with_retry(|| {
client
.get(list_url)
.query(&[("q", query), ("maxResults", &max.to_string())])
.bearer_auth(&token)
})
.await
.map_err(|e| GwsError::Other(anyhow::anyhow!("Failed to list messages: {e}")))?;

if !list_resp.status().is_success() {
let err = list_resp.text().await.unwrap_or_default();
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ mod schema;
mod services;
mod setup;
mod setup_tui;
#[cfg(test)]
mod test_utils;
mod text;
mod token_storage;
pub(crate) mod validate;
Expand Down
79 changes: 79 additions & 0 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

pub(crate) struct MockHttpResponse {
pub status: u16,
pub retry_after_secs: Option<u64>,
pub body: &'static str,
}

pub(crate) fn mock_http_response(
status: u16,
retry_after_secs: Option<u64>,
body: &'static str,
) -> MockHttpResponse {
MockHttpResponse {
status,
retry_after_secs,
body,
}
}

fn reason_phrase(code: u16) -> &'static str {
match code {
200 => "OK",
429 => "Too Many Requests",
500 => "Internal Server Error",
_ => "Status",
}
}

pub(crate) async fn spawn_response_server(
responses: Vec<MockHttpResponse>,
) -> (
String,
Arc<AtomicUsize>,
tokio::task::JoinHandle<Result<(), std::io::Error>>,
) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let hits = Arc::new(AtomicUsize::new(0));
let hits_clone = Arc::clone(&hits);

let handle = tokio::spawn(async move {
for MockHttpResponse {
status,
retry_after_secs,
body,
} in responses
{
let (mut socket, _) = listener.accept().await?;
let mut buf = [0u8; 2048];
let _ = socket.read(&mut buf).await?;
hits_clone.fetch_add(1, Ordering::SeqCst);

let mut extra_headers = String::new();
if let Some(v) = retry_after_secs {
extra_headers.push_str(&format!("Retry-After: {v}\r\n"));
}

let response = format!(
"HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n{}\r\n{}",
status,
reason_phrase(status),
body.len(),
extra_headers,
body
);
socket.write_all(response.as_bytes()).await?;
}
Ok(())
});

(format!("http://{addr}/"), hits, handle)
}
Loading