diff --git a/valueguard/valueguard.py b/valueguard/valueguard.py index 24dd302..a646138 100644 --- a/valueguard/valueguard.py +++ b/valueguard/valueguard.py @@ -1,984 +1,542 @@ import requests import json import urllib.parse +from urllib.parse import quote def _generate_request_search_criteria(search_criteria): + """Build a query-string fragment from search criteria. + + Parameters + ---------- + search_criteria : iterable + Typically dict.items() containing key/value pairs. + + Returns + ------- + str + A string starting with '&' for each parameter, matching the old format. + """ url = "" for key, value in search_criteria: value = _change_array_to_string(value) - url += "&" + urllib.parse.quote(key) + "=" + urllib.parse.quote(str(value)) + url += f"&{quote(str(key))}={quote(str(value))}" return url def _change_array_to_string(value): - """ Handles the check to see if the parameter is an array and then converts it to a string - - Parameters - ---------- - :param value: - The possible array or list we are going to convert to a string. + """Convert list values to a comma-separated string. - Returns - ------- - :return: - A string. Either converted from an array or as it is. + This preserves the old behavior where list query parameters are sent as a + single comma-separated value. """ if isinstance(value, list): - parameters_string = ','.join(value) - return parameters_string + return ",".join(value) return value class Client: - # Static - # server_url = "http://localhost:8080" - server_url = "https://api.valueguard.se" + # Default configuration + server_url = "https://api-prod.valueguard.se" __oauth2_client_name = "api" - _verify_ssl=True + _verify_ssl = True - # User settings; + # Auth state access_token = "" refresh_token = "" def __init__(self): - pass - - def authenticate(self, username, password): - """ Uses user's credentials to authenticate. - - Generates the url to authenticate and request the access token as well - as the refresh token from the server. - - Parameters - ---------- - :param username: - Username used to authenticate - :param password: - Password used to authenticates - - Raises - ------ - :exception - Exception raised when the server response is invalid - """ - url = self.server_url + "/oauth/token?client_id=" + self.__oauth2_client_name + "&grant_type=password" \ - "&username=" + \ - urllib.parse.quote(username) + \ - "&password=" + \ - urllib.parse.quote(password) - # print(url) - response = requests.post(url, verify=self._verify_ssl) - - if response.status_code == 200: - response_json = json.loads(response.content.decode("utf-8")) - self.access_token = response_json['access_token'] - self.refresh_token = response_json['refresh_token'] - # print(response_json) - elif response.status_code != 200: - # print(response.status_code) - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - - def renew_access_token(self): - """ Handles the renewal of the access token. - - """ - url = self.server_url + "/oauth/token?client_id=api&grant_type=refresh_token&refresh_token=" + \ - urllib.parse.quote(self.refresh_token) - print(url) - response = requests.post(url, verify=self._verify_ssl) - if response.status_code == 200: - response_json = json.loads(response.content.decode("utf-8")) - self.access_token = response_json['access_token'] - self.refresh_token = response_json['refresh_token'] - #print(response_json) - else: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - - - def user(self): - """ Handles the query to get the user. - - Returns - ------- - :return: - The query result in JSON format - """ - url = self.server_url + "/v0/users/me?access_token=" + \ - urllib.parse.quote(self.access_token) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def user_roles(self): - """ Handles the query to get users roles. - - Returns - ------- - :return: - The query result in JSON format - """ - url = self.server_url + "/v0/users/me/roles?access_token=" + \ - urllib.parse.quote(self.access_token) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def household(self, search_criteria=None): - """ Handles the query to retrieve data about a household. + self.session = requests.Session() - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + # ------------------------------------------------------------------------- + # Internal helpers + # ------------------------------------------------------------------------- - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/household?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def household_registration(self, search_criteria=None): - """ Handles the query to retrieve data about a household registration. + def _decode_error(self, response): + """Decode an error response safely.""" + return response.content.decode("utf-8", errors="replace") - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/household/registration?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) + def _handle_json_response(self, response): + """Return parsed JSON or raise an exception on non-200 responses.""" if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def residential_registry(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the residential registry. - - Uses offset and limit to break down the results of the query into chunks. + raise Exception(self._decode_error(response)) + return response.json() - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/residential/registry?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) + def _handle_content_response(self, response): + """Return raw response content or raise an exception on non-200 responses.""" if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def residential_registry_markups(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the residential registry markups. + raise Exception(self._decode_error(response)) + return response.content - Uses offset and limit to break down the results of the query into chunks. + def _build_url(self, path, search_criteria=None, include_access_token=True): + """Build a full request URL. Parameters ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. + path : str + API path starting with '/'. + search_criteria : dict | None + Optional query criteria. + include_access_token : bool + Whether to append access_token in the query string. Returns ------- - :return: - The query result in JSON format + str + Fully formatted URL. """ if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/residential/registry/markups?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - def residential_registry_valuations(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the residential registry valuations. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. + if include_access_token: + url = f"{self.server_url}{path}?access_token={quote(self.access_token)}" + else: + url = f"{self.server_url}{path}" - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/residential/registry/valuations?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + return url + + def _get_json(self, path, search_criteria=None, include_access_token=True): + """Perform a GET request and return JSON.""" + url = self._build_url( + path=path, + search_criteria=search_criteria, + include_access_token=include_access_token, + ) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + def _get_content(self, path, search_criteria=None, include_access_token=True): + """Perform a GET request and return raw bytes.""" + url = self._build_url( + path=path, + search_criteria=search_criteria, + include_access_token=include_access_token, + ) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_content_response(response) + + def _post_json_body(self, path, payload=None): + """Perform a POST request with JSON body and return JSON.""" + if payload is None: + payload = {} + + url = self._build_url(path=path, include_access_token=True) + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } - def taxation_registry_units(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the taxation unit registry. + response = self.session.post( + url, + verify=self._verify_ssl, + headers=headers, + json=payload, + ) + return self._handle_json_response(response) - Uses offset and limit to break down the results of the query into chunks. + # ------------------------------------------------------------------------- + # Authentication + # ------------------------------------------------------------------------- - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. + def authenticate(self, username, password): + """Authenticate with username and password. - Returns - ------- - :return: - The query result in JSON format + Retrieves both access token and refresh token. """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/taxation/registry/units?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + url = ( + f"{self.server_url}/oauth/token" + f"?client_id={self.__oauth2_client_name}" + f"&grant_type=password" + f"&username={quote(username)}" + f"&password={quote(password)}" + ) - def taxation_registry_buildings(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the taxation building registry. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/taxation/registry/buildings?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + response = self.session.post(url, verify=self._verify_ssl) - def taxation_registry_lands(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the taxation land registry. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/taxation/registry/lands?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + if response.status_code == 200: + response_json = json.loads(response.content.decode("utf-8")) + self.access_token = response_json["access_token"] + self.refresh_token = response_json["refresh_token"] + else: + raise Exception(self._decode_error(response)) - def valuation(self, search_criteria=None): - """ Handles the query to make valuations. + def renew_access_token(self): + """Renew the access token using the refresh token.""" + url = ( + f"{self.server_url}/oauth/token" + f"?client_id=api" + f"&grant_type=refresh_token" + f"&refresh_token={quote(self.refresh_token)}" + ) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + response = self.session.post(url, verify=self._verify_ssl) - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/valuation?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def sales_reference(self, search_criteria=None): - """ Handles the query to get sales from reference. + if response.status_code == 200: + response_json = json.loads(response.content.decode("utf-8")) + self.access_token = response_json["access_token"] + self.refresh_token = response_json["refresh_token"] + else: + raise Exception(self._decode_error(response)) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + # ------------------------------------------------------------------------- + # User + # ------------------------------------------------------------------------- - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/sales/reference?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + def user(self): + """Get the authenticated user.""" + return self._get_json("/v0/users/me") - def sales(self, search_criteria=None): - """ Handles the query to get sales. + def user_roles(self): + """Get roles for the authenticated user.""" + return self._get_json("/v0/users/me/roles") - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + # ------------------------------------------------------------------------- + # Household + # ------------------------------------------------------------------------- - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/sales?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + def household(self, search_criteria=None): + """Retrieve household data.""" + return self._get_json("/v1/household", search_criteria) - def ads(self, search_criteria=None): - """ Handles the query to get ads. + def household_registration(self, search_criteria=None): + """Retrieve household registration data.""" + return self._get_json("/v1/household/registration", search_criteria) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + # ------------------------------------------------------------------------- + # Residential registry + # ------------------------------------------------------------------------- - Returns - ------- - :return: - The query result in JSON format - """ + def residential_registry(self, offset, limit, search_criteria=None): + """Retrieve residential registry data.""" if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/ads?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def ads_pdf(self, search_criteria=None): - """ Handles the query to get pdf snapshot of ad. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/ads/pdf?access_token=" + \ - urllib.parse.quote(self.access_token) + url = ( + f"{self.server_url}/v1/residential/registry" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return response.content - - def index_definitions(self, search_criteria=None): - """ Handles the query to get index definitions. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Returns - ------- - :return: - The query result in JSON format - """ + def residential_registry_markups(self, offset, limit, search_criteria=None): + """Retrieve residential registry markups.""" if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/index/definitions?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_definitions_complete(self, search_criteria=None): - """ Handles the query to get index definitions complete. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/definitions/complete?access_token=" + \ - urllib.parse.quote(self.access_token) + url = ( + f"{self.server_url}/v1/residential/registry/markups" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_normalized(self, search_criteria=None): - """ Handles the query to get index normalized. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Returns - ------- - :return: - The query result in JSON format - """ + def residential_registry_valuations(self, offset, limit, search_criteria=None): + """Retrieve residential registry valuations.""" if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/index/normalized?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_best(self, search_criteria=None): - """ Handles the query to get best index. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/best?access_token=" + \ - urllib.parse.quote(self.access_token) + url = ( + f"{self.server_url}/v1/residential/registry/valuations" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - def index_recount(self, search_criteria=None): - """ Handles the query to get an index recounted value. + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + # ------------------------------------------------------------------------- + # Taxation registry + # ------------------------------------------------------------------------- - Returns - ------- - :return: - The query result in JSON format - """ + def taxation_registry_units(self, offset, limit, search_criteria=None): + """Retrieve taxation unit registry data.""" if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/index/recount?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - def index_publishing_calendar(self, search_criteria=None): - """ Handles the query to get the index publishing calendar. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/publishing/calendar" + url = ( + f"{self.server_url}/v1/taxation/registry/units" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_statistics(self, search_criteria=None, public=False): - """ Handles the query to get the index publishing calendar. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Returns - ------- - :return: - The query result in JSON format - """ + def taxation_registry_buildings(self, offset, limit, search_criteria=None): + """Retrieve taxation building registry data.""" if search_criteria is None: search_criteria = {} - - if public: - url = self.server_url + "/v1/index/statistics/public?" - else: - url = self.server_url + "/v1/index/statistics?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_volume(self, search_criteria=None): - """ Handles the query to get the index area volume. - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/volume?access_token=" + \ - urllib.parse.quote(self.access_token) + url = ( + f"{self.server_url}/v1/taxation/registry/buildings" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - def area(self, search_criteria=None): - """ Handles the query to get the areas. + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ + def taxation_registry_lands(self, offset, limit, search_criteria=None): + """Retrieve taxation land registry data.""" if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/area?access_token=" + \ - urllib.parse.quote(self.access_token) - + url = ( + f"{self.server_url}/v1/taxation/registry/lands" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_category(self, search_criteria=None): - """ Handles the query to get the area category. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - url = self.server_url + "/v1/area/category?access_token=" + \ - urllib.parse.quote(self.access_token) + # ------------------------------------------------------------------------- + # Valuation / sales / ads + # ------------------------------------------------------------------------- - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_information(self, search_criteria=None): - """ Handles the query to get the area information. + def valuation(self, search_criteria=None): + """Make a valuation query.""" + return self._get_json("/v1/valuation", search_criteria) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + def sales_reference(self, search_criteria=None): + """Get sales references.""" + return self._post_json_body("/v1/sales/reference", search_criteria) - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} + def sales(self, search_criteria=None): + """Get sales.""" + return self._get_json("/v1/sales", search_criteria) - url = self.server_url + "/v1/area/information?access_token=" + \ - urllib.parse.quote(self.access_token) + def ads(self, search_criteria=None): + """Get ads.""" + return self._get_json("/v1/ads", search_criteria) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + def ads_pdf(self, search_criteria=None): + """Get PDF snapshot of an ad.""" + return self._get_content("/v1/ads/pdf", search_criteria) - def area_information_dates(self, search_criteria=None): - """ Handles the query to get the area information. + # ------------------------------------------------------------------------- + # Index + # ------------------------------------------------------------------------- - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + def index_definitions(self, search_criteria=None): + """Get index definitions.""" + return self._get_json("/v1/index/definitions", search_criteria) - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} + def index_definitions_complete(self, search_criteria=None): + """Get complete index definitions.""" + return self._get_json("/v1/index/definitions/complete", search_criteria) - url = self.server_url + "/v1/area/information/dates?access_token=" + \ - urllib.parse.quote(self.access_token) + def index_normalized(self, search_criteria=None): + """Get normalized index.""" + return self._get_json("/v1/index/normalized", search_criteria) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + def index_best(self, search_criteria=None): + """Get best index.""" + return self._get_json("/v1/index/best", search_criteria) - def area_information_field(self, search_criteria=None): - """ Handles the query to get the area information fields. + def index_recount(self, search_criteria=None): + """Get recounted index value.""" + return self._get_json("/v1/index/recount", search_criteria) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + def index_publishing_calendar(self, search_criteria=None): + """Get index publishing calendar. - Returns - ------- - :return: - The query result in JSON format + This endpoint is public and does not use access_token. """ - if search_criteria is None: - search_criteria = {} + return self._get_json( + "/v1/index/publishing/calendar", + search_criteria=search_criteria, + include_access_token=False, + ) - url = self.server_url + "/v1/area/information/field?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_information_tag(self, search_criteria=None): - """ Handles the query to get the area information tags. + def index_statistics(self, search_criteria=None, public=False): + """Get index statistics. Parameters ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format + public : bool + If True, use the public endpoint without access token. """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/information/tag?access_token=" + \ - urllib.parse.quote(self.access_token) + if public: + url = f"{self.server_url}/v1/index/statistics/public?" + if search_criteria is None: + search_criteria = {} + url += _generate_request_search_criteria(search_criteria.items()) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + return self._get_json("/v1/index/statistics", search_criteria) - def area_polygon(self, search_criteria=None): - """ Handles the query to get the area polygons. + def index_volume(self, search_criteria=None): + """Get index area volume.""" + return self._get_json("/v1/index/volume", search_criteria) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + # ------------------------------------------------------------------------- + # Area + # ------------------------------------------------------------------------- - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} + def area(self, search_criteria=None): + """Get areas.""" + return self._get_json("/v1/area", search_criteria) - url = self.server_url + "/v1/area/polygon?access_token=" + \ - urllib.parse.quote(self.access_token) + def area_category(self, search_criteria=None): + """Get area categories.""" + return self._get_json("/v1/area/category", search_criteria) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + def area_information(self, search_criteria=None): + """Get area information.""" + return self._get_json("/v1/area/information", search_criteria) def area_information_dates(self, search_criteria=None): - """ Handles the query to get the area information dates. + """Get area information dates.""" + return self._get_json("/v1/area/information/dates", search_criteria) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} + def area_information_field(self, search_criteria=None): + """Get area information fields.""" + return self._get_json("/v1/area/information/field", search_criteria) - url = self.server_url + "/v1/area/information/dates?access_token=" + \ - urllib.parse.quote(self.access_token) + def area_information_tag(self, search_criteria=None): + """Get area information tags.""" + return self._get_json("/v1/area/information/tag", search_criteria) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + def area_polygon(self, search_criteria=None): + """Get area polygons.""" + return self._get_json("/v1/area/polygon", search_criteria) + # ------------------------------------------------------------------------- + # Market share / references + # ------------------------------------------------------------------------- def sales_market_share(self, search_criteria=None): - """ Handles the query to get the sales market share. + """Get sales market share.""" + return self._post_json_body("/v1/sales/market-share", search_criteria) + + def ads_reference(self, search_criteria=None): + """Get ads references.""" + return self._post_json_body("/v1/ads/reference", search_criteria) + + def ads_market_share(self, search_criteria=None): + """Get ads market share.""" + return self._post_json_body("/v1/ads/market-share", search_criteria) + + # ------------------------------------------------------------------------- + # Housing association + # ------------------------------------------------------------------------- + + def housing_association(self, org_number): + """Get information about a housing association.""" + url = ( + f"{self.server_url}/v1/housing-association/{quote(org_number)}" + f"?access_token={quote(self.access_token)}" + ) + + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + def housing_association_report(self, housing_association_id, year): + """Get the annual report PDF for a housing association.""" + url = ( + f"{self.server_url}/v1/housing-association/report" + f"?housing_association_id={quote(str(housing_association_id))}" + f"&year={quote(str(year))}" + f"&access_token={quote(self.access_token)}" + ) + + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_content_response(response) + + # ------------------------------------------------------------------------- + # Geocode + # ------------------------------------------------------------------------- + + def geocode(self, query, limit=5, match_type="address", exact_match=False): + """Geocode an address or place.""" + url = ( + f"{self.server_url}/v1/geocode" + f"?access_token={quote(self.access_token)}" + f"&query={quote(query)}" + f"&limit={quote(str(limit))}" + f"&match_type={quote(match_type)}" + f"&exact_match={quote(str(exact_match).lower())}" + ) + + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + # ------------------------------------------------------------------------- + # Projects + # ------------------------------------------------------------------------- + + def projects_new(self): + """Fetch default data required to create a new project.""" + return self._get_json("/v1/projects/new") + + def projects(self, limit=10): + """Fetch projects for the authenticated user.""" + url = ( + f"{self.server_url}/v1/projects" + f"?access_token={quote(self.access_token)}" + f"&limit={quote(str(limit))}" + ) + + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + def project(self, project_id): + """Fetch a specific project.""" + url = ( + f"{self.server_url}/v1/projects/{quote(str(project_id))}" + f"?access_token={quote(self.access_token)}" + ) + + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + def create_project(self, project_data): + """Create a new project.""" + return self._post_json_body("/v1/projects", project_data) + + def delete_project(self, project_id): + """Delete a project.""" + url = ( + f"{self.server_url}/v1/projects/{quote(str(project_id))}" + f"?access_token={quote(self.access_token)}" + ) + + response = self.session.delete(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + def update_project_name(self, project_id, name): + """Update a project's name.""" + url = ( + f"{self.server_url}/v1/projects/{quote(str(project_id))}/name" + f"?access_token={quote(self.access_token)}" + ) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/sales/market-share?access_token=" + \ - urllib.parse.quote(self.access_token) - session = requests.Session() headers = { - 'Content-Type': 'application/json' + "Content-Type": "application/json", + "Accept": "application/json", } - response = session.post(url, verify=self._verify_ssl, headers=headers, data=json.dumps(search_criteria)) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + + response = self.session.post( + url, + verify=self._verify_ssl, + headers=headers, + json={"name": name}, + ) + return self._handle_json_response(response) \ No newline at end of file