-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase_service.py
More file actions
58 lines (48 loc) · 1.7 KB
/
base_service.py
File metadata and controls
58 lines (48 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from typing import Any
from mpt_api_client.http.query_state import QueryState
from mpt_api_client.http.types import Response
from mpt_api_client.models import Meta, ModelCollection
from mpt_api_client.models import Model as BaseModel
class ServiceBase[Client, Model: BaseModel]: # noqa: WPS214
"""Service base with agnostic HTTP client."""
_endpoint: str
_model_class: type[Model]
_collection_key = "data"
def __init__(
self,
*,
http_client: Client,
query_state: QueryState | None = None,
endpoint_params: dict[str, str] | None = None,
) -> None:
self.http_client = http_client
self.query_state = query_state or QueryState()
self.endpoint_params = endpoint_params or {}
@property
def path(self) -> str:
"""Service endpoint URL."""
return self._endpoint.format(**self.endpoint_params)
def build_path(
self,
query_params: dict[str, Any] | None = None,
) -> str:
"""Builds the endpoint URL with all the query parameters.
Returns:
Complete URL with query parameters.
"""
query = self.query_state.build(query_params)
return f"{self.path}?{query}" if query else self.path
@classmethod
def make_collection(cls, response: Response) -> ModelCollection[Model]:
"""Builds a collection from a response.
Args:
response: The response object.
"""
meta = Meta.from_response(response)
return ModelCollection(
resources=[
cls._model_class(resource, meta)
for resource in response.json().get(cls._collection_key)
],
meta=meta,
)