From 5b6db69dd5439e299677cedad9492feb9ac80b78 Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 13 Apr 2026 09:57:42 +0800 Subject: [PATCH 1/2] Fix proxy reload and invalid-credential retries --- app/control/proxy/__init__.py | 62 ++++++++++++++++---- app/control/proxy/scheduler.py | 2 + app/dataplane/proxy/__init__.py | 2 +- app/dataplane/proxy/adapters/session.py | 4 +- app/dataplane/reverse/transport/websocket.py | 4 +- app/products/anthropic/messages.py | 7 ++- app/products/openai/chat.py | 44 +++++++++++--- app/products/openai/responses.py | 15 ++++- 8 files changed, 110 insertions(+), 30 deletions(-) diff --git a/app/control/proxy/__init__.py b/app/control/proxy/__init__.py index 278a3d00..2ef5e417 100644 --- a/app/control/proxy/__init__.py +++ b/app/control/proxy/__init__.py @@ -39,6 +39,7 @@ def __init__(self) -> None: self._flare = FlareSolverrClearanceProvider() self._egress_mode: EgressMode = EgressMode.DIRECT self._clearance_mode: ClearanceMode = ClearanceMode.NONE + self._config_sig: tuple | None = None # Pool cursor for PROXY_POOL mode: sticky routing with failure-driven rotate. # Incremented on node failure; all callers see the same cursor under _lock. self._pool_cursor: int = 0 @@ -50,36 +51,73 @@ def __init__(self) -> None: async def load(self) -> None: """Load proxy configuration from the current config snapshot.""" cfg = get_config() - self._egress_mode = EgressMode(cfg.get_str("proxy.egress.mode", "direct")) - self._clearance_mode = ClearanceMode.parse(cfg.get_str("proxy.clearance.mode", "none")) + egress_mode = EgressMode(cfg.get_str("proxy.egress.mode", "direct")) + clearance_mode = ClearanceMode.parse(cfg.get_str("proxy.clearance.mode", "none")) + base_url = cfg.get_str("proxy.egress.proxy_url", "") + res_url = cfg.get_str("proxy.egress.resource_proxy_url", "") + base_pool = tuple(cfg.get_list("proxy.egress.proxy_pool", [])) + res_pool = tuple(cfg.get_list("proxy.egress.resource_proxy_pool", [])) + config_sig = ( + egress_mode.value, + clearance_mode.value, + base_url, + res_url, + base_pool, + res_pool, + cfg.get_str("proxy.clearance.flaresolverr_url", ""), + cfg.get_str("proxy.clearance.cf_cookies", ""), + cfg.get_str("proxy.clearance.user_agent", ""), + cfg.get_int("proxy.clearance.timeout_sec", 60), + ) nodes: list[EgressNode] = [] resource_nodes: list[EgressNode] = [] - if self._egress_mode == EgressMode.SINGLE_PROXY: - base_url = cfg.get_str("proxy.egress.proxy_url", "") - res_url = cfg.get_str("proxy.egress.resource_proxy_url", "") + if egress_mode == EgressMode.SINGLE_PROXY: if base_url: nodes.append(EgressNode(node_id="single", proxy_url=base_url)) if res_url: resource_nodes.append(EgressNode(node_id="res-single", proxy_url=res_url)) - elif self._egress_mode == EgressMode.PROXY_POOL: - base_pool: list[str] = cfg.get_list("proxy.egress.proxy_pool", []) - res_pool: list[str] = cfg.get_list("proxy.egress.resource_proxy_pool", []) + elif egress_mode == EgressMode.PROXY_POOL: for i, url in enumerate(base_pool): nodes.append(EgressNode(node_id=f"pool-{i}", proxy_url=url)) for i, url in enumerate(res_pool): resource_nodes.append(EgressNode(node_id=f"res-pool-{i}", proxy_url=url)) + valid_affinities = { + n.proxy_url or "direct" + for n in [*nodes, *resource_nodes] + } + if not valid_affinities: + valid_affinities = {"direct"} + async with self._lock: + if self._config_sig == config_sig: + return + from .models import ClearanceBundleState + + self._egress_mode = egress_mode + self._clearance_mode = clearance_mode self._nodes = nodes self._resource_nodes = resource_nodes + self._pool_cursor = 0 + self._bundles = { + key: bundle.model_copy(update={"state": ClearanceBundleState.INVALID}) + for key, bundle in self._bundles.items() + if key in valid_affinities + } + self._refresh_events = { + key: event + for key, event in self._refresh_events.items() + if key in valid_affinities + } + self._config_sig = config_sig logger.info( "proxy directory loaded: egress_mode={} clearance_mode={} node_count={} resource_node_count={}", - self._egress_mode, - self._clearance_mode, + egress_mode, + clearance_mode, len(nodes), len(resource_nodes), ) @@ -326,11 +364,11 @@ def bundles(self) -> dict[str, ClearanceBundle]: async def get_proxy_directory() -> ProxyDirectory: - """Return the module-level ProxyDirectory, loading config on first call.""" + """Return the module-level ProxyDirectory, reloading config if it changed.""" global _directory if _directory is None: _directory = ProxyDirectory() - await _directory.load() + await _directory.load() return _directory diff --git a/app/control/proxy/scheduler.py b/app/control/proxy/scheduler.py index 0115f8a0..70bb49b8 100644 --- a/app/control/proxy/scheduler.py +++ b/app/control/proxy/scheduler.py @@ -55,6 +55,7 @@ async def _loop(self) -> None: async def _warm_up(self) -> None: """Pre-fetch clearance bundles without invalidating existing ones.""" try: + await self._directory.load() await self._directory.warm_up() logger.debug("proxy clearance warm-up completed") except Exception as exc: @@ -67,6 +68,7 @@ async def _refresh(self) -> None: refresh failure never leaves requests without clearance. """ try: + await self._directory.load() await self._directory.refresh_clearance_safe() logger.debug("proxy clearance refresh completed") except Exception as exc: diff --git a/app/dataplane/proxy/__init__.py b/app/dataplane/proxy/__init__.py index 094f584b..f7dc43ea 100644 --- a/app/dataplane/proxy/__init__.py +++ b/app/dataplane/proxy/__init__.py @@ -40,8 +40,8 @@ def has_proxy(self) -> bool: async def get_proxy_runtime() -> ProxyRuntime: global _runtime + directory = await get_proxy_directory() if _runtime is None: - directory = await get_proxy_directory() _runtime = ProxyRuntime(directory) return _runtime diff --git a/app/dataplane/proxy/adapters/session.py b/app/dataplane/proxy/adapters/session.py index b0c2437e..b2911e59 100644 --- a/app/dataplane/proxy/adapters/session.py +++ b/app/dataplane/proxy/adapters/session.py @@ -27,10 +27,12 @@ def _skip_proxy_ssl(proxy_url: str) -> bool: def normalize_proxy_url(url: str) -> str: - """Remap socks5:// → socks5h:// for proper DNS-over-proxy.""" + """Normalize SOCKS schemes for consistent DNS-over-proxy behaviour.""" if not url: return url scheme = urlparse(url).scheme.lower() + if scheme == "socks": + return "socks5h://" + url[len("socks://"):] if scheme == "socks5": return "socks5h://" + url[len("socks5://"):] if scheme == "socks4": diff --git a/app/dataplane/reverse/transport/websocket.py b/app/dataplane/reverse/transport/websocket.py index 59a96dfe..1c0959df 100644 --- a/app/dataplane/reverse/transport/websocket.py +++ b/app/dataplane/reverse/transport/websocket.py @@ -23,7 +23,9 @@ def _normalize_socks(proxy_url: str) -> tuple[str, Optional[bool]]: scheme = urlparse(proxy_url).scheme.lower() rdns: Optional[bool] = None base = scheme - if scheme == "socks5h": + if scheme == "socks": + base, rdns = "socks5", True + elif scheme == "socks5h": base, rdns = "socks5", True elif scheme == "socks4a": base, rdns = "socks4", True diff --git a/app/products/anthropic/messages.py b/app/products/anthropic/messages.py index e5743a8e..cacbe97b 100644 --- a/app/products/anthropic/messages.py +++ b/app/products/anthropic/messages.py @@ -30,6 +30,7 @@ from app.products.openai.chat import ( _stream_chat, _extract_message, _resolve_image, _quota_sync, _fail_sync, _parse_retry_codes, _feedback_kind, _log_task_exception, + _configured_retry_codes, _should_retry_upstream, ) from app.products.openai._tool_sieve import ToolSieve @@ -308,7 +309,7 @@ async def create( directory = _acct_dir max_retries = cfg.get_int("retry.max_retries", 1) - retry_codes = _parse_retry_codes(cfg.get_str("retry.on_codes", "429,503")) + retry_codes = _configured_retry_codes(cfg) timeout_s = cfg.get_float("chat.timeout", 120.0) msg_id = _make_msg_id() @@ -572,7 +573,7 @@ async def _run_stream() -> AsyncGenerator[str, None]: except UpstreamError as exc: fail_exc = exc - if exc.status in retry_codes and attempt < max_retries: + if _should_retry_upstream(exc, retry_codes) and attempt < max_retries: _retry = True logger.warning( "messages stream retry: attempt={}/{} status={} token={}...", @@ -649,7 +650,7 @@ async def _run_stream() -> AsyncGenerator[str, None]: except UpstreamError as exc: fail_exc = exc - if exc.status in retry_codes and attempt < max_retries: + if _should_retry_upstream(exc, retry_codes) and attempt < max_retries: _retry = True logger.warning( "messages retry: attempt={}/{} status={} token={}...", diff --git a/app/products/openai/chat.py b/app/products/openai/chat.py index 5e5db437..a245db0e 100644 --- a/app/products/openai/chat.py +++ b/app/products/openai/chat.py @@ -91,15 +91,37 @@ async def _fail_sync( def _parse_retry_codes(s: str) -> frozenset[int]: - """Parse a comma-separated list of HTTP status codes into a frozenset.""" + """Parse retry status-code config from either a CSV string or a list.""" result: set[int] = set() - for part in s.split(","): - part = part.strip() - if part.isdigit(): - result.add(int(part)) + parts: list[object] + if isinstance(s, str): + parts = [part.strip() for part in s.split(",")] + elif isinstance(s, (list, tuple, set)): + parts = list(s) + else: + return frozenset() + for part in parts: + text = str(part).strip() + if text.isdigit(): + result.add(int(text)) return frozenset(result) +def _configured_retry_codes(cfg) -> frozenset[int]: + """Read retry codes from current config, including legacy array keys.""" + raw = cfg.get("retry.on_codes") + if raw is None: + raw = cfg.get("retry.retry_status_codes", "429,401,503") + return _parse_retry_codes(raw) + + +def _should_retry_upstream(exc: UpstreamError, retry_codes: frozenset[int]) -> bool: + """Return whether this upstream error should switch to another token.""" + from app.dataplane.reverse.protocol.xai_usage import is_invalid_credentials_error + + return exc.status in retry_codes or is_invalid_credentials_error(exc) + + def _feedback_kind(exc: BaseException) -> "FeedbackKind": """Map an upstream exception to the appropriate FeedbackKind.""" return feedback_kind_for_error(exc) @@ -361,7 +383,7 @@ async def completions( directory = _acct_dir max_retries = cfg.get_int("retry.max_retries", 1) - retry_codes = _parse_retry_codes(cfg.get_str("retry.on_codes", "429,503")) + retry_codes = _configured_retry_codes(cfg) response_id = make_response_id() timeout_s = cfg.get_float("chat.timeout", 120.0) @@ -528,7 +550,7 @@ async def _run_stream() -> AsyncGenerator[str, None]: except UpstreamError as exc: fail_exc = exc - if exc.status in retry_codes and attempt < max_retries: + if _should_retry_upstream(exc, retry_codes) and attempt < max_retries: _retry = True logger.warning( "chat stream retry scheduled: attempt={}/{} status={} token={}...", @@ -612,7 +634,7 @@ async def _run_stream() -> AsyncGenerator[str, None]: except UpstreamError as exc: fail_exc = exc - if exc.status in retry_codes and attempt < max_retries: + if _should_retry_upstream(exc, retry_codes) and attempt < max_retries: _retry = True logger.warning( "chat retry scheduled: attempt={}/{} status={} token={}...", @@ -710,4 +732,8 @@ async def _run_stream() -> AsyncGenerator[str, None]: ) -__all__ = ["completions"] +__all__ = [ + "completions", + "_configured_retry_codes", + "_should_retry_upstream", +] diff --git a/app/products/openai/responses.py b/app/products/openai/responses.py index ffded583..c8c3d7bc 100644 --- a/app/products/openai/responses.py +++ b/app/products/openai/responses.py @@ -19,6 +19,7 @@ from app.dataplane.reverse.protocol.xai_chat import classify_line, StreamAdapter from .chat import _stream_chat, _extract_message, _resolve_image, _quota_sync, _fail_sync, _parse_retry_codes, _feedback_kind, _log_task_exception +from .chat import _configured_retry_codes, _should_retry_upstream from ._format import ( make_resp_id, build_resp_usage, make_resp_object, format_sse, ) @@ -183,6 +184,14 @@ def _parse_input(input_val: str | list) -> list[dict]: url = src.get("url", "") if url: normalized.append({"type": "image_url", "image_url": {"url": url}}) + elif ptype == "input_image": + src = part.get("image_url") or part.get("source") or {} + if isinstance(src, dict): + url = src.get("url", "") + else: + url = str(src or "") + if url: + normalized.append({"type": "image_url", "image_url": {"url": url}}) else: normalized.append(part) content = normalized @@ -237,7 +246,7 @@ async def create( directory = _acct_dir max_retries = cfg.get_int("retry.max_retries", 1) - retry_codes = _parse_retry_codes(cfg.get_str("retry.on_codes", "429,503")) + retry_codes = _configured_retry_codes(cfg) response_id = make_resp_id("resp") reasoning_id = make_resp_id("rs") message_id = make_resp_id("msg") @@ -534,7 +543,7 @@ async def _run_stream() -> AsyncGenerator[str, None]: except UpstreamError as exc: fail_exc = exc - if exc.status in retry_codes and attempt < max_retries: + if _should_retry_upstream(exc, retry_codes) and attempt < max_retries: _retry = True logger.warning("responses stream retry scheduled: attempt={}/{} status={} token={}...", attempt + 1, max_retries, exc.status, token[:8]) @@ -604,7 +613,7 @@ async def _run_stream() -> AsyncGenerator[str, None]: except UpstreamError as exc: fail_exc = exc - if exc.status in retry_codes and attempt < max_retries: + if _should_retry_upstream(exc, retry_codes) and attempt < max_retries: _retry = True logger.warning("responses retry scheduled: attempt={}/{} status={} token={}...", attempt + 1, max_retries, exc.status, token[:8]) From c1614d95c827718c3432166d5b0be5455476c174 Mon Sep 17 00:00:00 2001 From: Chenyme <118253778+chenyme@users.noreply.github.com> Date: Mon, 13 Apr 2026 13:46:16 +0800 Subject: [PATCH 2/2] refactor: Remove redundant import of is_invalid_credentials_error in _should_retry_upstream function --- app/products/openai/chat.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/products/openai/chat.py b/app/products/openai/chat.py index a245db0e..f704040a 100644 --- a/app/products/openai/chat.py +++ b/app/products/openai/chat.py @@ -29,6 +29,7 @@ classify_line, StreamAdapter, ) +from app.dataplane.reverse.protocol.xai_usage import is_invalid_credentials_error from app.dataplane.reverse.runtime.endpoint_table import CHAT from app.dataplane.reverse.transport.asset_upload import upload_from_input from app.dataplane.reverse.protocol.tool_prompt import ( @@ -117,8 +118,6 @@ def _configured_retry_codes(cfg) -> frozenset[int]: def _should_retry_upstream(exc: UpstreamError, retry_codes: frozenset[int]) -> bool: """Return whether this upstream error should switch to another token.""" - from app.dataplane.reverse.protocol.xai_usage import is_invalid_credentials_error - return exc.status in retry_codes or is_invalid_credentials_error(exc)