diff --git a/crates/core/src/rpc/client.rs b/crates/core/src/rpc/client.rs index 74f8cc65..13ed0f39 100644 --- a/crates/core/src/rpc/client.rs +++ b/crates/core/src/rpc/client.rs @@ -1,8 +1,8 @@ //! Soroban RPC client. //! //! Communicates with Soroban RPC endpoints: `getTransaction`, `simulateTransaction`, -//! `getLedgerEntries`, `getEvents`, `getLatestLedger`. Handles retries and -//! basic rate-limit backoff. +//! `getLedgerEntries`, `getEvents`, `getLatestLedger`. Handles retries, +//! rate-limit backoff, and 5xx server-error backoff. use crate::error::{PrismError, PrismResult}; use crate::network::NetworkConfig; @@ -11,6 +11,30 @@ use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; +/// Base delay (ms) for exponential backoff: delay = `BASE_DELAY_MS × 2^attempt`. +const BASE_DELAY_MS: u64 = 100; + +/// Hard ceiling on any single backoff sleep to avoid excessively long waits. +const MAX_DELAY_MS: u64 = 10_000; // 10 seconds + +/// Compute the capped exponential backoff duration for a given attempt number. +/// +/// Returns `BASE_DELAY_MS × 2^attempt`, clamped to [`MAX_DELAY_MS`]. +/// +/// | attempt | raw ms | clamped ms | +/// |---------|--------|------------| +/// | 1 | 200 | 200 | +/// | 2 | 400 | 400 | +/// | 3 | 800 | 800 | +/// | 4 | 1600 | 1600 | +/// | 6 | 6400 | 6400 | +/// | 7 | 12800 | 10000 | +fn backoff_duration(attempt: u32) -> Duration { + // saturating_shl prevents overflow for large attempt values + let ms = BASE_DELAY_MS.saturating_mul(1u64.saturating_shl(attempt)); + Duration::from_millis(ms.min(MAX_DELAY_MS)) +} + /// Ledger footprint returned by `simulateTransaction`. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -222,7 +246,14 @@ impl SorobanRpcClient { self.call("getLatestLedger", serde_json::json!({})).await } - /// Internal JSON-RPC call with retry and rate-limit backoff. + /// Internal JSON-RPC call with retry and backoff. + /// + /// Retries are triggered by: + /// - Transport-level failures (connection refused, timeout, etc.) + /// - HTTP 429 Too Many Requests + /// - HTTP 5xx Server Errors (500–599) + /// + /// Backoff follows `BASE_DELAY_MS × 2^attempt`, capped at `MAX_DELAY_MS`. async fn call Deserialize<'de>>( &self, method: &'static str, @@ -235,8 +266,14 @@ impl SorobanRpcClient { for attempt in 0..=MAX_RETRIES { if attempt > 0 { - let backoff = Duration::from_millis(100 * 2u64.pow(attempt)); - tokio::time::sleep(backoff).await; + let delay = backoff_duration(attempt); + tracing::debug!( + method, + attempt, + delay_ms = delay.as_millis(), + "Backing off before retry" + ); + tokio::time::sleep(delay).await; tracing::debug!(attempt, method, "Retrying RPC request"); } @@ -256,6 +293,29 @@ impl SorobanRpcClient { "RPC request latency" ); + // Retry on 429 Too Many Requests. + if status == reqwest::StatusCode::TOO_MANY_REQUESTS { + tracing::warn!(method, attempt, "Rate limited by RPC node (429), will retry"); + last_error = + Some(PrismError::RpcError(format!("Rate limited (attempt {attempt})"))); + continue; + } + + // Retry on any 5xx Server Error — these are transient node failures. + if status.is_server_error() { + tracing::warn!( + method, + attempt, + status = %status, + elapsed_ms, + "RPC node returned a server error (5xx), will retry" + ); + last_error = Some(PrismError::RpcError(format!( + "Server error {status} on attempt {attempt}" + ))); + continue; + } + let body = response.text().await.map_err(|e| { PrismError::RpcError(format!("Failed to read response body: {e}")) })?; @@ -269,18 +329,12 @@ impl SorobanRpcClient { "RPC response received" ); - if status == 429 { - tracing::warn!(method, "Rate limited by RPC node, backing off"); - last_error = - Some(PrismError::RpcError("Rate limited (HTTP 429)".to_string())); - continue; - } - if !status.is_success() { return Err(PrismError::RpcError(format!( "RPC request failed with HTTP {status}: {body}" ))); } + let rpc_response: JsonRpcResponse = serde_json::from_str(&body) .map_err(|e| PrismError::RpcError(format!("Response parse error: {e}")))?; @@ -310,7 +364,6 @@ impl SorobanRpcClient { error = %e, "RPC request latency" ); - tracing::debug!( method, endpoint = %self.rpc_url, @@ -331,6 +384,102 @@ impl SorobanRpcClient { #[cfg(test)] mod tests { use super::*; + use tokio::io::AsyncWriteExt; + use tokio::net::TcpListener; + + // ------------------------------------------------------------------------- + // Test helpers + // ------------------------------------------------------------------------- + + /// Spawn an in-process HTTP/1.1 mock server that replies to each successive + /// connection with the next entry from `responses`. If there are more + /// connections than responses the last entry is repeated. + /// Returns the bound local socket address. + async fn spawn_mock_server(responses: Vec) -> std::net::SocketAddr { + use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; + use tokio::io::AsyncReadExt; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let responses = Arc::new(responses); + let counter = Arc::new(AtomicUsize::new(0)); + + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { break }; + let responses = Arc::clone(&responses); + let counter = Arc::clone(&counter); + tokio::spawn(async move { + let mut buf = [0u8; 4096]; + let _ = stream.read(&mut buf).await; + let idx = counter.fetch_add(1, Ordering::SeqCst); + let raw = responses + .get(idx) + .cloned() + .unwrap_or_else(|| responses.last().cloned().unwrap_or_default()); + let _ = stream.write_all(raw.as_bytes()).await; + }); + } + }); + + addr + } + + /// Build a raw HTTP/1.1 response string. + fn http_response(status: u16, reason: &str, body: &str) -> String { + format!( + "HTTP/1.1 {status} {reason}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", + body.len() + ) + } + + /// Minimal valid JSON-RPC 2.0 success body (getLatestLedger shape). + fn ok_body() -> &'static str { + r#"{"jsonrpc":"2.0","id":1,"result":{"id":"test","protocolVersion":"21","sequence":100}}"# + } + + fn make_client(addr: std::net::SocketAddr) -> SorobanRpcClient { + let config = NetworkConfig { + network: crate::network::Network::Testnet, + rpc_url: format!("http://{addr}"), + network_passphrase: "test".to_string(), + archive_urls: vec![], + api_key: None, + request_timeout_secs: 5, + }; + SorobanRpcClient::new(&config) + } + + // ------------------------------------------------------------------------- + // backoff_duration — pure unit tests, no I/O + // ------------------------------------------------------------------------- + + #[test] + fn backoff_increases_exponentially() { + assert_eq!(backoff_duration(1), Duration::from_millis(200)); + assert_eq!(backoff_duration(2), Duration::from_millis(400)); + assert_eq!(backoff_duration(3), Duration::from_millis(800)); + assert_eq!(backoff_duration(4), Duration::from_millis(1_600)); + assert_eq!(backoff_duration(5), Duration::from_millis(3_200)); + assert_eq!(backoff_duration(6), Duration::from_millis(6_400)); + } + + #[test] + fn backoff_is_capped_at_max_delay() { + // attempt 7 → raw = 100 × 128 = 12 800 ms → clamped to MAX_DELAY_MS + assert_eq!(backoff_duration(7), Duration::from_millis(MAX_DELAY_MS)); + // Very large attempt must not overflow u64 + assert_eq!(backoff_duration(63), Duration::from_millis(MAX_DELAY_MS)); + } + + #[test] + fn backoff_attempt_zero_returns_base_delay() { + assert_eq!(backoff_duration(0), Duration::from_millis(BASE_DELAY_MS)); + } + + // ------------------------------------------------------------------------- + // Deserialisation tests (kept from original upstream suite) + // ------------------------------------------------------------------------- #[test] fn get_transaction_response_deserializes() { @@ -420,6 +569,383 @@ mod tests { assert!(result.is_success()); } + #[test] + fn test_get_transaction_success_status() { + let json = r#"{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "status": "SUCCESS", + "latestLedger": 500, + "latestLedgerCloseTime": 1711620000, + "oldestLedger": 100, + "oldestLedgerCloseTime": 1711610000, + "ledger": 450, + "createdAt": "2024-03-28T10:00:00Z", + "applicationOrder": 2, + "envelopeXdr": "AAAAAgAAAABqYWNrQGV4YW1wbGUuY29tAAABkA==", + "resultXdr": "AAAAAAAAAGQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAA=", + "resultMetaXdr": "AAAAAwAAAAAAAAACAAAAAwAAAcQAAAAAAAAAAA==" + } + }"#; + + let resp: JsonRpcResponse = serde_json::from_str(json).unwrap(); + let result = resp.result.unwrap(); + + assert_eq!(result.status, TransactionStatus::Success); + assert_eq!(result.latest_ledger, 500); + assert_eq!(result.latest_ledger_close_time, Some(1711620000)); + assert_eq!(result.oldest_ledger, Some(100)); + assert_eq!(result.oldest_ledger_close_time, Some(1711610000)); + assert_eq!(result.ledger, Some(450)); + assert_eq!(result.created_at, Some("2024-03-28T10:00:00Z".to_string())); + assert_eq!(result.application_order, Some(2)); + assert_eq!( + result.envelope_xdr, + Some("AAAAAgAAAABqYWNrQGV4YW1wbGUuY29tAAABkA==".to_string()) + ); + } + + #[test] + fn test_get_transaction_not_found_status() { + let json = r#"{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "status": "NOT_FOUND", + "latestLedger": 600, + "latestLedgerCloseTime": 1711625000, + "oldestLedger": 200, + "oldestLedgerCloseTime": 1711615000 + } + }"#; + + let resp: JsonRpcResponse = serde_json::from_str(json).unwrap(); + let result = resp.result.unwrap(); + + assert_eq!(result.status, TransactionStatus::NotFound); + assert_eq!(result.latest_ledger, 600); + assert_eq!(result.ledger, None); + } + + #[test] + fn test_get_transaction_failed_status() { + let json = r#"{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "status": "FAILED", + "latestLedger": 700, + "latestLedgerCloseTime": 1711630000, + "oldestLedger": 300, + "oldestLedgerCloseTime": 1711620000, + "ledger": 650, + "createdAt": "2024-03-28T11:00:00Z", + "applicationOrder": 5, + "envelopeXdr": "AAAAAgAAAABmYWlsZWRAdHguY29tAAABkA==", + "resultXdr": "AAAAAAAAAGT////7AAAAAA==", + "resultMetaXdr": "AAAAAwAAAAAAAAACAAAAAwAAAoYAAAAAAAAAAA==" + } + }"#; + + let resp: JsonRpcResponse = serde_json::from_str(json).unwrap(); + let result = resp.result.unwrap(); + + assert_eq!(result.status, TransactionStatus::Failed); + assert_eq!(result.latest_ledger, 700); + assert_eq!(result.ledger, Some(650)); + } + + // ------------------------------------------------------------------------- + // Retry / backoff integration tests — real TCP, no extra deps + // ------------------------------------------------------------------------- + + /// One 500 followed by a 200 — should succeed on the second attempt. + #[tokio::test] + async fn retries_once_on_500_then_succeeds() { + let responses = vec![ + http_response(500, "Internal Server Error", ""), + http_response(200, "OK", ok_body()), + ]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + assert!(result.is_ok(), "Expected success after retry, got: {result:?}"); + } + + /// Persistent 500 — client exhausts all 3 retries (4 total attempts) and errors. + #[tokio::test] + async fn exhausts_retries_on_persistent_500() { + let responses = vec![ + http_response(500, "Internal Server Error", ""), + http_response(500, "Internal Server Error", ""), + http_response(500, "Internal Server Error", ""), + http_response(500, "Internal Server Error", ""), + ]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + assert!(result.is_err(), "Expected error after retries exhausted"); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("Server error") || err.contains("500"), + "Error should mention the server error, got: {err}" + ); + } + + /// 503 Service Unavailable is retried (another common transient 5xx). + #[tokio::test] + async fn retries_on_503_service_unavailable() { + let responses = vec![ + http_response(503, "Service Unavailable", ""), + http_response(503, "Service Unavailable", ""), + http_response(200, "OK", ok_body()), + ]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + assert!(result.is_ok(), "Expected success after retrying 503s, got: {result:?}"); + } + + /// 502 Bad Gateway is a 5xx and must be retried. + #[tokio::test] + async fn retries_on_502_bad_gateway() { + let responses = vec![ + http_response(502, "Bad Gateway", ""), + http_response(200, "OK", ok_body()), + ]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + assert!(result.is_ok(), "Expected success after retrying 502, got: {result:?}"); + } + + /// 429 Too Many Requests — pre-existing behaviour must be preserved. + #[tokio::test] + async fn retries_on_429_rate_limit() { + let responses = vec![ + http_response(429, "Too Many Requests", ""), + http_response(200, "OK", ok_body()), + ]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + assert!(result.is_ok(), "Expected success after retrying 429, got: {result:?}"); + } + + /// A 4xx client error (400) is NOT retried — it is a permanent caller error. + #[tokio::test] + async fn does_not_retry_on_4xx_client_error() { + // Return only one response; if the client retried it would get a stale + // connection / empty response and produce a different error. + let bad_body = + r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"Invalid request"}}"#; + let responses = vec![http_response(400, "Bad Request", bad_body)]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + // The client should return an error immediately, not after retries. + assert!(result.is_err(), "Expected error for 4xx response"); + } + + /// A JSON-RPC error inside a 200 is returned immediately without retry. + #[tokio::test] + async fn returns_immediately_on_jsonrpc_error_in_200() { + let rpc_err = + r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32000,"message":"not found"}}"#; + let responses = vec![http_response(200, "OK", rpc_err)]; + let addr = spawn_mock_server(responses).await; + let result = make_client(addr).get_latest_ledger().await; + assert!(result.is_err()); + assert!( + result.unwrap_err().to_string().contains("not found"), + "Error should propagate the JSON-RPC error message" + ); + } + + // ------------------------------------------------------------------------- + // Existing integration tests (kept from upstream) + // ------------------------------------------------------------------------- + + #[tokio::test] + async fn test_get_ledger_entries_empty_response() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let rpc_url = format!("http://{}", addr); + + let config = NetworkConfig { + network: crate::network::Network::Testnet, + rpc_url, + network_passphrase: "test".to_string(), + archive_urls: vec![], + api_key: None, + request_timeout_secs: 30, + }; + let client = SorobanRpcClient::new(&config); + + tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + let body = r#"{"jsonrpc":"2.0","id":1,"result":{"latestLedger":123,"entries":[]}}"#; + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + socket.write_all(response.as_bytes()).await.unwrap(); + }); + + let result = client.get_ledger_entries(&["key1".to_string()]).await.unwrap(); + assert_eq!(result["entries"].as_array().unwrap().len(), 0); + assert_eq!(result["latestLedger"], 123); + } + + #[tokio::test] + async fn test_client_respects_timeout() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let rpc_url = format!("http://{}", addr); + + let config = NetworkConfig { + network: crate::network::Network::Testnet, + rpc_url, + network_passphrase: "test".to_string(), + archive_urls: vec![], + api_key: None, + request_timeout_secs: 1, + }; + let client = SorobanRpcClient::new(&config); + + tokio::spawn(async move { + while let Ok((_socket, _)) = listener.accept().await { + tokio::time::sleep(Duration::from_secs(2)).await; + } + }); + + let result = client.get_latest_ledger().await; + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.to_lowercase().contains("timeout") + || err_msg.to_lowercase().contains("error sending request"), + "Actual error: {err_msg}" + ); + } + + #[tokio::test] + async fn test_simulate_transaction_returns_rpc_error_on_failure() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let rpc_url = format!("http://{}", addr); + + let config = NetworkConfig { + network: crate::network::Network::Testnet, + rpc_url, + network_passphrase: "test".to_string(), + archive_urls: vec![], + api_key: None, + request_timeout_secs: 30, + }; + let client = SorobanRpcClient::new(&config); + + tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.unwrap(); + let body = r#"{"jsonrpc":"2.0","id":1,"result":{"latestLedger":100,"error":"contract trap"}}"#; + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + socket.write_all(response.as_bytes()).await.unwrap(); + }); + + let result = client.simulate_transaction("AAAA").await; + assert!(result.is_err()); + match result.unwrap_err() { + PrismError::RpcError(msg) => assert!(msg.contains("contract trap")), + _ => panic!("Expected PrismError::RpcError"), + } + } +} + let json = r#"{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "status": "SUCCESS", + "latestLedger": 123, + "latestLedgerCloseTime": 1711620000, + "ledger": 120, + "createdAt": "2024-03-28T10:00:00Z", + "applicationOrder": 1, + "envelopeXdr": "AAAAAg...", + "resultXdr": "AAAAAw...", + "resultMetaXdr": "AAAABA..." + } + }"#; + + let resp: JsonRpcResponse = serde_json::from_str(json).unwrap(); + let result = resp.result.unwrap(); + + assert_eq!(result.status, TransactionStatus::Success); + assert_eq!(result.latest_ledger, 123); + assert_eq!(result.ledger, Some(120)); + } + + #[test] + fn transaction_status_variants_deserialize() { + let cases = [ + ("\"SUCCESS\"", TransactionStatus::Success), + ("\"NOT_FOUND\"", TransactionStatus::NotFound), + ("\"FAILED\"", TransactionStatus::Failed), + ]; + + for (raw, expected) in cases { + let got: TransactionStatus = serde_json::from_str(raw).unwrap(); + assert_eq!(got, expected); + } + } + + #[test] + fn test_simulate_response_is_success() { + let ok = SimulateTransactionResponse { + latest_ledger: 100, + soroban_data: Some("AAAA".to_string()), + min_resource_fee: Some("1000".to_string()), + auth: vec![], + results: vec![], + error: None, + events: vec![], + cost: None, + }; + assert!(ok.is_success()); + + let err = SimulateTransactionResponse { + error: Some("contract trap".to_string()), + ..ok + }; + assert!(!err.is_success()); + } + + #[test] + fn test_simulate_response_deserialization() { + let json = r#"{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "latestLedger": 200, + "transactionData": "AAAAXDR=", + "minResourceFee": "5000", + "auth": ["AUTHXDR="], + "results": [{"xdr": "RETVAL=", "auth": []}], + "events": [] + } + }"#; + + let resp: JsonRpcResponse = + serde_json::from_str(json).unwrap(); + let result = resp.result.unwrap(); + + assert_eq!(result.latest_ledger, 200); + assert_eq!(result.soroban_data.as_deref(), Some("AAAAXDR=")); + assert_eq!(result.min_resource_fee.as_deref(), Some("5000")); + assert_eq!(result.auth, vec!["AUTHXDR="]); + assert_eq!(result.return_value_xdr(), Some("RETVAL=")); + assert!(result.is_success()); + } + #[test] fn test_get_transaction_success_status() { let json = r#"{ diff --git a/crates/core/src/rpc/jsonrpc.rs b/crates/core/src/rpc/jsonrpc.rs index 04dcc45a..6f209816 100644 --- a/crates/core/src/rpc/jsonrpc.rs +++ b/crates/core/src/rpc/jsonrpc.rs @@ -5,7 +5,21 @@ use crate::error::{PrismError, PrismResult, JsonRpcError}; use serde::{Deserialize, Serialize}; -use std::time::Instant; +use std::time::{Duration, Instant}; + +/// Base delay (ms) for exponential backoff: delay = `BASE_DELAY_MS × 2^attempt`. +const BASE_DELAY_MS: u64 = 100; + +/// Hard ceiling on any single backoff sleep to avoid excessively long waits. +const MAX_DELAY_MS: u64 = 10_000; // 10 seconds + +/// Compute the capped exponential backoff duration for a given attempt number. +/// +/// Returns `BASE_DELAY_MS × 2^attempt`, clamped to [`MAX_DELAY_MS`]. +fn backoff_duration(attempt: u32) -> Duration { + let ms = BASE_DELAY_MS.saturating_mul(1u64.saturating_shl(attempt)); + Duration::from_millis(ms.min(MAX_DELAY_MS)) +} /// JSON-RPC 2.0 request envelope. @@ -99,7 +113,12 @@ impl JsonRpcTransport { /// Execute a typed JSON-RPC call and return the typed result. /// - /// Retries on network errors and HTTP 429 with exponential backoff. + /// Retries are triggered by: + /// - Transport-level failures (connection refused, timeout, etc.) + /// - HTTP 429 Too Many Requests + /// - HTTP 5xx Server Errors (500–599) + /// + /// Backoff follows `BASE_DELAY_MS × 2^attempt`, capped at `MAX_DELAY_MS`. pub async fn call(&self, request: &JsonRpcRequest

) -> PrismResult where P: Serialize + std::fmt::Debug, @@ -110,8 +129,9 @@ impl JsonRpcTransport { for attempt in 0..=self.max_retries { if attempt > 0 { - let backoff = std::time::Duration::from_millis(100 * 2u64.pow(attempt)); - tokio::time::sleep(backoff).await; + let delay = backoff_duration(attempt); + tracing::debug!(attempt, method, delay_ms = delay.as_millis(), "backing off before retry"); + tokio::time::sleep(delay).await; tracing::debug!(attempt, method, "retrying RPC request"); } @@ -121,9 +141,6 @@ impl JsonRpcTransport { match self.client.post(&self.endpoint).json(request).send().await { Ok(response) => { let status = response.status(); - let body = response.text().await.map_err(|e| { - PrismError::RpcError(format!("response read error: {e}")) - })?; let elapsed_ms = started_at.elapsed().as_millis(); tracing::debug!( @@ -134,14 +151,35 @@ impl JsonRpcTransport { elapsed_ms, "RPC response received" ); - tracing::trace!(method, elapsed_ms, response = %body, "RPC response payload"); - if status == 429 { - tracing::warn!("rate limited by RPC endpoint, backing off"); - last_error = Some(PrismError::RpcError("rate limited".to_string())); + // Retry on 429 Too Many Requests. + if status == reqwest::StatusCode::TOO_MANY_REQUESTS { + tracing::warn!(method, attempt, "rate limited by RPC endpoint, will retry"); + last_error = Some(PrismError::RpcError(format!("rate limited (attempt {attempt})"))); continue; } + // Retry on any 5xx Server Error. + if status.is_server_error() { + tracing::warn!( + method, + attempt, + status = %status, + elapsed_ms, + "RPC endpoint returned server error (5xx), will retry" + ); + last_error = Some(PrismError::RpcError(format!( + "server error {status} on attempt {attempt}" + ))); + continue; + } + + let body = response.text().await.map_err(|e| { + PrismError::RpcError(format!("response read error: {e}")) + })?; + + tracing::trace!(method, elapsed_ms, response = %body, "RPC response payload"); + let envelope: JsonRpcResponse = serde_json::from_str(&body).map_err(|e| { PrismError::RpcError(format!("response parse error: {e}"))