Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 50 additions & 12 deletions app/control/proxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self) -> None:
self._flare = FlareSolverrClearanceProvider()
self._egress_mode: EgressMode = EgressMode.DIRECT
self._clearance_mode: ClearanceMode = ClearanceMode.NONE
self._config_sig: tuple | None = None
# Pool cursor for PROXY_POOL mode: sticky routing with failure-driven rotate.
# Incremented on node failure; all callers see the same cursor under _lock.
self._pool_cursor: int = 0
Expand All @@ -50,36 +51,73 @@ def __init__(self) -> None:
async def load(self) -> None:
"""Load proxy configuration from the current config snapshot."""
cfg = get_config()
self._egress_mode = EgressMode(cfg.get_str("proxy.egress.mode", "direct"))
self._clearance_mode = ClearanceMode.parse(cfg.get_str("proxy.clearance.mode", "none"))
egress_mode = EgressMode(cfg.get_str("proxy.egress.mode", "direct"))
clearance_mode = ClearanceMode.parse(cfg.get_str("proxy.clearance.mode", "none"))
base_url = cfg.get_str("proxy.egress.proxy_url", "")
res_url = cfg.get_str("proxy.egress.resource_proxy_url", "")
base_pool = tuple(cfg.get_list("proxy.egress.proxy_pool", []))
res_pool = tuple(cfg.get_list("proxy.egress.resource_proxy_pool", []))
config_sig = (
egress_mode.value,
clearance_mode.value,
base_url,
res_url,
base_pool,
res_pool,
cfg.get_str("proxy.clearance.flaresolverr_url", ""),
cfg.get_str("proxy.clearance.cf_cookies", ""),
cfg.get_str("proxy.clearance.user_agent", ""),
cfg.get_int("proxy.clearance.timeout_sec", 60),
)

nodes: list[EgressNode] = []
resource_nodes: list[EgressNode] = []

if self._egress_mode == EgressMode.SINGLE_PROXY:
base_url = cfg.get_str("proxy.egress.proxy_url", "")
res_url = cfg.get_str("proxy.egress.resource_proxy_url", "")
if egress_mode == EgressMode.SINGLE_PROXY:
if base_url:
nodes.append(EgressNode(node_id="single", proxy_url=base_url))
if res_url:
resource_nodes.append(EgressNode(node_id="res-single", proxy_url=res_url))

elif self._egress_mode == EgressMode.PROXY_POOL:
base_pool: list[str] = cfg.get_list("proxy.egress.proxy_pool", [])
res_pool: list[str] = cfg.get_list("proxy.egress.resource_proxy_pool", [])
elif egress_mode == EgressMode.PROXY_POOL:
for i, url in enumerate(base_pool):
nodes.append(EgressNode(node_id=f"pool-{i}", proxy_url=url))
for i, url in enumerate(res_pool):
resource_nodes.append(EgressNode(node_id=f"res-pool-{i}", proxy_url=url))

valid_affinities = {
n.proxy_url or "direct"
for n in [*nodes, *resource_nodes]
}
if not valid_affinities:
valid_affinities = {"direct"}

async with self._lock:
if self._config_sig == config_sig:
return
from .models import ClearanceBundleState

self._egress_mode = egress_mode
self._clearance_mode = clearance_mode
self._nodes = nodes
self._resource_nodes = resource_nodes
self._pool_cursor = 0
self._bundles = {
key: bundle.model_copy(update={"state": ClearanceBundleState.INVALID})
for key, bundle in self._bundles.items()
if key in valid_affinities
}
self._refresh_events = {
key: event
for key, event in self._refresh_events.items()
if key in valid_affinities
}
self._config_sig = config_sig

logger.info(
"proxy directory loaded: egress_mode={} clearance_mode={} node_count={} resource_node_count={}",
self._egress_mode,
self._clearance_mode,
egress_mode,
clearance_mode,
len(nodes),
len(resource_nodes),
)
Expand Down Expand Up @@ -326,11 +364,11 @@ def bundles(self) -> dict[str, ClearanceBundle]:


async def get_proxy_directory() -> ProxyDirectory:
"""Return the module-level ProxyDirectory, loading config on first call."""
"""Return the module-level ProxyDirectory, reloading config if it changed."""
global _directory
if _directory is None:
_directory = ProxyDirectory()
await _directory.load()
await _directory.load()
return _directory


Expand Down
2 changes: 2 additions & 0 deletions app/control/proxy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async def _loop(self) -> None:
async def _warm_up(self) -> None:
"""Pre-fetch clearance bundles without invalidating existing ones."""
try:
await self._directory.load()
await self._directory.warm_up()
logger.debug("proxy clearance warm-up completed")
except Exception as exc:
Expand All @@ -67,6 +68,7 @@ async def _refresh(self) -> None:
refresh failure never leaves requests without clearance.
"""
try:
await self._directory.load()
await self._directory.refresh_clearance_safe()
logger.debug("proxy clearance refresh completed")
except Exception as exc:
Expand Down
2 changes: 1 addition & 1 deletion app/dataplane/proxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def has_proxy(self) -> bool:

async def get_proxy_runtime() -> ProxyRuntime:
global _runtime
directory = await get_proxy_directory()
if _runtime is None:
directory = await get_proxy_directory()
_runtime = ProxyRuntime(directory)
return _runtime

Expand Down
4 changes: 3 additions & 1 deletion app/dataplane/proxy/adapters/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ def _skip_proxy_ssl(proxy_url: str) -> bool:


def normalize_proxy_url(url: str) -> str:
"""Remap socks5:// → socks5h:// for proper DNS-over-proxy."""
"""Normalize SOCKS schemes for consistent DNS-over-proxy behaviour."""
if not url:
return url
scheme = urlparse(url).scheme.lower()
if scheme == "socks":
return "socks5h://" + url[len("socks://"):]
if scheme == "socks5":
return "socks5h://" + url[len("socks5://"):]
if scheme == "socks4":
Expand Down
4 changes: 3 additions & 1 deletion app/dataplane/reverse/transport/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def _normalize_socks(proxy_url: str) -> tuple[str, Optional[bool]]:
scheme = urlparse(proxy_url).scheme.lower()
rdns: Optional[bool] = None
base = scheme
if scheme == "socks5h":
if scheme == "socks":
base, rdns = "socks5", True
elif scheme == "socks5h":
base, rdns = "socks5", True
elif scheme == "socks4a":
base, rdns = "socks4", True
Expand Down
7 changes: 4 additions & 3 deletions app/products/anthropic/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from app.products.openai.chat import (
_stream_chat, _extract_message, _resolve_image,
_quota_sync, _fail_sync, _parse_retry_codes, _feedback_kind, _log_task_exception,
_configured_retry_codes, _should_retry_upstream,
)
from app.products.openai._tool_sieve import ToolSieve

Expand Down Expand Up @@ -308,7 +309,7 @@ async def create(
directory = _acct_dir

max_retries = cfg.get_int("retry.max_retries", 1)
retry_codes = _parse_retry_codes(cfg.get_str("retry.on_codes", "429,503"))
retry_codes = _configured_retry_codes(cfg)
timeout_s = cfg.get_float("chat.timeout", 120.0)
msg_id = _make_msg_id()

Expand Down Expand Up @@ -572,7 +573,7 @@ async def _run_stream() -> AsyncGenerator[str, None]:

except UpstreamError as exc:
fail_exc = exc
if exc.status in retry_codes and attempt < max_retries:
if _should_retry_upstream(exc, retry_codes) and attempt < max_retries:
_retry = True
logger.warning(
"messages stream retry: attempt={}/{} status={} token={}...",
Expand Down Expand Up @@ -649,7 +650,7 @@ async def _run_stream() -> AsyncGenerator[str, None]:

except UpstreamError as exc:
fail_exc = exc
if exc.status in retry_codes and attempt < max_retries:
if _should_retry_upstream(exc, retry_codes) and attempt < max_retries:
_retry = True
logger.warning(
"messages retry: attempt={}/{} status={} token={}...",
Expand Down
43 changes: 34 additions & 9 deletions app/products/openai/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
classify_line,
StreamAdapter,
)
from app.dataplane.reverse.protocol.xai_usage import is_invalid_credentials_error
from app.dataplane.reverse.runtime.endpoint_table import CHAT
from app.dataplane.reverse.transport.asset_upload import upload_from_input
from app.dataplane.reverse.protocol.tool_prompt import (
Expand Down Expand Up @@ -91,15 +92,35 @@ async def _fail_sync(


def _parse_retry_codes(s: str) -> frozenset[int]:
"""Parse a comma-separated list of HTTP status codes into a frozenset."""
"""Parse retry status-code config from either a CSV string or a list."""
result: set[int] = set()
for part in s.split(","):
part = part.strip()
if part.isdigit():
result.add(int(part))
parts: list[object]
if isinstance(s, str):
parts = [part.strip() for part in s.split(",")]
elif isinstance(s, (list, tuple, set)):
parts = list(s)
else:
return frozenset()
for part in parts:
text = str(part).strip()
if text.isdigit():
result.add(int(text))
return frozenset(result)


def _configured_retry_codes(cfg) -> frozenset[int]:
    """Resolve the set of retryable HTTP status codes from config.

    Prefers the ``retry.on_codes`` key; when that key is absent, falls back
    to the legacy ``retry.retry_status_codes`` key with a built-in default.
    """
    # NOTE(review): assumes cfg.get returns None for a missing key — confirm.
    value = cfg.get("retry.on_codes")
    if value is None:
        value = cfg.get("retry.retry_status_codes", "429,401,503")
    return _parse_retry_codes(value)


def _should_retry_upstream(exc: UpstreamError, retry_codes: frozenset[int]) -> bool:
    """Decide whether an upstream failure warrants switching to another token.

    A retry is triggered either by a configured retryable status code or by
    an invalid-credentials error, whatever its status.
    """
    if exc.status in retry_codes:
        return True
    return is_invalid_credentials_error(exc)


def _feedback_kind(exc: BaseException) -> "FeedbackKind":
    """Translate an upstream exception into its FeedbackKind category."""
    # Thin delegation kept for call-site readability in the retry paths.
    kind = feedback_kind_for_error(exc)
    return kind
Expand Down Expand Up @@ -361,7 +382,7 @@ async def completions(
directory = _acct_dir

max_retries = cfg.get_int("retry.max_retries", 1)
retry_codes = _parse_retry_codes(cfg.get_str("retry.on_codes", "429,503"))
retry_codes = _configured_retry_codes(cfg)
response_id = make_response_id()
timeout_s = cfg.get_float("chat.timeout", 120.0)

Expand Down Expand Up @@ -528,7 +549,7 @@ async def _run_stream() -> AsyncGenerator[str, None]:

except UpstreamError as exc:
fail_exc = exc
if exc.status in retry_codes and attempt < max_retries:
if _should_retry_upstream(exc, retry_codes) and attempt < max_retries:
_retry = True
logger.warning(
"chat stream retry scheduled: attempt={}/{} status={} token={}...",
Expand Down Expand Up @@ -612,7 +633,7 @@ async def _run_stream() -> AsyncGenerator[str, None]:

except UpstreamError as exc:
fail_exc = exc
if exc.status in retry_codes and attempt < max_retries:
if _should_retry_upstream(exc, retry_codes) and attempt < max_retries:
_retry = True
logger.warning(
"chat retry scheduled: attempt={}/{} status={} token={}...",
Expand Down Expand Up @@ -710,4 +731,8 @@ async def _run_stream() -> AsyncGenerator[str, None]:
)


__all__ = ["completions"]
__all__ = [
"completions",
"_configured_retry_codes",
"_should_retry_upstream",
]
15 changes: 12 additions & 3 deletions app/products/openai/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from app.dataplane.reverse.protocol.xai_chat import classify_line, StreamAdapter

from .chat import _stream_chat, _extract_message, _resolve_image, _quota_sync, _fail_sync, _parse_retry_codes, _feedback_kind, _log_task_exception
from .chat import _configured_retry_codes, _should_retry_upstream
from ._format import (
make_resp_id, build_resp_usage, make_resp_object, format_sse,
)
Expand Down Expand Up @@ -183,6 +184,14 @@ def _parse_input(input_val: str | list) -> list[dict]:
url = src.get("url", "")
if url:
normalized.append({"type": "image_url", "image_url": {"url": url}})
elif ptype == "input_image":
src = part.get("image_url") or part.get("source") or {}
if isinstance(src, dict):
url = src.get("url", "")
else:
url = str(src or "")
if url:
normalized.append({"type": "image_url", "image_url": {"url": url}})
else:
normalized.append(part)
content = normalized
Expand Down Expand Up @@ -237,7 +246,7 @@ async def create(
directory = _acct_dir

max_retries = cfg.get_int("retry.max_retries", 1)
retry_codes = _parse_retry_codes(cfg.get_str("retry.on_codes", "429,503"))
retry_codes = _configured_retry_codes(cfg)
response_id = make_resp_id("resp")
reasoning_id = make_resp_id("rs")
message_id = make_resp_id("msg")
Expand Down Expand Up @@ -534,7 +543,7 @@ async def _run_stream() -> AsyncGenerator[str, None]:

except UpstreamError as exc:
fail_exc = exc
if exc.status in retry_codes and attempt < max_retries:
if _should_retry_upstream(exc, retry_codes) and attempt < max_retries:
_retry = True
logger.warning("responses stream retry scheduled: attempt={}/{} status={} token={}...",
attempt + 1, max_retries, exc.status, token[:8])
Expand Down Expand Up @@ -604,7 +613,7 @@ async def _run_stream() -> AsyncGenerator[str, None]:

except UpstreamError as exc:
fail_exc = exc
if exc.status in retry_codes and attempt < max_retries:
if _should_retry_upstream(exc, retry_codes) and attempt < max_retries:
_retry = True
logger.warning("responses retry scheduled: attempt={}/{} status={} token={}...",
attempt + 1, max_retries, exc.status, token[:8])
Expand Down
Loading