diff --git a/client/python/USAGE.md b/client/python/USAGE.md index 082bfd0..5a55402 100644 --- a/client/python/USAGE.md +++ b/client/python/USAGE.md @@ -39,6 +39,24 @@ The client supports usage as a context manager, which automatically closes the u Example available in: ```examples/context_manager_usage.py``` +### Async Client Support + +For async applications, use `AsyncVortexDB`. It mirrors the synchronous client API and uses `grpc.aio` under the hood. + +Example available in: +```examples/async_usage.py``` + +```python +async with AsyncVortexDB( + grpc_url="localhost:50051", + api_key="your-api-key", +) as db: + point_id = await db.insert( + vector=DenseVector([0.1, 0.2, 0.3]), + payload=Payload.text("hello async vortex"), + ) +``` + --- ## Client API @@ -47,6 +65,25 @@ Example available in: Main client class for interacting with the VortexDB gRPC server. +### `AsyncVortexDB` + +Async client class for I/O-heavy applications. It has the same constructor and method names as `VortexDB`, but methods are awaitable: + +``` +await db.insert(...) +await db.get(...) +await db.search(...) +await db.delete(...) +await db.close() +``` + +It also supports async context manager usage: + +``` +async with AsyncVortexDB(...) as db: + ... +``` + #### **Constructor** ``` diff --git a/client/python/examples/async_usage.py b/client/python/examples/async_usage.py new file mode 100644 index 0000000..cce3335 --- /dev/null +++ b/client/python/examples/async_usage.py @@ -0,0 +1,31 @@ +import asyncio + +from vortexdb import AsyncVortexDB, DenseVector, Payload, Similarity + + +async def main(): + async with AsyncVortexDB( + grpc_url="localhost:50051", + api_key="your-api-key", + ) as db: + point_id = await db.insert( + vector=DenseVector([0.1, 0.2, 0.3]), + payload=Payload.text("hello async vortex"), + ) + + point = await db.get(point_id=point_id) + if point is not None: + print(point.pretty()) + + results = await db.search( + vector=DenseVector([0.1, 0.2, 0.3]), + similarity=Similarity.COSINE, + limit=5, + ) + print(results) + + await db.delete(point_id=point_id) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/client/python/tests/test_async_client.py b/client/python/tests/test_async_client.py new file mode 100644 index 0000000..1667633 --- /dev/null +++ b/client/python/tests/test_async_client.py @@ -0,0 +1,158 @@ +import asyncio +from unittest.mock import AsyncMock, Mock + +import pytest + +from vortexdb.async_client import AsyncVortexDB +from vortexdb.async_connection import AsyncGRPCConnection +from vortexdb.models import ContentType, DenseVector, Payload, Point, Similarity + + +@pytest.fixture +def mock_connection(monkeypatch): + """ + Replace AsyncGRPCConnection with a mock instance. + """ + conn = Mock(spec=AsyncGRPCConnection) + conn.stub = Mock() + conn.call = AsyncMock() + conn.close = AsyncMock() + monkeypatch.setattr("vortexdb.async_client.AsyncGRPCConnection", lambda _: conn) + return conn + + +@pytest.fixture +def client(mock_connection): + return AsyncVortexDB( + grpc_url="localhost:50051", + api_key="secret", + ) + + +def test_async_insert_success(client, mock_connection): + async def run(): + response = Mock() + response.id = Mock() + response.id.value = "point-123" + + mock_connection.call.return_value = response + + point_id = await client.insert( + vector=DenseVector([1, 2, 3]), + payload=Payload.text("hello"), + ) + + assert point_id == "point-123" + + asyncio.run(run()) + + +def test_async_insert_rejects_invalid_vector(client): + async def run(): + with pytest.raises(TypeError): + await client.insert( + vector=[1, 2, 3], + payload=Payload.text("hello"), + ) + + asyncio.run(run()) + + +def test_async_get_point_success(client, mock_connection): + async def run(): + proto_point = Mock() + proto_point.id.id.value = "point-123" + proto_point.vector.values = [1, 2, 3] + proto_point.payload.content_type = ContentType.TEXT.to_proto() + proto_point.payload.content = "hello" + + mock_connection.call.return_value = proto_point + + point = await client.get(point_id="point-123") + + assert isinstance(point, Point) + assert point.id == "point-123" + assert point.payload.content == "hello" + + asyncio.run(run()) + + +def test_async_get_point_not_found(client, mock_connection): + async def run(): + mock_connection.call.return_value = None + + result = await client.get(point_id="missing") + + assert result is None + + asyncio.run(run()) + + +def test_async_delete_success(client, mock_connection): + async def run(): + mock_connection.call.return_value = None + + await client.delete(point_id="point-123") + + mock_connection.call.assert_awaited_once() + + asyncio.run(run()) + + +def test_async_search_success(client, mock_connection): + async def run(): + mock_connection.call.return_value = Mock( + result_point_ids=[ + Mock(id=Mock(value="p1")), + Mock(id=Mock(value="p2")), + ] + ) + + results = await client.search( + vector=DenseVector([1, 2, 3]), + similarity=Similarity.COSINE, + limit=2, + ) + + assert results == ["p1", "p2"] + + asyncio.run(run()) + + +def test_async_search_invalid_vector(client): + async def run(): + with pytest.raises(TypeError): + await client.search( + vector=[1, 2, 3], + similarity=Similarity.COSINE, + limit=2, + ) + + asyncio.run(run()) + + +def test_async_close_closes_connection(client, mock_connection): + async def run(): + await client.close() + mock_connection.close.assert_awaited_once() + + asyncio.run(run()) + + +def test_async_context_manager_closes_connection(monkeypatch): + async def run(): + conn = Mock(spec=AsyncGRPCConnection) + conn.stub = Mock() + conn.call = AsyncMock() + conn.close = AsyncMock() + monkeypatch.setattr("vortexdb.async_client.AsyncGRPCConnection", lambda _: conn) + + async with AsyncVortexDB( + grpc_url="localhost:50051", + api_key="secret", + ) as db: + assert db is not None + + conn.close.assert_awaited_once() + + asyncio.run(run()) diff --git a/client/python/tests/test_async_connection.py b/client/python/tests/test_async_connection.py new file mode 100644 index 0000000..b6cd514 --- /dev/null +++ b/client/python/tests/test_async_connection.py @@ -0,0 +1,105 @@ +import asyncio +from unittest.mock import AsyncMock, Mock, patch + +import grpc + +from vortexdb.async_connection import AsyncGRPCConnection +from vortexdb.config import VortexDBConfig +from vortexdb.exceptions import ( + AuthenticationError, + InternalServerError, + InvalidArgumentError, + NotFoundError, + ServiceUnavailableError, + TimeoutError, +) + + +class FakeAioRpcError: + """ + Minimal AioRpcError-compatible object for mapping tests. + """ + + def __init__(self, status_code: grpc.StatusCode, details: str): + self._status_code = status_code + self._details = details + + def code(self): + return self._status_code + + def details(self): + return self._details + + +def make_config() -> VortexDBConfig: + return VortexDBConfig( + grpc_url="localhost:50051", + api_key="secret", + timeout=3.0, + ) + + +def test_async_channel_created_with_correct_url(): + with patch("grpc.aio.insecure_channel") as mock_channel: + mock_channel.return_value = Mock() + AsyncGRPCConnection(make_config()) + mock_channel.assert_called_once_with("localhost:50051") + + +def test_async_metadata_is_attached(): + with patch("grpc.aio.insecure_channel") as mock_channel: + mock_channel.return_value = Mock() + connection = AsyncGRPCConnection(make_config()) + + assert ("authorization", "Bearer secret") in connection._metadata + + +def test_successful_async_rpc_call(): + async def run(): + with patch("grpc.aio.insecure_channel") as mock_channel: + mock_channel.return_value = Mock() + connection = AsyncGRPCConnection(make_config()) + + fake_rpc = AsyncMock(return_value="ok") + + result = await connection.call(fake_rpc, request="req") + + fake_rpc.assert_awaited_once_with( + "req", + timeout=3.0, + metadata=connection._metadata, + ) + assert result == "ok" + + asyncio.run(run()) + + +def test_async_grpc_error_mapping(): + cases = [ + (grpc.StatusCode.UNAUTHENTICATED, AuthenticationError), + (grpc.StatusCode.NOT_FOUND, NotFoundError), + (grpc.StatusCode.INVALID_ARGUMENT, InvalidArgumentError), + (grpc.StatusCode.DEADLINE_EXCEEDED, TimeoutError), + (grpc.StatusCode.UNAVAILABLE, ServiceUnavailableError), + (grpc.StatusCode.UNKNOWN, InternalServerError), + ] + + for status_code, expected_exception in cases: + error = FakeAioRpcError(status_code, "boom") + mapped = AsyncGRPCConnection._map_grpc_error(error) + assert isinstance(mapped, expected_exception) + + +def test_async_close_closes_channel(): + async def run(): + with patch("grpc.aio.insecure_channel") as mock_channel: + channel = Mock() + channel.close = AsyncMock() + mock_channel.return_value = channel + connection = AsyncGRPCConnection(make_config()) + + await connection.close() + + channel.close.assert_awaited_once() + + asyncio.run(run()) diff --git a/client/python/vortexdb/__init__.py b/client/python/vortexdb/__init__.py index 62c100f..9d9d694 100644 --- a/client/python/vortexdb/__init__.py +++ b/client/python/vortexdb/__init__.py @@ -1,6 +1,7 @@ # vortexdb/__init__.py from vortexdb.client import VortexDB +from vortexdb.async_client import AsyncVortexDB from vortexdb.models import ( DenseVector, Payload, @@ -19,6 +20,7 @@ __all__ = [ "VortexDB", + "AsyncVortexDB", "DenseVector", "Payload", "Point", diff --git a/client/python/vortexdb/async_client.py b/client/python/vortexdb/async_client.py new file mode 100644 index 0000000..1d721a3 --- /dev/null +++ b/client/python/vortexdb/async_client.py @@ -0,0 +1,118 @@ +from typing import List + +from vortexdb import protoutils as proto +from vortexdb.async_connection import AsyncGRPCConnection +from vortexdb.config import VortexDBConfig +from vortexdb.models import DenseVector, Payload, Point, Similarity + + +class AsyncVortexDB: + """High-level async Python client for VortexDB.""" + + def __init__( + self, + *, + grpc_url: str | None = None, + api_key: str | None = None, + timeout: float | None = None, + ): + # Config order followed - args -> env vars -> defaults + self._config = VortexDBConfig.from_env( + grpc_url=grpc_url, + api_key=api_key, + timeout=timeout, + ) + + self._conn = AsyncGRPCConnection(self._config) + + async def insert(self, *, vector: DenseVector, payload: Payload) -> str: + """ + Insert a vector with payload. + Returns: point_id (str) + """ + if not isinstance(vector, DenseVector): + raise TypeError( + "vector must be a DenseVector. " + "Use: DenseVector([1.0, 2.0, 3.0])" + ) + + request = proto.build_insert_request( + vector=vector, + payload=payload, + ) + + response = await self._conn.call( + self._conn.stub.InsertVector, + request, + ) + + return response.id.value + + async def get(self, *, point_id: str) -> Point | None: + """ + Retrieve a point by ID. + """ + request = proto.build_point_id_request(point_id) + + response = await self._conn.call( + self._conn.stub.GetPoint, + request, + ) + + if response is None: + return None + + return Point.from_proto(response) + + async def delete(self, *, point_id: str) -> None: + """ + Delete a point by ID. + """ + request = proto.build_point_id_request(point_id) + + await self._conn.call( + self._conn.stub.DeletePoint, + request, + ) + + async def search( + self, + *, + vector: DenseVector, + similarity: Similarity, + limit: int, + ) -> List[str]: + """ + Search for nearest neighbors. + Returns: List of point IDs + """ + if not isinstance(vector, DenseVector): + raise TypeError( + "vector must be a DenseVector. " + "Use: DenseVector([1.0, 2.0, 3.0])" + ) + + request = proto.build_search_request( + vector=vector, + similarity=similarity, + limit=limit, + ) + + response = await self._conn.call( + self._conn.stub.SearchPoints, + request, + ) + + return [pid.id.value for pid in response.result_point_ids] + + async def close(self) -> None: + """ + Close the async gRPC connection. + """ + await self._conn.close() + + async def __aenter__(self) -> "AsyncVortexDB": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.close() diff --git a/client/python/vortexdb/async_connection.py b/client/python/vortexdb/async_connection.py new file mode 100644 index 0000000..3ad10ab --- /dev/null +++ b/client/python/vortexdb/async_connection.py @@ -0,0 +1,72 @@ +from typing import Any, Callable + +import grpc + +from vortexdb.config import VortexDBConfig +from vortexdb.exceptions import ( + AuthenticationError, + InternalServerError, + InvalidArgumentError, + NotFoundError, + ServiceUnavailableError, + TimeoutError, + VortexDBError, +) +from vortexdb.grpc.vector_db_pb2_grpc import VectorDBStub + + +class AsyncGRPCConnection: + """Async gRPC connection wrapper for VortexDB.""" + + def __init__(self, config: VortexDBConfig): + self._config = config + self._channel = grpc.aio.insecure_channel(config.grpc_url) + self._stub = VectorDBStub(self._channel) + self._metadata = ( + ("authorization", f"Bearer {config.api_key}"), + ) + + @property + def stub(self) -> VectorDBStub: + return self._stub + + async def call( + self, + rpc: Callable[..., Any], + request: Any, + ) -> Any: + """Execute an async gRPC call with standard error handling.""" + try: + return await rpc( + request, + timeout=self._config.timeout, + metadata=self._metadata, + ) + + except grpc.aio.AioRpcError as e: + raise self._map_grpc_error(e) from e + + async def close(self) -> None: + """Close the underlying async gRPC channel.""" + await self._channel.close() + + @staticmethod + def _map_grpc_error(error: grpc.aio.AioRpcError) -> VortexDBError: + code = error.code() + + if code == grpc.StatusCode.UNAUTHENTICATED: + return AuthenticationError(error.details()) + + if code == grpc.StatusCode.NOT_FOUND: + return NotFoundError(error.details()) + + if code == grpc.StatusCode.INVALID_ARGUMENT: + return InvalidArgumentError(error.details()) + + if code == grpc.StatusCode.DEADLINE_EXCEEDED: + return TimeoutError(error.details()) + + if code == grpc.StatusCode.UNAVAILABLE: + return ServiceUnavailableError(error.details()) + + return InternalServerError(error.details())