8 changes: 2 additions & 6 deletions paddler_client_python/paddler_client/error.py
@@ -21,17 +21,13 @@ def __init__(self, message: str, raw_data: str) -> None:

 class PoolExhaustedError(PaddlerError):
     def __init__(self) -> None:
-        super().__init__(
-            "No available WebSocket connections in the pool"
-        )
+        super().__init__("No available WebSocket connections in the pool")
 
 
 class ConnectionDroppedError(PaddlerError):
     def __init__(self, request_id: str) -> None:
         self.request_id = request_id
-        super().__init__(
-            f"WebSocket connection dropped for request {request_id}"
-        )
+        super().__init__(f"WebSocket connection dropped for request {request_id}")
 
 
 class ServerError(PaddlerError):
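Note: these exceptions all derive from PaddlerError, so callers can catch narrowly or broadly. A minimal caller-side sketch, assuming a hypothetical send_request coroutine as a stand-in for whatever client call raises them; only the exception classes come from error.py:

from paddler_client.error import (
    ConnectionDroppedError,
    PaddlerError,
    PoolExhaustedError,
)

async def send_with_handling(send_request):
    try:
        return await send_request()
    except PoolExhaustedError:
        # Every pooled WebSocket connection is busy; back off and retry later.
        return None
    except ConnectionDroppedError as exc:
        # The connection died mid-request; exc.request_id names the casualty.
        print(f"request {exc.request_id} dropped")
        return None
    except PaddlerError:
        # Anything else the client defines, e.g. ServerError.
        raise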
@@ -48,9 +48,7 @@ class InferenceSocketConnection:
     def __init__(self, url: str) -> None:
         self._url = url
         self._ws: ClientConnection | None = None
-        self._pending: dict[
-            str, asyncio.Queue[InferenceMessage | Exception]
-        ] = {}
+        self._pending: dict[str, asyncio.Queue[InferenceMessage | Exception]] = {}
         self._write_queue: asyncio.Queue[str] = asyncio.Queue()
         self._read_task: asyncio.Task[None] | None = None
         self._write_task: asyncio.Task[None] | None = None
@@ -74,9 +72,7 @@ async def send(
         if not self._connected:
             raise ConnectionDroppedError(request_id)
 
-        response_queue: asyncio.Queue[InferenceMessage | Exception] = (
-            asyncio.Queue()
-        )
+        response_queue: asyncio.Queue[InferenceMessage | Exception] = asyncio.Queue()
         self._pending[request_id] = response_queue
         await self._write_queue.put(json_message)

@@ -107,9 +103,7 @@ async def _read_loop(self) -> None:

         async for raw_message in self._ws:
             if not isinstance(raw_message, str):
-                logger.warning(
-                    "Received unexpected binary WebSocket message"
-                )
+                logger.warning("Received unexpected binary WebSocket message")
                 continue
 
             try:
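The fields touched above implement a common fan-out pattern: a single read loop routes each incoming message to the per-request asyncio.Queue registered in _pending. A self-contained sketch of that pattern under simplified names; Router is illustrative, not the real class:

import asyncio

class Router:
    def __init__(self) -> None:
        self.pending: dict[str, asyncio.Queue[dict]] = {}

    async def request(self, request_id: str) -> dict:
        queue: asyncio.Queue[dict] = asyncio.Queue()
        self.pending[request_id] = queue
        try:
            return await queue.get()  # parked until the read loop delivers
        finally:
            del self.pending[request_id]

    def deliver(self, message: dict) -> None:
        # Called once per parsed message, e.g. from an `async for` over the socket.
        queue = self.pending.get(message["request_id"])
        if queue is not None:
            queue.put_nowait(message)

async def main() -> None:
    router = Router()
    waiter = asyncio.create_task(router.request("r1"))
    await asyncio.sleep(0)  # let the waiter task run and register its queue
    router.deliver({"request_id": "r1", "payload": "hello"})
    print(await waiter)

asyncio.run(main())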
@@ -20,9 +20,7 @@ def __init__(self, url: str, pool_size: int) -> None:

         self._url = url
         self._pool_size = pool_size
-        self._connections: list[InferenceSocketConnection | None] = (
-            [None] * pool_size
-        )
+        self._connections: list[InferenceSocketConnection | None] = [None] * pool_size
         self._next_idx = 0
         self._lock = asyncio.Lock()

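The fields above (_connections pre-sized with None, _next_idx, _lock) point to a lazily-filled round-robin pool. A hedged sketch of that shape; the acquire logic and connect step are assumptions, since the rest of the class sits outside this hunk:

import asyncio

class RoundRobinPool:
    def __init__(self, url: str, pool_size: int) -> None:
        self._url = url
        self._connections: list[object | None] = [None] * pool_size
        self._next_idx = 0
        self._lock = asyncio.Lock()

    async def acquire(self) -> object:
        async with self._lock:
            idx = self._next_idx
            self._next_idx = (self._next_idx + 1) % len(self._connections)
            connection = self._connections[idx]
            if connection is None:
                # First use of this slot: open its connection lazily.
                connection = await self._open_connection(self._url)
                self._connections[idx] = connection
            return connection

    async def _open_connection(self, url: str) -> object:
        # Placeholder; the real pool would build an InferenceSocketConnection here.
        return object()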
18 changes: 10 additions & 8 deletions paddler_client_python/paddler_client/inference_socket_url.py
@@ -15,11 +15,13 @@ def inference_socket_url(url: str) -> str:
     if new_scheme is None:
         raise ValueError(f"Unsupported URL scheme: {parsed.scheme}")
 
-    return urlunparse((
-        new_scheme,
-        parsed.netloc,
-        "/api/v1/inference_socket",
-        "",
-        parsed.query,
-        "",
-    ))
+    return urlunparse(
+        (
+            new_scheme,
+            parsed.netloc,
+            "/api/v1/inference_socket",
+            "",
+            parsed.query,
+            "",
+        )
+    )
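The function rewrites a base URL into the inference WebSocket endpoint, pinning the path and keeping the query string. A sketch of the equivalent logic; the scheme table sits above the hunk shown, so its contents here are an assumption (http to ws, https to wss):

from urllib.parse import urlparse, urlunparse

NEW_SCHEMES = {"http": "ws", "https": "wss", "ws": "ws", "wss": "wss"}  # assumed

def inference_socket_url(url: str) -> str:
    parsed = urlparse(url)
    new_scheme = NEW_SCHEMES.get(parsed.scheme)
    if new_scheme is None:
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme}")
    # Path is pinned to the socket endpoint; params and fragment are dropped.
    return urlunparse(
        (new_scheme, parsed.netloc, "/api/v1/inference_socket", "", parsed.query, "")
    )

assert (
    inference_socket_url("https://paddler.example:8080/?team=a")
    == "wss://paddler.example:8080/api/v1/inference_socket?team=a"
)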
2 changes: 1 addition & 1 deletion paddler_client_python/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "paddler-client"
-version = "3.0.1"
+version = "3.0.2"
 description = "Python client for Paddler LLM load balancer"
 authors = ["Intentee"]
 license = "Apache-2.0"
8 changes: 2 additions & 6 deletions paddler_client_python/tests/conftest.py
@@ -70,19 +70,15 @@ async def _wait_for_available_slots(client: ClientManagement) -> None:
     while elapsed < WAIT_FOR_SLOTS_TIMEOUT_SECONDS:
         snapshot = await client.get_agents()
         total_slots = sum(agent.slots_total for agent in snapshot.agents)
-        total_processing = sum(
-            agent.slots_processing for agent in snapshot.agents
-        )
+        total_processing = sum(agent.slots_processing for agent in snapshot.agents)
 
         if total_slots > 0 and total_processing == 0:
             return
 
         await asyncio.sleep(POLL_INTERVAL_SECONDS)
         elapsed += POLL_INTERVAL_SECONDS
 
-    raise TimeoutError(
-        f"No idle agent slots within {WAIT_FOR_SLOTS_TIMEOUT_SECONDS}s"
-    )
+    raise TimeoutError(f"No idle agent slots within {WAIT_FOR_SLOTS_TIMEOUT_SECONDS}s")
 
 
 @pytest.fixture
66 changes: 36 additions & 30 deletions paddler_client_python/tests/test_client_inference.py
@@ -23,21 +23,25 @@ def _make_ndjson_token_response(
     request_id: str,
     token: str,
 ) -> str:
-    return json.dumps({
-        "Response": {
-            "request_id": request_id,
-            "response": {"GeneratedToken": {"Token": token}},
-        }
-    })
+    return json.dumps(
+        {
+            "Response": {
+                "request_id": request_id,
+                "response": {"GeneratedToken": {"Token": token}},
+            }
+        }
+    )
 
 
 def _make_ndjson_done_response(request_id: str) -> str:
-    return json.dumps({
-        "Response": {
-            "request_id": request_id,
-            "response": {"GeneratedToken": "Done"},
-        }
-    })
+    return json.dumps(
+        {
+            "Response": {
+                "request_id": request_id,
+                "response": {"GeneratedToken": "Done"},
+            }
+        }
+    )
 
 
 async def test_get_health_returns_text() -> None:
@@ -125,36 +129,38 @@ def handler(request: httpx.Request) -> httpx.Response:
         body = json.loads(received_requests[0].content)
         assert body["add_generation_prompt"] is True
         assert body["max_tokens"] == 100
-        assert body["conversation_history"] == [
-            {"content": "Hello", "role": "user"}
-        ]
+        assert body["conversation_history"] == [{"content": "Hello", "role": "user"}]
     finally:
         await client.close()
 
 
 async def test_generate_embedding_batch() -> None:
     def handler(request: httpx.Request) -> httpx.Response:
-        embedding_response = json.dumps({
-            "Response": {
-                "request_id": "r1",
-                "response": {
-                    "Embedding": {
-                        "embedding": [0.1, 0.2],
-                        "normalization_method": "None",
-                        "pooling_type": "Mean",
-                        "source_document_id": "doc-1",
-                    }
-                },
-            }
-        })
+        embedding_response = json.dumps(
+            {
+                "Response": {
+                    "request_id": "r1",
+                    "response": {
+                        "Embedding": {
+                            "embedding": [0.1, 0.2],
+                            "normalization_method": "None",
+                            "pooling_type": "Mean",
+                            "source_document_id": "doc-1",
+                        }
+                    },
+                }
+            }
+        )
-        done_response = json.dumps({
-            "Response": {
-                "request_id": "r1",
-                "response": {"Embedding": "Done"},
-            }
-        })
+        done_response = json.dumps(
+            {
+                "Response": {
+                    "request_id": "r1",
+                    "response": {"Embedding": "Done"},
+                }
+            }
+        )
 
         return httpx.Response(
             200,
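The two helpers at the top of this file pin down the NDJSON wire shape the tests rely on: one JSON object per line, with a "Done" sentinel ending a request's stream. A small consumer built from exactly those shapes; iter_tokens is illustrative, not part of the client:

import json
from typing import Iterator

def iter_tokens(ndjson: str, request_id: str) -> Iterator[str]:
    for line in ndjson.splitlines():
        if not line.strip():
            continue  # tolerate blank lines between records
        response = json.loads(line)["Response"]
        if response["request_id"] != request_id:
            continue  # belongs to a different in-flight request
        generated = response["response"]["GeneratedToken"]
        if generated == "Done":
            return  # sentinel: this stream is complete
        yield generated["Token"]

stream = (
    '{"Response": {"request_id": "r1", "response": {"GeneratedToken": {"Token": "hi"}}}}\n'
    '{"Response": {"request_id": "r1", "response": {"GeneratedToken": "Done"}}}\n'
)
assert list(iter_tokens(stream, "r1")) == ["hi"]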
17 changes: 4 additions & 13 deletions paddler_client_python/tests/test_inference_message.py
@@ -81,9 +81,7 @@ def test_parse_chat_template_error() -> None:
     data = {
         "Response": {
             "request_id": "req-1",
-            "response": {
-                "GeneratedToken": {"ChatTemplateError": "bad template"}
-            },
+            "response": {"GeneratedToken": {"ChatTemplateError": "bad template"}},
         }
     }
     message = parse_inference_client_message(data)
@@ -97,9 +95,7 @@ def test_parse_image_decoding_failed() -> None:
     data = {
         "Response": {
             "request_id": "req-1",
-            "response": {
-                "GeneratedToken": {"ImageDecodingFailed": "corrupt image"}
-            },
+            "response": {"GeneratedToken": {"ImageDecodingFailed": "corrupt image"}},
         }
     }
     message = parse_inference_client_message(data)
@@ -113,11 +109,7 @@ def test_parse_multimodal_not_supported() -> None:
     data = {
         "Response": {
             "request_id": "req-1",
-            "response": {
-                "GeneratedToken": {
-                    "MultimodalNotSupported": "no multimodal"
-                }
-            },
+            "response": {"GeneratedToken": {"MultimodalNotSupported": "no multimodal"}},
         }
     }
     message = parse_inference_client_message(data)
@@ -181,8 +173,7 @@ def test_parse_embedding_error() -> None:

 def test_parse_json_string() -> None:
     json_str = (
-        '{"Response": {"request_id": "req-1",'
-        ' "response": {"GeneratedToken": "Done"}}}'
+        '{"Response": {"request_id": "req-1", "response": {"GeneratedToken": "Done"}}}'
     )
     message = parse_inference_client_message(json_str)

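Between them, these tests enumerate the GeneratedToken variants a parser must handle: a token payload, the "Done" sentinel, and string-valued error variants. An illustrative dispatcher over those same shapes; describe_generated_token is hypothetical:

def describe_generated_token(generated: dict | str) -> str:
    if generated == "Done":
        return "stream finished"
    if "Token" in generated:
        return f"token: {generated['Token']}"
    # Error variants carry a human-readable message as their value.
    for variant in (
        "ChatTemplateError",
        "ImageDecodingFailed",
        "MultimodalNotSupported",
    ):
        if variant in generated:
            return f"{variant}: {generated[variant]}"
    return "unrecognized variant"

assert describe_generated_token("Done") == "stream finished"
assert describe_generated_token({"Token": "hi"}) == "token: hi"
assert (
    describe_generated_token({"ChatTemplateError": "bad template"})
    == "ChatTemplateError: bad template"
)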
35 changes: 16 additions & 19 deletions paddler_client_python/tests/test_stream_ndjson.py
@@ -9,29 +9,30 @@


 def _make_token_line(request_id: str, token: str) -> str:
-    return json.dumps({
-        "Response": {
-            "request_id": request_id,
-            "response": {"GeneratedToken": {"Token": token}},
-        }
-    })
+    return json.dumps(
+        {
+            "Response": {
+                "request_id": request_id,
+                "response": {"GeneratedToken": {"Token": token}},
+            }
+        }
+    )
 
 
 def _make_done_line(request_id: str) -> str:
-    return json.dumps({
-        "Response": {
-            "request_id": request_id,
-            "response": {"GeneratedToken": "Done"},
-        }
-    })
+    return json.dumps(
+        {
+            "Response": {
+                "request_id": request_id,
+                "response": {"GeneratedToken": "Done"},
+            }
+        }
+    )
 
 
 async def test_parses_multiple_ndjson_lines() -> None:
     ndjson_content = (
-        _make_token_line("r1", "hello")
-        + "\n"
-        + _make_done_line("r1")
-        + "\n"
+        _make_token_line("r1", "hello") + "\n" + _make_done_line("r1") + "\n"
     )
 
     def handler(request: httpx.Request) -> httpx.Response:
@@ -53,11 +54,7 @@ def handler(request: httpx.Request) -> httpx.Response:


 async def test_skips_empty_lines() -> None:
-    ndjson_content = (
-        "\n\n"
-        + _make_done_line("r1")
-        + "\n\n"
-    )
+    ndjson_content = "\n\n" + _make_done_line("r1") + "\n\n"
 
     def handler(request: httpx.Request) -> httpx.Response:
         return httpx.Response(200, text=ndjson_content)