From 3df58bdced325de5833ed1957dcef863eecdcf84 Mon Sep 17 00:00:00 2001 From: robert Date: Mon, 9 Mar 2026 14:15:37 +0100 Subject: [PATCH 1/2] new endpoints. Also fix old endpoints for vg-api 2.0 --- valueguard/valueguard.py | 464 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 454 insertions(+), 10 deletions(-) diff --git a/valueguard/valueguard.py b/valueguard/valueguard.py index 24dd302..db2d7c2 100644 --- a/valueguard/valueguard.py +++ b/valueguard/valueguard.py @@ -415,7 +415,7 @@ def valuation(self, search_criteria=None): return json.loads(response.content.decode("utf-8")) def sales_reference(self, search_criteria=None): - """ Handles the query to get sales from reference. + """Handles the query to get sales references. Parameters ---------- @@ -429,16 +429,27 @@ def sales_reference(self, search_criteria=None): """ if search_criteria is None: search_criteria = {} + url = self.server_url + "/v1/sales/reference?access_token=" + \ urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) + session = requests.Session() - response = session.get(url, verify=self._verify_ssl) + headers = { + "Content-Type": "application/json", + "Accept": "application/json" + } + + response = session.post( + url, + verify=self._verify_ssl, + headers=headers, + json=search_criteria + ) + if response.status_code != 200: - # print(response.content.decode("utf-8")) raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + + return response.json() def sales(self, search_criteria=None): """ Handles the query to get sales. @@ -973,12 +984,445 @@ def sales_market_share(self, search_criteria=None): url = self.server_url + "/v1/sales/market-share?access_token=" + \ urllib.parse.quote(self.access_token) + session = requests.Session() headers = { - 'Content-Type': 'application/json' + "Content-Type": "application/json", + "Accept": "application/json" } - response = session.post(url, verify=self._verify_ssl, headers=headers, data=json.dumps(search_criteria)) + + response = session.post( + url, + verify=self._verify_ssl, + headers=headers, + json=search_criteria + ) + if response.status_code != 200: - # print(response.content.decode("utf-8")) raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + + return response.json() + + def ads_reference(self, search_criteria=None): + """Handles the query to get ads references. + + Parameters + ---------- + :param search_criteria: + Defines the search criteria used to filter the query. + + Returns + ------- + :return: + The query result in JSON format + """ + + if search_criteria is None: + search_criteria = {} + + url = self.server_url + "/v1/ads/reference?access_token=" + \ + urllib.parse.quote(self.access_token) + + session = requests.Session() + headers = { + "Content-Type": "application/json", + "Accept": "application/json" + } + + response = session.post( + url, + verify=self._verify_ssl, + headers=headers, + json=search_criteria + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def ads_market_share(self, search_criteria=None): + """ Handles the query to get the ads market share. + + Parameters + ---------- + :param search_criteria: + Defines the search criteria used to filter the query. + + Returns + ------- + :return: + The query result in JSON format + """ + + if search_criteria is None: + search_criteria = {} + + url = self.server_url + "/v1/ads/market-share?access_token=" + \ + urllib.parse.quote(self.access_token) + + session = requests.Session() + headers = { + "Content-Type": "application/json", + "Accept": "application/json" + } + + response = session.post( + url, + verify=self._verify_ssl, + headers=headers, + json=search_criteria + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def housing_association(self, org_number): + """Handles the query to get information about a housing association. + + Parameters + ---------- + :param org-nr: + Organization number used to filter the query. + + Returns + ------- + :return: + The query result in JSON format + """ + + + url = ( + self.server_url + + "/v1/housing-association/" + + urllib.parse.quote(org_number) + + "?access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + response = session.get( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def housing_association_report(self, housing_association_id, year): + """Handles the query to get the annual report for a housing association. + + Parameters + ---------- + :param org-nr: + Organization number used to filter the query. + : param year + Year for the report + + Returns + ------- + :return: + The PDF file content as bytes. + """ + + url = ( + self.server_url + + "/v1/housing-association/report" + + "?housing_association_id=" + + urllib.parse.quote(str(housing_association_id)) + + "&year=" + + urllib.parse.quote(str(year)) + + "&access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + response = session.get( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8", errors="replace")) + + return response.content + + def geocode(self, query, limit=5, match_type="address", exact_match=False): + """Handles the query to geocode an address or place. + + Parameters + ---------- + query : str + The search string (e.g. street name, address). + + limit : int, optional + Maximum number of results to return (default 5, max 10). + + match_type : str, optional + Type of match to perform. Default is "address". + + exact_match : bool, optional + Whether the match should be exact. Default False. + + Returns + ------- + dict + The API response returned as JSON containing geocode results. + + """ + + url = ( + self.server_url + + "/v1/geocode" + + "?access_token=" + + urllib.parse.quote(self.access_token) + + "&query=" + + urllib.parse.quote(query) + + "&limit=" + + urllib.parse.quote(str(limit)) + + "&match_type=" + + urllib.parse.quote(match_type) + + "&exact_match=" + + urllib.parse.quote(str(exact_match).lower()) + ) + + session = requests.Session() + + response = session.get( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def projects_new(self): + """Handles the query to fetch data required to create a new project. + + Returns + ------- + dict + The API response returned as JSON containing default project data. + """ + + url = ( + self.server_url + + "/v1/projects/new" + + "?access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + response = session.get( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def projects(self, limit=10): + """Handles the query to fetch projects for the authenticated user. + + Parameters + ---------- + limit : int, optional + Maximum number of projects to return. + + Returns + ------- + dict + The API response returned as JSON containing the projects. + + """ + + url = ( + self.server_url + + "/v1/projects" + + "?access_token=" + + urllib.parse.quote(self.access_token) + + "&limit=" + + urllib.parse.quote(str(limit)) + ) + + session = requests.Session() + + response = session.get( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + + def project(self, project_id): + """Handles the query to fetch a specific project. + + Parameters + ---------- + project_id : str + The UUID of the project. + + Returns + ------- + dict + The API response returned as JSON containing the project data. + + """ + + url = ( + self.server_url + + "/v1/projects/" + + urllib.parse.quote(str(project_id)) + + "?access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + response = session.get( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def create_project(self, project_data): + """Handles the request to create a new project. + + Parameters + ---------- + project_data : dict + Dictionary containing the full project configuration + including search criteria and selected fields. + + Returns + ------- + dict + The API response returned as JSON containing the created project. + + """ + + url = ( + self.server_url + + "/v1/projects" + + "?access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + headers = { + "Content-Type": "application/json", + "Accept": "application/json" + } + + response = session.post( + url, + verify=self._verify_ssl, + headers=headers, + json=project_data + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def delete_project(self, project_id): + """Handles the request to delete a project. + + Parameters + ---------- + project_id : str + The UUID of the project to delete. + + Returns + ------- + dict + The API response returned as JSON. + + """ + + url = ( + self.server_url + + "/v1/projects/" + + urllib.parse.quote(str(project_id)) + + "?access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + response = session.delete( + url, + verify=self._verify_ssl + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() + + def update_project_name(self, project_id, name): + """Handles the request to update the name of a project. + + Parameters + ---------- + project_id : str + The UUID of the project. + + name : str + The new name for the project. + + Returns + ------- + dict + The API response returned as JSON. + + """ + + url = ( + self.server_url + + "/v1/projects/" + + urllib.parse.quote(str(project_id)) + + "/name" + + "?access_token=" + + urllib.parse.quote(self.access_token) + ) + + session = requests.Session() + + headers = { + "Content-Type": "application/json", + "Accept": "application/json" + } + + response = session.post( + url, + verify=self._verify_ssl, + headers=headers, + json={"name": name} + ) + + if response.status_code != 200: + raise Exception(response.content.decode("utf-8")) + + return response.json() \ No newline at end of file From 5ac1c15d1961fecb5009e385fd1daea2511ef13c Mon Sep 17 00:00:00 2001 From: robert Date: Tue, 10 Mar 2026 08:45:17 +0100 Subject: [PATCH 2/2] cleanup. server_url to api-prod.valueguard.se --- valueguard/valueguard.py | 1660 +++++++++----------------------------- 1 file changed, 387 insertions(+), 1273 deletions(-) diff --git a/valueguard/valueguard.py b/valueguard/valueguard.py index db2d7c2..a646138 100644 --- a/valueguard/valueguard.py +++ b/valueguard/valueguard.py @@ -1,1428 +1,542 @@ import requests import json import urllib.parse +from urllib.parse import quote def _generate_request_search_criteria(search_criteria): + """Build a query-string fragment from search criteria. + + Parameters + ---------- + search_criteria : iterable + Typically dict.items() containing key/value pairs. + + Returns + ------- + str + A string starting with '&' for each parameter, matching the old format. + """ url = "" for key, value in search_criteria: value = _change_array_to_string(value) - url += "&" + urllib.parse.quote(key) + "=" + urllib.parse.quote(str(value)) + url += f"&{quote(str(key))}={quote(str(value))}" return url def _change_array_to_string(value): - """ Handles the check to see if the parameter is an array and then converts it to a string + """Convert list values to a comma-separated string. - Parameters - ---------- - :param value: - The possible array or list we are going to convert to a string. - - Returns - ------- - :return: - A string. Either converted from an array or as it is. + This preserves the old behavior where list query parameters are sent as a + single comma-separated value. """ if isinstance(value, list): - parameters_string = ','.join(value) - return parameters_string + return ",".join(value) return value -class Client: - # Static - # server_url = "http://localhost:8080" - server_url = "https://api.valueguard.se" - __oauth2_client_name = "api" - _verify_ssl=True - - # User settings; - access_token = "" - refresh_token = "" - - def __init__(self): - pass - - def authenticate(self, username, password): - """ Uses user's credentials to authenticate. - - Generates the url to authenticate and request the access token as well - as the refresh token from the server. - - Parameters - ---------- - :param username: - Username used to authenticate - :param password: - Password used to authenticates - - Raises - ------ - :exception - Exception raised when the server response is invalid - """ - url = self.server_url + "/oauth/token?client_id=" + self.__oauth2_client_name + "&grant_type=password" \ - "&username=" + \ - urllib.parse.quote(username) + \ - "&password=" + \ - urllib.parse.quote(password) - # print(url) - response = requests.post(url, verify=self._verify_ssl) - - if response.status_code == 200: - response_json = json.loads(response.content.decode("utf-8")) - self.access_token = response_json['access_token'] - self.refresh_token = response_json['refresh_token'] - # print(response_json) - elif response.status_code != 200: - # print(response.status_code) - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - - def renew_access_token(self): - """ Handles the renewal of the access token. - - """ - url = self.server_url + "/oauth/token?client_id=api&grant_type=refresh_token&refresh_token=" + \ - urllib.parse.quote(self.refresh_token) - print(url) - response = requests.post(url, verify=self._verify_ssl) - if response.status_code == 200: - response_json = json.loads(response.content.decode("utf-8")) - self.access_token = response_json['access_token'] - self.refresh_token = response_json['refresh_token'] - #print(response_json) - else: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - - - def user(self): - """ Handles the query to get the user. - - Returns - ------- - :return: - The query result in JSON format - """ - url = self.server_url + "/v0/users/me?access_token=" + \ - urllib.parse.quote(self.access_token) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def user_roles(self): - """ Handles the query to get users roles. - - Returns - ------- - :return: - The query result in JSON format - """ - url = self.server_url + "/v0/users/me/roles?access_token=" + \ - urllib.parse.quote(self.access_token) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def household(self, search_criteria=None): - """ Handles the query to retrieve data about a household. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/household?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def household_registration(self, search_criteria=None): - """ Handles the query to retrieve data about a household registration. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/household/registration?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def residential_registry(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the residential registry. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/residential/registry?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def residential_registry_markups(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the residential registry markups. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/residential/registry/markups?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def residential_registry_valuations(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the residential registry valuations. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/residential/registry/valuations?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def taxation_registry_units(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the taxation unit registry. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/taxation/registry/units?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def taxation_registry_buildings(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the taxation building registry. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/taxation/registry/buildings?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def taxation_registry_lands(self, offset, limit, search_criteria=None): - """ Handles the query to retrieve data from the taxation land registry. - - Uses offset and limit to break down the results of the query into chunks. - - Parameters - ---------- - :param offset: - The offset to start retrieving data from. - :param limit: - Defines the amount of data objects retrieved with each query. - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/taxation/registry/lands?access_token=" + \ - urllib.parse.quote(self.access_token) + \ - "&offset=" + urllib.parse.quote(str(offset)) + \ - "&limit=" + urllib.parse.quote(str(limit)) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def valuation(self, search_criteria=None): - """ Handles the query to make valuations. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/valuation?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def sales_reference(self, search_criteria=None): - """Handles the query to get sales references. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/sales/reference?access_token=" + \ - urllib.parse.quote(self.access_token) - - session = requests.Session() - headers = { - "Content-Type": "application/json", - "Accept": "application/json" - } - - response = session.post( - url, - verify=self._verify_ssl, - headers=headers, - json=search_criteria - ) - - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) - - return response.json() - - def sales(self, search_criteria=None): - """ Handles the query to get sales. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/sales?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def ads(self, search_criteria=None): - """ Handles the query to get ads. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/ads?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def ads_pdf(self, search_criteria=None): - """ Handles the query to get pdf snapshot of ad. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/ads/pdf?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return response.content - - def index_definitions(self, search_criteria=None): - """ Handles the query to get index definitions. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/definitions?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_definitions_complete(self, search_criteria=None): - """ Handles the query to get index definitions complete. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/definitions/complete?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_normalized(self, search_criteria=None): - """ Handles the query to get index normalized. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/normalized?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_best(self, search_criteria=None): - """ Handles the query to get best index. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/best?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_recount(self, search_criteria=None): - """ Handles the query to get an index recounted value. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/recount?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_publishing_calendar(self, search_criteria=None): - """ Handles the query to get the index publishing calendar. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/publishing/calendar" - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_statistics(self, search_criteria=None, public=False): - """ Handles the query to get the index publishing calendar. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - if public: - url = self.server_url + "/v1/index/statistics/public?" - else: - url = self.server_url + "/v1/index/statistics?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def index_volume(self, search_criteria=None): - """ Handles the query to get the index area volume. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - url = self.server_url + "/v1/index/volume?access_token=" + \ - urllib.parse.quote(self.access_token) - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area(self, search_criteria=None): - """ Handles the query to get the areas. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_category(self, search_criteria=None): - """ Handles the query to get the area category. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/category?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_information(self, search_criteria=None): - """ Handles the query to get the area information. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/information?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_information_dates(self, search_criteria=None): - """ Handles the query to get the area information. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/information/dates?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_information_field(self, search_criteria=None): - """ Handles the query to get the area information fields. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/information/field?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_information_tag(self, search_criteria=None): - """ Handles the query to get the area information tags. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/information/tag?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) - - def area_polygon(self, search_criteria=None): - """ Handles the query to get the area polygons. - - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. - - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} - - url = self.server_url + "/v1/area/polygon?access_token=" + \ - urllib.parse.quote(self.access_token) - - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) - if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) +class Client: + # Default configuration + server_url = "https://api-prod.valueguard.se" + __oauth2_client_name = "api" + _verify_ssl = True - def area_information_dates(self, search_criteria=None): - """ Handles the query to get the area information dates. + # Auth state + access_token = "" + refresh_token = "" - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + def __init__(self): + self.session = requests.Session() - Returns - ------- - :return: - The query result in JSON format - """ - if search_criteria is None: - search_criteria = {} + # ------------------------------------------------------------------------- + # Internal helpers + # ------------------------------------------------------------------------- - url = self.server_url + "/v1/area/information/dates?access_token=" + \ - urllib.parse.quote(self.access_token) + def _decode_error(self, response): + """Decode an error response safely.""" + return response.content.decode("utf-8", errors="replace") - url += _generate_request_search_criteria(search_criteria.items()) - # print(url) - session = requests.Session() - response = session.get(url, verify=self._verify_ssl) + def _handle_json_response(self, response): + """Return parsed JSON or raise an exception on non-200 responses.""" if response.status_code != 200: - # print(response.content.decode("utf-8")) - raise Exception(response.content.decode("utf-8")) - return json.loads(response.content.decode("utf-8")) + raise Exception(self._decode_error(response)) + return response.json() + def _handle_content_response(self, response): + """Return raw response content or raise an exception on non-200 responses.""" + if response.status_code != 200: + raise Exception(self._decode_error(response)) + return response.content - def sales_market_share(self, search_criteria=None): - """ Handles the query to get the sales market share. + def _build_url(self, path, search_criteria=None, include_access_token=True): + """Build a full request URL. Parameters ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + path : str + API path starting with '/'. + search_criteria : dict | None + Optional query criteria. + include_access_token : bool + Whether to append access_token in the query string. Returns ------- - :return: - The query result in JSON format + str + Fully formatted URL. """ if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/sales/market-share?access_token=" + \ - urllib.parse.quote(self.access_token) + if include_access_token: + url = f"{self.server_url}{path}?access_token={quote(self.access_token)}" + else: + url = f"{self.server_url}{path}" + + url += _generate_request_search_criteria(search_criteria.items()) + return url + + def _get_json(self, path, search_criteria=None, include_access_token=True): + """Perform a GET request and return JSON.""" + url = self._build_url( + path=path, + search_criteria=search_criteria, + include_access_token=include_access_token, + ) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + + def _get_content(self, path, search_criteria=None, include_access_token=True): + """Perform a GET request and return raw bytes.""" + url = self._build_url( + path=path, + search_criteria=search_criteria, + include_access_token=include_access_token, + ) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_content_response(response) + + def _post_json_body(self, path, payload=None): + """Perform a POST request with JSON body and return JSON.""" + if payload is None: + payload = {} - session = requests.Session() + url = self._build_url(path=path, include_access_token=True) headers = { "Content-Type": "application/json", - "Accept": "application/json" + "Accept": "application/json", } - response = session.post( + response = self.session.post( url, verify=self._verify_ssl, headers=headers, - json=search_criteria + json=payload, ) + return self._handle_json_response(response) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + # ------------------------------------------------------------------------- + # Authentication + # ------------------------------------------------------------------------- - return response.json() + def authenticate(self, username, password): + """Authenticate with username and password. - def ads_reference(self, search_criteria=None): - """Handles the query to get ads references. + Retrieves both access token and refresh token. + """ + url = ( + f"{self.server_url}/oauth/token" + f"?client_id={self.__oauth2_client_name}" + f"&grant_type=password" + f"&username={quote(username)}" + f"&password={quote(password)}" + ) - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + response = self.session.post(url, verify=self._verify_ssl) - Returns - ------- - :return: - The query result in JSON format - """ + if response.status_code == 200: + response_json = json.loads(response.content.decode("utf-8")) + self.access_token = response_json["access_token"] + self.refresh_token = response_json["refresh_token"] + else: + raise Exception(self._decode_error(response)) - if search_criteria is None: - search_criteria = {} + def renew_access_token(self): + """Renew the access token using the refresh token.""" + url = ( + f"{self.server_url}/oauth/token" + f"?client_id=api" + f"&grant_type=refresh_token" + f"&refresh_token={quote(self.refresh_token)}" + ) - url = self.server_url + "/v1/ads/reference?access_token=" + \ - urllib.parse.quote(self.access_token) + response = self.session.post(url, verify=self._verify_ssl) - session = requests.Session() - headers = { - "Content-Type": "application/json", - "Accept": "application/json" - } + if response.status_code == 200: + response_json = json.loads(response.content.decode("utf-8")) + self.access_token = response_json["access_token"] + self.refresh_token = response_json["refresh_token"] + else: + raise Exception(self._decode_error(response)) - response = session.post( - url, - verify=self._verify_ssl, - headers=headers, - json=search_criteria - ) + # ------------------------------------------------------------------------- + # User + # ------------------------------------------------------------------------- - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + def user(self): + """Get the authenticated user.""" + return self._get_json("/v0/users/me") - return response.json() + def user_roles(self): + """Get roles for the authenticated user.""" + return self._get_json("/v0/users/me/roles") - def ads_market_share(self, search_criteria=None): - """ Handles the query to get the ads market share. + # ------------------------------------------------------------------------- + # Household + # ------------------------------------------------------------------------- - Parameters - ---------- - :param search_criteria: - Defines the search criteria used to filter the query. + def household(self, search_criteria=None): + """Retrieve household data.""" + return self._get_json("/v1/household", search_criteria) - Returns - ------- - :return: - The query result in JSON format - """ + def household_registration(self, search_criteria=None): + """Retrieve household registration data.""" + return self._get_json("/v1/household/registration", search_criteria) + # ------------------------------------------------------------------------- + # Residential registry + # ------------------------------------------------------------------------- + + def residential_registry(self, offset, limit, search_criteria=None): + """Retrieve residential registry data.""" if search_criteria is None: search_criteria = {} - url = self.server_url + "/v1/ads/market-share?access_token=" + \ - urllib.parse.quote(self.access_token) - - session = requests.Session() - headers = { - "Content-Type": "application/json", - "Accept": "application/json" - } - - response = session.post( - url, - verify=self._verify_ssl, - headers=headers, - json=search_criteria + url = ( + f"{self.server_url}/v1/residential/registry" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" ) + url += _generate_request_search_criteria(search_criteria.items()) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) - - return response.json() + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - def housing_association(self, org_number): - """Handles the query to get information about a housing association. + def residential_registry_markups(self, offset, limit, search_criteria=None): + """Retrieve residential registry markups.""" + if search_criteria is None: + search_criteria = {} - Parameters - ---------- - :param org-nr: - Organization number used to filter the query. + url = ( + f"{self.server_url}/v1/residential/registry/markups" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) + url += _generate_request_search_criteria(search_criteria.items()) - Returns - ------- - :return: - The query result in JSON format - """ + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) + def residential_registry_valuations(self, offset, limit, search_criteria=None): + """Retrieve residential registry valuations.""" + if search_criteria is None: + search_criteria = {} url = ( - self.server_url - + "/v1/housing-association/" - + urllib.parse.quote(org_number) - + "?access_token=" - + urllib.parse.quote(self.access_token) + f"{self.server_url}/v1/residential/registry/valuations" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" ) + url += _generate_request_search_criteria(search_criteria.items()) - session = requests.Session() - - response = session.get( - url, - verify=self._verify_ssl - ) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + # ------------------------------------------------------------------------- + # Taxation registry + # ------------------------------------------------------------------------- - return response.json() + def taxation_registry_units(self, offset, limit, search_criteria=None): + """Retrieve taxation unit registry data.""" + if search_criteria is None: + search_criteria = {} - def housing_association_report(self, housing_association_id, year): - """Handles the query to get the annual report for a housing association. + url = ( + f"{self.server_url}/v1/taxation/registry/units" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) + url += _generate_request_search_criteria(search_criteria.items()) - Parameters - ---------- - :param org-nr: - Organization number used to filter the query. - : param year - Year for the report + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Returns - ------- - :return: - The PDF file content as bytes. - """ + def taxation_registry_buildings(self, offset, limit, search_criteria=None): + """Retrieve taxation building registry data.""" + if search_criteria is None: + search_criteria = {} url = ( - self.server_url - + "/v1/housing-association/report" - + "?housing_association_id=" - + urllib.parse.quote(str(housing_association_id)) - + "&year=" - + urllib.parse.quote(str(year)) - + "&access_token=" - + urllib.parse.quote(self.access_token) + f"{self.server_url}/v1/taxation/registry/buildings" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" ) + url += _generate_request_search_criteria(search_criteria.items()) - session = requests.Session() + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - response = session.get( - url, - verify=self._verify_ssl - ) + def taxation_registry_lands(self, offset, limit, search_criteria=None): + """Retrieve taxation land registry data.""" + if search_criteria is None: + search_criteria = {} - if response.status_code != 200: - raise Exception(response.content.decode("utf-8", errors="replace")) + url = ( + f"{self.server_url}/v1/taxation/registry/lands" + f"?access_token={quote(self.access_token)}" + f"&offset={quote(str(offset))}" + f"&limit={quote(str(limit))}" + ) + url += _generate_request_search_criteria(search_criteria.items()) - return response.content + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - def geocode(self, query, limit=5, match_type="address", exact_match=False): - """Handles the query to geocode an address or place. + # ------------------------------------------------------------------------- + # Valuation / sales / ads + # ------------------------------------------------------------------------- - Parameters - ---------- - query : str - The search string (e.g. street name, address). + def valuation(self, search_criteria=None): + """Make a valuation query.""" + return self._get_json("/v1/valuation", search_criteria) - limit : int, optional - Maximum number of results to return (default 5, max 10). + def sales_reference(self, search_criteria=None): + """Get sales references.""" + return self._post_json_body("/v1/sales/reference", search_criteria) - match_type : str, optional - Type of match to perform. Default is "address". + def sales(self, search_criteria=None): + """Get sales.""" + return self._get_json("/v1/sales", search_criteria) - exact_match : bool, optional - Whether the match should be exact. Default False. + def ads(self, search_criteria=None): + """Get ads.""" + return self._get_json("/v1/ads", search_criteria) - Returns - ------- - dict - The API response returned as JSON containing geocode results. + def ads_pdf(self, search_criteria=None): + """Get PDF snapshot of an ad.""" + return self._get_content("/v1/ads/pdf", search_criteria) - """ + # ------------------------------------------------------------------------- + # Index + # ------------------------------------------------------------------------- - url = ( - self.server_url - + "/v1/geocode" - + "?access_token=" - + urllib.parse.quote(self.access_token) - + "&query=" - + urllib.parse.quote(query) - + "&limit=" - + urllib.parse.quote(str(limit)) - + "&match_type=" - + urllib.parse.quote(match_type) - + "&exact_match=" - + urllib.parse.quote(str(exact_match).lower()) - ) + def index_definitions(self, search_criteria=None): + """Get index definitions.""" + return self._get_json("/v1/index/definitions", search_criteria) - session = requests.Session() + def index_definitions_complete(self, search_criteria=None): + """Get complete index definitions.""" + return self._get_json("/v1/index/definitions/complete", search_criteria) - response = session.get( - url, - verify=self._verify_ssl - ) + def index_normalized(self, search_criteria=None): + """Get normalized index.""" + return self._get_json("/v1/index/normalized", search_criteria) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + def index_best(self, search_criteria=None): + """Get best index.""" + return self._get_json("/v1/index/best", search_criteria) - return response.json() + def index_recount(self, search_criteria=None): + """Get recounted index value.""" + return self._get_json("/v1/index/recount", search_criteria) - def projects_new(self): - """Handles the query to fetch data required to create a new project. + def index_publishing_calendar(self, search_criteria=None): + """Get index publishing calendar. - Returns - ------- - dict - The API response returned as JSON containing default project data. + This endpoint is public and does not use access_token. """ - - url = ( - self.server_url - + "/v1/projects/new" - + "?access_token=" - + urllib.parse.quote(self.access_token) + return self._get_json( + "/v1/index/publishing/calendar", + search_criteria=search_criteria, + include_access_token=False, ) - session = requests.Session() - - response = session.get( - url, - verify=self._verify_ssl - ) + def index_statistics(self, search_criteria=None, public=False): + """Get index statistics. - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + Parameters + ---------- + public : bool + If True, use the public endpoint without access token. + """ + if public: + url = f"{self.server_url}/v1/index/statistics/public?" + if search_criteria is None: + search_criteria = {} + url += _generate_request_search_criteria(search_criteria.items()) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - return response.json() + return self._get_json("/v1/index/statistics", search_criteria) - def projects(self, limit=10): - """Handles the query to fetch projects for the authenticated user. + def index_volume(self, search_criteria=None): + """Get index area volume.""" + return self._get_json("/v1/index/volume", search_criteria) - Parameters - ---------- - limit : int, optional - Maximum number of projects to return. + # ------------------------------------------------------------------------- + # Area + # ------------------------------------------------------------------------- - Returns - ------- - dict - The API response returned as JSON containing the projects. + def area(self, search_criteria=None): + """Get areas.""" + return self._get_json("/v1/area", search_criteria) - """ + def area_category(self, search_criteria=None): + """Get area categories.""" + return self._get_json("/v1/area/category", search_criteria) - url = ( - self.server_url - + "/v1/projects" - + "?access_token=" - + urllib.parse.quote(self.access_token) - + "&limit=" - + urllib.parse.quote(str(limit)) - ) + def area_information(self, search_criteria=None): + """Get area information.""" + return self._get_json("/v1/area/information", search_criteria) - session = requests.Session() + def area_information_dates(self, search_criteria=None): + """Get area information dates.""" + return self._get_json("/v1/area/information/dates", search_criteria) - response = session.get( - url, - verify=self._verify_ssl - ) + def area_information_field(self, search_criteria=None): + """Get area information fields.""" + return self._get_json("/v1/area/information/field", search_criteria) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + def area_information_tag(self, search_criteria=None): + """Get area information tags.""" + return self._get_json("/v1/area/information/tag", search_criteria) - return response.json() + def area_polygon(self, search_criteria=None): + """Get area polygons.""" + return self._get_json("/v1/area/polygon", search_criteria) + # ------------------------------------------------------------------------- + # Market share / references + # ------------------------------------------------------------------------- - def project(self, project_id): - """Handles the query to fetch a specific project. + def sales_market_share(self, search_criteria=None): + """Get sales market share.""" + return self._post_json_body("/v1/sales/market-share", search_criteria) - Parameters - ---------- - project_id : str - The UUID of the project. + def ads_reference(self, search_criteria=None): + """Get ads references.""" + return self._post_json_body("/v1/ads/reference", search_criteria) - Returns - ------- - dict - The API response returned as JSON containing the project data. + def ads_market_share(self, search_criteria=None): + """Get ads market share.""" + return self._post_json_body("/v1/ads/market-share", search_criteria) - """ + # ------------------------------------------------------------------------- + # Housing association + # ------------------------------------------------------------------------- + def housing_association(self, org_number): + """Get information about a housing association.""" url = ( - self.server_url - + "/v1/projects/" - + urllib.parse.quote(str(project_id)) - + "?access_token=" - + urllib.parse.quote(self.access_token) + f"{self.server_url}/v1/housing-association/{quote(org_number)}" + f"?access_token={quote(self.access_token)}" ) - session = requests.Session() + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - response = session.get( - url, - verify=self._verify_ssl + def housing_association_report(self, housing_association_id, year): + """Get the annual report PDF for a housing association.""" + url = ( + f"{self.server_url}/v1/housing-association/report" + f"?housing_association_id={quote(str(housing_association_id))}" + f"&year={quote(str(year))}" + f"&access_token={quote(self.access_token)}" ) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_content_response(response) - return response.json() + # ------------------------------------------------------------------------- + # Geocode + # ------------------------------------------------------------------------- - def create_project(self, project_data): - """Handles the request to create a new project. + def geocode(self, query, limit=5, match_type="address", exact_match=False): + """Geocode an address or place.""" + url = ( + f"{self.server_url}/v1/geocode" + f"?access_token={quote(self.access_token)}" + f"&query={quote(query)}" + f"&limit={quote(str(limit))}" + f"&match_type={quote(match_type)}" + f"&exact_match={quote(str(exact_match).lower())}" + ) - Parameters - ---------- - project_data : dict - Dictionary containing the full project configuration - including search criteria and selected fields. + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - Returns - ------- - dict - The API response returned as JSON containing the created project. + # ------------------------------------------------------------------------- + # Projects + # ------------------------------------------------------------------------- - """ + def projects_new(self): + """Fetch default data required to create a new project.""" + return self._get_json("/v1/projects/new") + def projects(self, limit=10): + """Fetch projects for the authenticated user.""" url = ( - self.server_url - + "/v1/projects" - + "?access_token=" - + urllib.parse.quote(self.access_token) + f"{self.server_url}/v1/projects" + f"?access_token={quote(self.access_token)}" + f"&limit={quote(str(limit))}" ) - session = requests.Session() - - headers = { - "Content-Type": "application/json", - "Accept": "application/json" - } + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - response = session.post( - url, - verify=self._verify_ssl, - headers=headers, - json=project_data + def project(self, project_id): + """Fetch a specific project.""" + url = ( + f"{self.server_url}/v1/projects/{quote(str(project_id))}" + f"?access_token={quote(self.access_token)}" ) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) + response = self.session.get(url, verify=self._verify_ssl) + return self._handle_json_response(response) - return response.json() + def create_project(self, project_data): + """Create a new project.""" + return self._post_json_body("/v1/projects", project_data) def delete_project(self, project_id): - """Handles the request to delete a project. - - Parameters - ---------- - project_id : str - The UUID of the project to delete. - - Returns - ------- - dict - The API response returned as JSON. - - """ - + """Delete a project.""" url = ( - self.server_url - + "/v1/projects/" - + urllib.parse.quote(str(project_id)) - + "?access_token=" - + urllib.parse.quote(self.access_token) - ) - - session = requests.Session() - - response = session.delete( - url, - verify=self._verify_ssl + f"{self.server_url}/v1/projects/{quote(str(project_id))}" + f"?access_token={quote(self.access_token)}" ) - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) - - return response.json() + response = self.session.delete(url, verify=self._verify_ssl) + return self._handle_json_response(response) def update_project_name(self, project_id, name): - """Handles the request to update the name of a project. - - Parameters - ---------- - project_id : str - The UUID of the project. - - name : str - The new name for the project. - - Returns - ------- - dict - The API response returned as JSON. - - """ - + """Update a project's name.""" url = ( - self.server_url - + "/v1/projects/" - + urllib.parse.quote(str(project_id)) - + "/name" - + "?access_token=" - + urllib.parse.quote(self.access_token) + f"{self.server_url}/v1/projects/{quote(str(project_id))}/name" + f"?access_token={quote(self.access_token)}" ) - session = requests.Session() - headers = { "Content-Type": "application/json", - "Accept": "application/json" + "Accept": "application/json", } - response = session.post( + response = self.session.post( url, verify=self._verify_ssl, headers=headers, - json={"name": name} + json={"name": name}, ) - - if response.status_code != 200: - raise Exception(response.content.decode("utf-8")) - - return response.json() \ No newline at end of file + return self._handle_json_response(response) \ No newline at end of file