From edf379076d1ac74c5f8c7d5695ecf69bfc77dd05 Mon Sep 17 00:00:00 2001
From: Raghunath Prabhakar <82239949+Raghuboi@users.noreply.github.com>
Date: Tue, 19 May 2026 22:10:45 -0500
Subject: [PATCH] fix: handle 429 rate limits with retry and backoff (#812)

---
Review notes (text between the "---" marker and the first "diff --git" line
is ignored by `git am`):

* The reader task emits exactly one error frame per failure. Rate-limit
  failures are tagged with is_rate_limit and carry retry_after, resets_at and
  rate_limit_type, which receive_messages() forwards to RateLimitError.
* A null rate_limit_info payload or a non-numeric resetsAt no longer raises
  inside the reader; retry_after stays None in the latter case.
* The unused _rate_limit_retries attribute was dropped. This diff contains no
  automatic retry/backoff loop: callers schedule their own retry from the
  fields on RateLimitError.
* The post-image blob ids on the "index" lines predate these review changes.

 src/claude_agent_sdk/__init__.py        |  2 +
 src/claude_agent_sdk/_errors.py         | 24 +++++++
 src/claude_agent_sdk/_internal/query.py | 92 +++++++++++++++++++++----
 3 files changed, 105 insertions(+), 13 deletions(-)

diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py
index 555f49fc3..410a11188 100644
--- a/src/claude_agent_sdk/__init__.py
+++ b/src/claude_agent_sdk/__init__.py
@@ -24,6 +24,7 @@
     CLIJSONDecodeError,
     CLINotFoundError,
     ProcessError,
+    RateLimitError,
 )
 from ._internal.session_import import import_session_to_store
 from ._internal.session_mutations import (
@@ -660,4 +661,5 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
     "CLINotFoundError",
     "ProcessError",
     "CLIJSONDecodeError",
+    "RateLimitError",
 ]
diff --git a/src/claude_agent_sdk/_errors.py b/src/claude_agent_sdk/_errors.py
index c86bf235c..3bc8d9a48 100644
--- a/src/claude_agent_sdk/_errors.py
+++ b/src/claude_agent_sdk/_errors.py
@@ -54,3 +54,27 @@ class MessageParseError(ClaudeSDKError):
     def __init__(self, message: str, data: dict[str, Any] | None = None):
         self.data = data
         super().__init__(message)
+
+
+class RateLimitError(ClaudeSDKError):
+    """Raised when the CLI exits after an API rate limit (HTTP 429) rejection."""
+
+    def __init__(
+        self,
+        message: str,
+        retry_after: float | None = None,
+        resets_at: int | None = None,
+        rate_limit_type: str | None = None,
+    ):
+        """Initialize rate limit error.
+
+        Args:
+            message: Human-readable error message
+            retry_after: Seconds to wait before retrying (derived from resets_at)
+            resets_at: Unix timestamp when the rate limit resets
+            rate_limit_type: Type of rate limit (e.g., "five_hour", "seven_day")
+        """
+        self.retry_after = retry_after
+        self.resets_at = resets_at
+        self.rate_limit_type = rate_limit_type
+        super().__init__(message)
diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py
index 7a4f8a447..9fde11cfa 100644
--- a/src/claude_agent_sdk/_internal/query.py
+++ b/src/claude_agent_sdk/_internal/query.py
@@ -15,7 +15,7 @@
     ListToolsRequest,
 )
 
-from .._errors import ProcessError
+from .._errors import ProcessError, RateLimitError
 from ..types import (
     PermissionMode,
     PermissionResultAllow,
@@ -139,6 +139,10 @@ def __init__(
         # SessionStore mirroring (set via set_transcript_mirror_batcher)
         self._transcript_mirror_batcher: TranscriptMirrorBatcher | None = None
 
+        # Last rate_limit_info whose status was "rejected" (None until one arrives).
+        # Lets _read_messages report a CLI exit caused by HTTP 429 as a rate limit.
+        self._last_rate_limit_info: dict[str, Any] | None = None
+
     def set_transcript_mirror_batcher(self, batcher: "TranscriptMirrorBatcher") -> None:
         """Attach a batcher that receives ``transcript_mirror`` frames.
 
@@ -293,6 +297,21 @@ async def _read_messages(self) -> None:
                         )
                     continue
 
+                elif msg_type == "rate_limit_event":
+                    # Track rate limit info for error reporting.
+                    # When status is "rejected", a 429 was hit and CLI will exit.
+                    # Store the info so we can replace the generic ProcessError
+                    # with a descriptive RateLimitError.
+                    rate_limit_info = message.get("rate_limit_info") or {}
+                    if rate_limit_info.get("status") == "rejected":
+                        self._last_rate_limit_info = rate_limit_info
+                        logger.warning(
+                            "Rate limit rejected: type=%s, resets_at=%s",
+                            rate_limit_info.get("rateLimitType"),
+                            rate_limit_info.get("resetsAt"),
+                        )
+                    # Continue to yield the rate_limit_event to consumers
+
                 # Track results for proper stream closure
                 if msg_type == "result":
                     # Flush pending transcript mirror entries before yielding
@@ -337,18 +356,57 @@ async def _read_messages(self) -> None:
             # carries no information beyond "exit code 1" — replace it with the
             # structured error the CLI already reported so the exception is
             # actionable. Mirrors the TypeScript SDK (Query.ts readMessages).
-            if isinstance(e, ProcessError) and self._last_error_result_text is not None:
-                error_text = (
-                    f"Claude Code returned an error result: "
-                    f"{self._last_error_result_text}"
-                )
-                logger.debug(
-                    "Replacing ProcessError (exit code %s) with result error text",
-                    e.exit_code,
-                )
+            #
+            # Additionally, when a rate_limit_event with status="rejected" was
+            # received before the CLI exit, this is a 429 rate limit error.
+            # Tag the error frame so receive_messages() raises RateLimitError
+            # with the reset information attached.
+            error_frame: dict[str, Any] = {"type": "error"}
+            if isinstance(e, ProcessError):
+                if self._last_rate_limit_info is not None:
+                    rate_limit_type = self._last_rate_limit_info.get("rateLimitType")
+                    resets_at = self._last_rate_limit_info.get("resetsAt")
+                    # Derive retry_after from resets_at only when the CLI reported
+                    # a numeric reset timestamp; otherwise leave it unknown.
+                    retry_after: float | None = None
+                    if isinstance(resets_at, (int, float)):
+                        import time
+
+                        retry_after = max(0.0, resets_at - int(time.time()))
+                    error_text = (
+                        f"API rate limit exceeded (HTTP 429): "
+                        f"type={rate_limit_type}, retry_after={retry_after}s"
+                    )
+                    logger.warning(
+                        "Rate limit hit: type=%s, resets_at=%s, retry_after=%s",
+                        rate_limit_type,
+                        resets_at,
+                        retry_after,
+                    )
+                    # Store in _last_error_result_text so receive_messages can raise it
+                    self._last_error_result_text = error_text
+                    error_frame.update(
+                        is_rate_limit=True,
+                        retry_after=retry_after,
+                        resets_at=resets_at,
+                        rate_limit_type=rate_limit_type,
+                    )
+                elif self._last_error_result_text is not None:
+                    error_text = (
+                        f"Claude Code returned an error result: "
+                        f"{self._last_error_result_text}"
+                    )
+                    logger.debug(
+                        "Replacing ProcessError (exit code %s) with result error text",
+                        e.exit_code,
+                    )
+                else:
+                    error_text = str(e)
+                    logger.error("Fatal error in message reader: %s", e)
             else:
                 error_text = str(e)
-                logger.error(f"Fatal error in message reader: {e}")
+                logger.error("Fatal error in message reader: %s", e)
             # Put error in stream so iterators can handle it
-            await self._message_send.send({"type": "error", "error": error_text})
+            error_frame["error"] = error_text
+            await self._message_send.send(error_frame)
         finally:
@@ -849,7 +907,15 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
             if message.get("type") == "end":
                 break
             elif message.get("type") == "error":
-                raise Exception(message.get("error", "Unknown error"))
+                error_msg = message.get("error", "Unknown error")
+                if message.get("is_rate_limit"):
+                    raise RateLimitError(
+                        error_msg,
+                        retry_after=message.get("retry_after"),
+                        resets_at=message.get("resets_at"),
+                        rate_limit_type=message.get("rate_limit_type"),
+                    )
+                raise Exception(error_msg)
 
             yield message
 