-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_service.py
More file actions
31 lines (23 loc) · 1.31 KB
/
async_service.py
File metadata and controls
31 lines (23 loc) · 1.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from mpt_api_client.http.async_client import AsyncHTTPClient
from mpt_api_client.http.base_service import ServiceBase
from mpt_api_client.http.resource_accessor import AsyncResourceAccessor
from mpt_api_client.http.url_utils import join_url_path
from mpt_api_client.models import Model as BaseModel
class AsyncService[Model: BaseModel](ServiceBase[AsyncHTTPClient, Model]): # noqa: WPS214
"""Immutable Service for RESTful resource collections.
Examples:
active_orders_cc = order_collection.filter(RQLQuery(status="active"))
active_orders = active_orders_cc.order_by("created").iterate()
product_active_orders = active_orders_cc.filter(RQLQuery(product__id="PRD-1")).iterate()
new_order = order_collection.create(order_data)
"""
def _resource(self, resource_id: str) -> AsyncResourceAccessor[Model]:
"""Return an :class:`AsyncResourceAccessor` bound to *resource_id*.
Usage::
await self._resource("RES-123").post("complete", json=data)
await self._resource("RES-123").get()
await self._resource("RES-123").put(json=data)
await self._resource("RES-123").delete()
"""
resource_url = join_url_path(self.path, resource_id)
return AsyncResourceAccessor(self.http_client, resource_url, self._model_class)