diff --git a/python/hsml/deployment.py b/python/hsml/deployment.py index eb68300e..bf3212c1 100644 --- a/python/hsml/deployment.py +++ b/python/hsml/deployment.py @@ -21,13 +21,11 @@ from hsml.core import serving_api from hsml.engine import serving_engine from hsml.predictor_state import PredictorState -from hsml.resources import Resources from hsml.inference_logger import InferenceLogger -from hsml.inference_batcher import InferenceBatcher -from hsml.transformer import Transformer from hsml.client.exceptions import ModelServingException from hsml.constants import DEPLOYABLE_COMPONENT, PREDICTOR_STATE +from hsml.predictor_specification import PredictorSpecification class Deployment: @@ -311,63 +309,6 @@ def predictor(self): def predictor(self, predictor): self._predictor = predictor - @property - def requested_instances(self): - """Total number of requested instances in the deployment.""" - return self._predictor.requested_instances - - # Single predictor - - @property - def model_name(self): - """Name of the model deployed by the predictor""" - return self._predictor.model_name - - @model_name.setter - def model_name(self, model_name: str): - self._predictor.model_name = model_name - - @property - def model_path(self): - """Model path deployed by the predictor.""" - return self._predictor.model_path - - @model_path.setter - def model_path(self, model_path: str): - self._predictor.model_path = model_path - - @property - def model_version(self): - """Model version deployed by the predictor.""" - return self._predictor.model_version - - @model_version.setter - def model_version(self, model_version: int): - self._predictor.model_version = model_version - - @property - def artifact_version(self): - """Artifact version deployed by the predictor.""" - return self._predictor.artifact_version - - @artifact_version.setter - def artifact_version(self, artifact_version: Union[int, str]): - self._predictor.artifact_version = artifact_version - - @property - def artifact_path(self): - """Path of the model artifact deployed by the predictor.""" - return self._predictor.artifact_path - - @property - def model_server(self): - """Model server ran by the predictor.""" - return self._predictor.model_server - - @model_server.setter - def model_server(self, model_server: str): - self._predictor.model_server = model_server - @property def serving_tool(self): """Serving tool used to run the model server.""" @@ -377,24 +318,6 @@ def serving_tool(self): def serving_tool(self, serving_tool: str): self._predictor.serving_tool = serving_tool - @property - def script_file(self): - """Script file used by the predictor.""" - return self._predictor.script_file - - @script_file.setter - def script_file(self, script_file: str): - self._predictor.script_file = script_file - - @property - def resources(self): - """Resource configuration for the predictor.""" - return self._predictor.resources - - @resources.setter - def resources(self, resources: Resources): - self._predictor.resources = resources - @property def inference_logger(self): """Configuration of the inference logger attached to this predictor.""" @@ -404,24 +327,6 @@ def inference_logger(self): def inference_logger(self, inference_logger: InferenceLogger): self._predictor.inference_logger = inference_logger - @property - def inference_batcher(self): - """Configuration of the inference batcher attached to this predictor.""" - return self._predictor.inference_batcher - - @inference_batcher.setter - def inference_batcher(self, inference_batcher: InferenceBatcher): - 
self._predictor.inference_batcher = inference_batcher
-
-    @property
-    def transformer(self):
-        """Transformer configured in the predictor."""
-        return self._predictor.transformer
-
-    @transformer.setter
-    def transformer(self, transformer: Transformer):
-        self._predictor.transformer = transformer
-
     @property
     def created_at(self):
         """Created at date of the predictor."""
@@ -432,6 +337,40 @@ def creator(self):
         """Creator of the predictor."""
         return self._predictor.creator
 
+    @property
+    def specification(self):
+        """The specification of the main predictor"""
+        return self._predictor.specification
+
+    @specification.setter
+    def specification(self, specification: Union[PredictorSpecification, dict]):
+        self._predictor.specification = specification
+
+    @property
+    def candidate_specification(self):
+        """The specification for the candidate predictor"""
+        return self._predictor.candidate_specification
+
+    @candidate_specification.setter
+    def candidate_specification(self, candidate_specification: Union[PredictorSpecification, dict]):
+        self._predictor.candidate_specification = candidate_specification
+
+    @property
+    def candidate_traffic_percentage(self):
+        """The traffic percentage for the candidate predictor"""
+        return self._predictor.candidate_traffic_percentage
+
+    @candidate_traffic_percentage.setter
+    def candidate_traffic_percentage(self, candidate_traffic_percentage: int):
+        self._predictor.candidate_traffic_percentage = candidate_traffic_percentage
+
+    @property
+    def requested_instances(self):
+        """Total number of requested instances in the deployment."""
+        requested_instances = self._predictor.specification.requested_instances
+        if self._predictor.candidate_specification is not None:
+            requested_instances += self._predictor.candidate_specification.requested_instances
+        return requested_instances
+
     def __repr__(self):
         desc = (
             f", description: {self._description!r}"
diff --git a/python/hsml/engine/serving_engine.py b/python/hsml/engine/serving_engine.py
index f0d508bf..6137d1d7 100644
--- a/python/hsml/engine/serving_engine.py
+++ b/python/hsml/engine/serving_engine.py
@@ -90,12 +90,12 @@ def start(self, deployment_instance, await_status: int) -> bool:
             (done, state) = self._check_status(
                 deployment_instance, PREDICTOR_STATE.STATUS_RUNNING
             )
-
         if not done:
             min_instances = self._get_min_starting_instances(deployment_instance)
             num_steps = (len(self.START_STEPS) - 1) + min_instances
-            if deployment_instance._predictor._state.condition is None:
-                num_steps = min_instances  # backward compatibility
+            if hasattr(deployment_instance._predictor._state, "condition"):
+                if deployment_instance._predictor._state.condition is None:
+                    num_steps = min_instances  # backward compatibility
 
             pbar = tqdm(total=num_steps)
             pbar.set_description("Creating deployment")
@@ -147,9 +147,10 @@ def stop(self, deployment_instance, await_status: int) -> bool:
             if deployment_instance.requested_instances >= num_instances
             else num_instances
         )
-        if deployment_instance._predictor._state.condition is None:
-            # backward compatibility
-            num_steps = self._get_min_starting_instances(deployment_instance)
+        if hasattr(deployment_instance._predictor._state, "condition"):
+            if deployment_instance._predictor._state.condition is None:
+                # backward compatibility
+                num_steps = self._get_min_starting_instances(deployment_instance)
 
         pbar = tqdm(total=num_steps)
         pbar.set_description("Preparing to stop deployment")
@@ -280,30 +281,31 @@ def _check_status(self, deployment_instance, desired_status):
         return (False, state)
 
     def _get_starting_progress(self, current_step, state, num_instances):
-        if state.condition is None:  # backward compatibility
+        if hasattr(state, "condition") and state.condition is None:  # backward compatibility
             progress = num_instances - current_step
             if state.status == PREDICTOR_STATE.STATUS_RUNNING:
                 return (progress, "Deployment is ready")
             return (progress, None if current_step == 0 else "Deployment is starting")
 
-        step = self.START_STEPS.index(state.condition.type)
+        condition = self._get_condition(state)
+        step = self.START_STEPS.index(condition.type)
         if (
-            state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_STARTED
-            or state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_READY
+            condition.type == PREDICTOR_STATE.CONDITION_TYPE_STARTED
+            or condition.type == PREDICTOR_STATE.CONDITION_TYPE_READY
         ):
             step += num_instances
         progress = step - current_step
         desc = None
-        if state.condition.type != PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
+        if condition.type != PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
             desc = (
-                state.condition.reason
+                condition.reason
                 if state.status != PREDICTOR_STATE.STATUS_FAILED
                 else "Deployment failed to start"
             )
 
         return (progress, desc)
 
     def _get_stopping_progress(self, total_steps, current_step, state, num_instances):
-        if state.condition is None:  # backward compatibility
+        if hasattr(state, "condition") and state.condition is None:  # backward compatibility
             progress = (total_steps - num_instances) - current_step
             if state.status == PREDICTOR_STATE.STATUS_STOPPED:
                 return (progress, "Deployment is stopped")
             return (
                 progress,
                 None if current_step == total_steps else "Deployment is stopping",
             )
 
         step = 0
-        if state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_SCHEDULED:
-            step = 1 if state.condition.status is None else 0
-        elif state.condition.type == PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
+        condition = self._get_condition(state)
+        if condition.type == PREDICTOR_STATE.CONDITION_TYPE_SCHEDULED:
+            step = 1 if condition.status is None else 0
+        elif condition.type == PREDICTOR_STATE.CONDITION_TYPE_STOPPED:
             num_instances = (total_steps - 2) - num_instances  # num stopped instances
             step = (
                 (2 + num_instances)
-                if (state.condition.status is None or state.condition.status)
+                if (condition.status is None or condition.status)
                 else 0
             )
         progress = step - current_step
         desc = None
         if (
-            state.condition.type != PREDICTOR_STATE.CONDITION_TYPE_READY
+            condition.type != PREDICTOR_STATE.CONDITION_TYPE_READY
             and state.status != PREDICTOR_STATE.STATUS_FAILED
         ):
             desc = (
                 "Deployment is stopped"
                 if state.status == PREDICTOR_STATE.STATUS_STOPPED
-                else state.condition.reason
+                else condition.reason
             )
 
         return (progress, desc)
 
+    def _get_condition(self, state):
+        """Return the condition of whichever serving is still in progress: the main
+        serving's unless it is satisfied and the candidate's is not."""
+        if state.candidate_status is None:
+            return state.serving_status.condition
+        if state.serving_status.condition.status is None or not state.serving_status.condition.status:
+            return state.serving_status.condition
+        if state.candidate_status.condition.status is None or not state.candidate_status.condition.status:
+            return state.candidate_status.condition
+        # ok to return the status of either
+        return state.serving_status.condition
+
     def _get_min_starting_instances(self, deployment_instance):
         min_start_instances = 1  # predictor
-        if deployment_instance.transformer is not None:
+        if deployment_instance.specification.transformer is not None:
             min_start_instances += 1  # transformer
+        if deployment_instance.candidate_specification is not None:
+            min_start_instances += 1
+            if deployment_instance.candidate_specification.transformer is not None:
+                min_start_instances += 1
         return 
( deployment_instance.requested_instances if deployment_instance.requested_instances >= min_start_instances @@ -349,9 +366,13 @@ def _get_min_starting_instances(self, deployment_instance): def _get_available_instances(self, state): if state.status == PREDICTOR_STATE.STATUS_CREATING: return 0 - num_instances = state.available_predictor_instances - if state.available_transformer_instances is not None: - num_instances += state.available_transformer_instances + num_instances = state.serving_status.available_predictor_instances + if state.serving_status.available_transformer_instances is not None: + num_instances += state.serving_status.available_transformer_instances + if state.candidate_status is not None: + num_instances += state.candidate_status.available_predictor_instances + if state.candidate_status.available_transformer_instances is not None: + num_instances += state.candidate_status.available_transformer_instances return num_instances def _get_stopped_instances(self, available_instances, requested_instances): diff --git a/python/hsml/predictor.py b/python/hsml/predictor.py index 0feabec6..2dfab3f7 100644 --- a/python/hsml/predictor.py +++ b/python/hsml/predictor.py @@ -15,77 +15,68 @@ import json import humps -from typing import Union, Optional +from typing import Union, Optional, Dict from hsml import util from hsml import deployment from hsml import client from hsml.constants import ARTIFACT_VERSION, PREDICTOR, MODEL -from hsml.transformer import Transformer from hsml.predictor_state import PredictorState -from hsml.deployable_component import DeployableComponent from hsml.resources import PredictorResources from hsml.inference_logger import InferenceLogger -from hsml.inference_batcher import InferenceBatcher +from hsml.predictor_specification import PredictorSpecification -class Predictor(DeployableComponent): +class Predictor: """Metadata object representing a predictor in Model Serving.""" def __init__( self, name: str, - model_name: str, - model_path: str, - model_version: int, - model_framework: str, # MODEL.FRAMEWORK - artifact_version: Union[int, str], - model_server: str, + specification: Union[Dict, PredictorSpecification], + candidate_specification: Optional[Union[Dict, PredictorSpecification]] = None, serving_tool: Optional[str] = None, - script_file: Optional[str] = None, - resources: Optional[Union[PredictorResources, dict]] = None, # base inference_logger: Optional[Union[InferenceLogger, dict]] = None, # base - inference_batcher: Optional[Union[InferenceBatcher, dict]] = None, # base - transformer: Optional[Union[Transformer, dict]] = None, id: Optional[int] = None, description: Optional[str] = None, created_at: Optional[str] = None, creator: Optional[str] = None, + candidate_traffic_percentage: Optional[int] = None, **kwargs, ): serving_tool = ( self._validate_serving_tool(serving_tool) or self._get_default_serving_tool() ) - resources = self._validate_resources( - util.get_obj_from_json(resources, PredictorResources), serving_tool - ) or self._get_default_resources(serving_tool) - super().__init__( - script_file, - resources, - inference_batcher, + main_serving_spec = util.get_obj_from_json( + specification, PredictorSpecification ) + main_serving_spec.resources = self._validate_resources( + main_serving_spec.resources, serving_tool + ) or self._get_default_resources(serving_tool) + self._specification = main_serving_spec + self._candidate_specification = None + if candidate_specification is not None: + candidate_spec = util.get_obj_from_json( + 
candidate_specification, PredictorSpecification
+            )
+            candidate_spec.resources = self._validate_resources(
+                candidate_spec.resources, serving_tool
+            ) or self._get_default_resources(serving_tool)
+            self._candidate_specification = candidate_spec
 
         self._name = name
-        self._model_name = model_name
-        self._model_path = model_path
-        self._model_version = model_version
-        self._model_framework = model_framework
-        self._artifact_version = artifact_version
         self._serving_tool = serving_tool
-        self._model_server = model_server
         self._id = id
         self._description = description
         self._created_at = created_at
         self._creator = creator
-
         self._inference_logger = util.get_obj_from_json(
             inference_logger, InferenceLogger
         )
-        self._transformer = util.get_obj_from_json(transformer, Transformer)
-        self._validate_script_file(self._model_framework, self._script_file)
+        self._candidate_traffic_percentage = candidate_traffic_percentage
 
     def deploy(self):
         """Create a deployment for this predictor and persists it in the Model Serving.
@@ -131,6 +122,20 @@ def _set_state(self, state: PredictorState):
         """Set the state of the predictor"""
         self._state = state
 
+    @classmethod
+    def _validate_resources(cls, resources, serving_tool):
+        if resources is not None:
+            # ensure scale-to-zero for kserve deployments when required
+            if (
+                serving_tool == PREDICTOR.SERVING_TOOL_KSERVE
+                and resources.num_instances != 0
+                and client.get_serving_num_instances_limits()[0] == 0
+            ):
+                raise ValueError(
+                    "Scale-to-zero is required for KServe deployments in this cluster. Please set the number of instances to 0."
+                )
+        return resources
+
     @classmethod
     def _validate_serving_tool(cls, serving_tool):
         if serving_tool is not None:
@@ -152,19 +157,10 @@
         return serving_tool
 
     @classmethod
-    def _validate_script_file(cls, model_framework, script_file):
-        if model_framework == MODEL.FRAMEWORK_PYTHON and script_file is None:
-            raise ValueError(
-                "Predictor scripts are required in deployments for custom Python models"
-            )
-
-    @classmethod
-    def _infer_model_server(cls, model_framework):
-        return (
-            PREDICTOR.MODEL_SERVER_TF_SERVING
-            if model_framework == MODEL.FRAMEWORK_TENSORFLOW
-            else PREDICTOR.MODEL_SERVER_PYTHON
-        )
+    def _get_default_resources(cls, serving_tool):
+        # enable scale-to-zero by default in kserve deployments
+        num_instances = 0 if serving_tool == PREDICTOR.SERVING_TOOL_KSERVE else 1
+        return PredictorResources(num_instances)
 
     @classmethod
     def _get_default_serving_tool(cls):
@@ -175,35 +171,6 @@
-    @classmethod
-    def _validate_resources(cls, resources, serving_tool):
-        if resources is not None:
-            # ensure scale-to-zero for kserve deployments when required
-            if (
-                serving_tool == PREDICTOR.SERVING_TOOL_KSERVE
-                and resources.num_instances != 0
-                and client.get_serving_num_instances_limits()[0] == 0
-            ):
-                raise ValueError(
-                    "Scale-to-zero is required for KServe deployments in this cluster. Please, set the number of instances to 0."
-                )
-        return resources
-
-    @classmethod
-    def _get_default_resources(cls, serving_tool):
-        # enable scale-to-zero by default in kserve deployments
-        num_instances = 0 if serving_tool == PREDICTOR.SERVING_TOOL_KSERVE else 1
-        return PredictorResources(num_instances)
-
-    @classmethod
-    def for_model(cls, model, **kwargs):
-        kwargs["model_name"] = model.name
-        kwargs["model_path"] = model.model_path
-        kwargs["model_version"] = model.version
-
-        # get predictor for specific model, includes model type-related validations
-        return util.get_predictor_for_model(model, **kwargs)
-
     @classmethod
     def from_response_json(cls, json_dict):
         json_decamelized = humps.decamelize(json_dict)
@@ -227,31 +194,24 @@ def extract_fields_from_json(cls, json_decamelized):
         kwargs["description"] = util.extract_field_from_json(
             json_decamelized, "description"
         )
-        kwargs["model_name"] = util.extract_field_from_json(
-            json_decamelized, "model_name", default=kwargs["name"]
-        )
-        kwargs["model_path"] = json_decamelized.pop("model_path")
-        kwargs["model_version"] = json_decamelized.pop("model_version")
-        kwargs["model_framework"] = (
-            json_decamelized.pop("model_framework")
-            if "model_framework" in json_decamelized
-            else MODEL.FRAMEWORK_SKLEARN  # backward compatibility
-        )
-        kwargs["artifact_version"] = util.extract_field_from_json(
-            json_decamelized, "artifact_version"
-        )
-        kwargs["model_server"] = json_decamelized.pop("model_server")
         kwargs["serving_tool"] = json_decamelized.pop("serving_tool")
-        kwargs["script_file"] = util.extract_field_from_json(
-            json_decamelized, "predictor"
-        )
-        kwargs["resources"] = PredictorResources.from_json(json_decamelized)
         kwargs["inference_logger"] = InferenceLogger.from_json(json_decamelized)
-        kwargs["inference_batcher"] = InferenceBatcher.from_json(json_decamelized)
-        kwargs["transformer"] = Transformer.from_json(json_decamelized)
         kwargs["id"] = json_decamelized.pop("id")
         kwargs["created_at"] = json_decamelized.pop("created")
         kwargs["creator"] = json_decamelized.pop("creator")
+        kwargs["specification"] = PredictorSpecification.from_json(
+            json_decamelized.pop("specification")
+        )
+        kwargs["candidate_specification"] = (
+            PredictorSpecification.from_json(
+                json_decamelized.pop("candidate_specification")
+            )
+            if "candidate_specification" in json_decamelized
+            else None
+        )
+        kwargs["candidate_traffic_percentage"] = util.extract_field_from_json(
+            json_decamelized, "candidate_traffic_percentage"
+        )
         return kwargs
 
     def update_from_response_json(self, json_dict):
@@ -268,25 +228,17 @@ def to_dict(self):
             "id": self._id,
             "name": self._name,
             "description": self._description,
-            "modelName": self._model_name,
-            "modelPath": self._model_path,
-            "modelVersion": self._model_version,
-            "modelFramework": self._model_framework,
-            "artifactVersion": self._artifact_version,
             "created": self._created_at,
             "creator": self._creator,
-            "modelServer": self._model_server,
+            "specification": self._specification.to_dict(),
             "servingTool": self._serving_tool,
-            "predictor": self._script_file,
         }
-        if self._resources is not None:
-            json = {**json, **self._resources.to_dict()}
+        if self._candidate_specification is not None:
+            json["candidateSpecification"] = self._candidate_specification.to_dict()
+        if self._candidate_traffic_percentage is not None:
+            json["candidateTrafficPercentage"] = self._candidate_traffic_percentage
         if self._inference_logger is not None:
             json = {**json, **self._inference_logger.to_dict()}
-        if self._inference_batcher is not None:
-            json = {**json, **self._inference_batcher.to_dict()}
-        if self._transformer is not None:
-            
json = {**json, **self._transformer.to_dict()} return json @property @@ -312,70 +264,6 @@ def description(self): def description(self, description: str): self._description = description - @property - def model_name(self): - """Name of the model deployed by the predictor.""" - return self._model_name - - @model_name.setter - def model_name(self, model_name: str): - self._model_name = model_name - - @property - def model_path(self): - """Model path deployed by the predictor.""" - return self._model_path - - @model_path.setter - def model_path(self, model_path: str): - self._model_path = model_path - - @property - def model_version(self): - """Model version deployed by the predictor.""" - return self._model_version - - @model_version.setter - def model_version(self, model_version: int): - self._model_version = model_version - - @property - def model_framework(self): - """Model framework of the model to be deployed by the predictor.""" - return self._model_framework - - @model_framework.setter - def model_framework(self, model_framework: str): - self._model_framework = model_framework - self._model_server = self._infer_model_server(model_framework) - - @property - def artifact_version(self): - """Artifact version deployed by the predictor.""" - return self._artifact_version - - @artifact_version.setter - def artifact_version(self, artifact_version: Union[int, str]): - self._artifact_version = artifact_version - - @property - def artifact_path(self): - """Path of the model artifact deployed by the predictor. Resolves to /Projects/{project_name}/Models/{name}/{version}/Artifacts/{artifact_version}/{name}_{version}_{artifact_version}.zip""" - artifact_name = "{}_{}_{}.zip".format( - self._model_name, str(self._model_version), str(self._artifact_version) - ) - return "{}/{}/Artifacts/{}/{}".format( - self._model_path, - str(self._model_version), - str(self._artifact_version), - artifact_name, - ) - - @property - def model_server(self): - """Model server used by the predictor.""" - return self._model_server - @property def serving_tool(self): """Serving tool used to run the model server.""" @@ -385,16 +273,6 @@ def serving_tool(self): def serving_tool(self, serving_tool: str): self._serving_tool = serving_tool - @property - def script_file(self): - """Script file used to load and run the model.""" - return self._predictor._script_file - - @script_file.setter - def script_file(self, script_file: str): - self._script_file = script_file - self._artifact_version = ARTIFACT_VERSION.CREATE - @property def inference_logger(self): """Configuration of the inference logger attached to this predictor.""" @@ -404,15 +282,6 @@ def inference_logger(self): def inference_logger(self, inference_logger: InferenceLogger): self._inference_logger = inference_logger - @property - def transformer(self): - """Transformer configuration attached to the predictor.""" - return self._transformer - - @transformer.setter - def transformer(self, transformer: Transformer): - self._transformer = transformer - @property def created_at(self): """Created at date of the predictor.""" @@ -424,12 +293,31 @@ def creator(self): return self._creator @property - def requested_instances(self): - """Total number of requested instances in the predictor.""" - num_instances = self._resources.num_instances - if self._transformer is not None: - num_instances += self._transformer.resources.num_instances - return num_instances + def specification(self): + """The specification of the main predictor""" + return self._specification + + 
@specification.setter
+    def specification(self, specification: PredictorSpecification):
+        self._specification = specification
+
+    @property
+    def candidate_specification(self):
+        """The specification for the candidate predictor"""
+        return self._candidate_specification
+
+    @candidate_specification.setter
+    def candidate_specification(self, candidate_specification: PredictorSpecification):
+        self._candidate_specification = candidate_specification
+
+    @property
+    def candidate_traffic_percentage(self):
+        """The traffic percentage for the candidate predictor"""
+        return self._candidate_traffic_percentage
+
+    @candidate_traffic_percentage.setter
+    def candidate_traffic_percentage(self, candidate_traffic_percentage: int):
+        self._candidate_traffic_percentage = candidate_traffic_percentage
 
     def __repr__(self):
         desc = (
diff --git a/python/hsml/predictor_specification.py b/python/hsml/predictor_specification.py
new file mode 100644
index 00000000..f223cd9f
--- /dev/null
+++ b/python/hsml/predictor_specification.py
@@ -0,0 +1,293 @@
+#
+# Copyright 2022 Logical Clocks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import humps
+from typing import Union, Optional
+
+from hsml import util
+
+from hsml.constants import ARTIFACT_VERSION, PREDICTOR, MODEL
+from hsml.transformer import Transformer
+from hsml.deployable_component import DeployableComponent
+from hsml.resources import PredictorResources
+from hsml.inference_batcher import InferenceBatcher
+
+
+class PredictorSpecification(DeployableComponent):
+    """Metadata object representing the specification of a predictor in Model Serving."""
+
+    def __init__(
+        self,
+        model_name: str,
+        model_path: str,
+        model_version: int,
+        model_framework: str,  # MODEL.FRAMEWORK
+        artifact_version: Union[int, str],
+        model_server: str,
+        script_file: Optional[str] = None,
+        resources: Optional[Union[PredictorResources, dict]] = None,  # base
+        inference_batcher: Optional[Union[InferenceBatcher, dict]] = None,  # base
+        transformer: Optional[Union[Transformer, dict]] = None,
+        id: Optional[int] = None,
+        **kwargs,
+    ):
+        super().__init__(
+            script_file,
+            resources,
+            inference_batcher,
+        )
+
+        self._model_name = model_name
+        self._model_path = model_path
+        self._model_version = model_version
+        self._model_framework = model_framework
+        self._artifact_version = artifact_version
+        self._model_server = model_server
+        self._id = id
+        self._transformer = util.get_obj_from_json(transformer, Transformer)
+        self._validate_script_file(self._model_framework, self._script_file)
+
+    def describe(self):
+        """Print a description of the predictor specification"""
+        util.pretty_print(self)
+
+    @classmethod
+    def _validate_script_file(cls, model_framework, script_file):
+        if model_framework == MODEL.FRAMEWORK_PYTHON and script_file is None:
+            raise ValueError(
+                "Predictor scripts are required in deployments for custom Python models"
+            )
+
+    @classmethod
+    def _infer_model_server(cls, model_framework):
+        return (
+            PREDICTOR.MODEL_SERVER_TF_SERVING
+            if model_framework == MODEL.FRAMEWORK_TENSORFLOW
+            else PREDICTOR.MODEL_SERVER_PYTHON
+        )
+
+    @classmethod
+    def for_model(cls, model, **kwargs):
+        kwargs["model_name"] = model.name
+        kwargs["model_path"] = model.model_path
+        kwargs["model_version"] = model.version
+
+        # get predictor for specific model, includes model type-related validations
+        return util.get_predictor_for_model(model, **kwargs)
+
+    @classmethod
+    def from_response_json(cls, json_dict):
+        json_decamelized = humps.decamelize(json_dict)
+        return cls.from_json(json_decamelized)
+
+    @classmethod
+    def from_json(cls, json_decamelized):
+        return PredictorSpecification(**cls.extract_fields_from_json(json_decamelized))
+
+    @classmethod
+    def extract_fields_from_json(cls, json_decamelized):
+        kwargs = {}
+        kwargs["model_name"] = json_decamelized.pop("model_name")
+        kwargs["model_path"] = json_decamelized.pop("model_path")
+        kwargs["model_version"] = json_decamelized.pop("model_version")
+        kwargs["model_framework"] = (
+            json_decamelized.pop("model_framework")
+            if "model_framework" in json_decamelized
+            else MODEL.FRAMEWORK_SKLEARN  # backward compatibility
+        )
+        kwargs["artifact_version"] = util.extract_field_from_json(
+            json_decamelized, "artifact_version"
+        )
+        kwargs["model_server"] = json_decamelized.pop("model_server")
+        kwargs["script_file"] = util.extract_field_from_json(
+            json_decamelized, "predictor"
+        )
+        kwargs["resources"] = PredictorResources.from_json(json_decamelized)
+        kwargs["inference_batcher"] = InferenceBatcher.from_json(json_decamelized)
+        kwargs["transformer"] = Transformer.from_json(json_decamelized)
+        kwargs["id"] = json_decamelized.pop("id")
+        return kwargs
+
+    def update_from_response_json(self, json_dict):
+        json_decamelized = humps.decamelize(json_dict)
+        self.__init__(**self.extract_fields_from_json(json_decamelized))
+        return self
+
+    def json(self):
+        return json.dumps(self, cls=util.MLEncoder)
+
+    def to_dict(self):
+        json = {
+            "id": self._id,
+            "modelName": self._model_name,
+            "modelPath": self._model_path,
+            "modelVersion": self._model_version,
+            "modelFramework": self._model_framework,
+            "artifactVersion": self._artifact_version,
+            "modelServer": self._model_server,
+            "predictor": self._script_file,
+        }
+        if self._resources is not None:
+            json = {**json, **self._resources.to_dict()}
+        if self._inference_batcher is not None:
+            json = {**json, **self._inference_batcher.to_dict()}
+        if self._transformer is not None:
+            json = {**json, **self._transformer.to_dict()}
+        return json
+
+    @property
+    def id(self):
+        """Id of the predictor."""
+        return self._id
+
+    @property
+    def model_name(self):
+        """Name of the model deployed by the predictor."""
+        return self._model_name
+
+    @model_name.setter
+    def model_name(self, model_name: str):
+        self._model_name = model_name
+
+    @property
+    def model_path(self):
+        """Model path deployed by the predictor."""
+        return self._model_path
+
+    @model_path.setter
+    def model_path(self, model_path: str):
+        self._model_path = model_path
+
+    @property
+    def model_version(self):
+        """Model version deployed by the predictor."""
+        return self._model_version
+
+    @model_version.setter
+    def model_version(self, model_version: int):
+        self._model_version = model_version
+
+    @property
+    def model_framework(self):
+        """Model framework of the model to be deployed by the predictor."""
+        return self._model_framework
+
+    @model_framework.setter
+    def model_framework(self, model_framework: str):
+        self._model_framework = model_framework
+        self._model_server = self._infer_model_server(model_framework)
+
+    @property
+    def artifact_version(self):
+        """Artifact version deployed by the predictor."""
+        return self._artifact_version
+
+    @artifact_version.setter
+    def artifact_version(self, artifact_version: Union[int, str]):
+        self._artifact_version = artifact_version
+
+    @property
+    def artifact_path(self):
+        """Path of the model artifact deployed by the predictor. Resolves to /Projects/{project_name}/Models/{name}/{version}/Artifacts/{artifact_version}/{name}_{version}_{artifact_version}.zip"""
+        artifact_name = "{}_{}_{}.zip".format(
+            self._model_name, str(self._model_version), str(self._artifact_version)
+        )
+        return "{}/{}/Artifacts/{}/{}".format(
+            self._model_path,
+            str(self._model_version),
+            str(self._artifact_version),
+            artifact_name,
+        )
+
+    @property
+    def model_server(self):
+        """Model server used by the predictor."""
+        return self._model_server
+
+    @property
+    def script_file(self):
+        """Script file used to load and run the model."""
+        return self._script_file
+
+    @script_file.setter
+    def script_file(self, script_file: str):
+        self._script_file = script_file
+        self._artifact_version = ARTIFACT_VERSION.CREATE
+
+    @property
+    def transformer(self):
+        """Transformer configuration attached to the predictor."""
+        return self._transformer
+
+    @transformer.setter
+    def transformer(self, transformer: Transformer):
+        self._transformer = transformer
+
+    @property
+    def requested_instances(self):
+        """Total number of requested instances in the predictor."""
+        num_instances = self._resources.num_instances
+        if self._transformer is not None:
+            num_instances += self._transformer.resources.num_instances
+        return num_instances
+
+    def __repr__(self):
+        return str(vars(self))
+
diff --git a/python/hsml/predictor_state.py b/python/hsml/predictor_state.py
index eff8a84b..8b8b2e2a 100644
--- a/python/hsml/predictor_state.py
+++ b/python/hsml/predictor_state.py
@@ -17,34 +17,31 @@
 from typing import Optional
 
 from hsml import util
-from hsml.predictor_state_condition import PredictorStateCondition
-
+from hsml.predictor_state_internal import PredictorStateInternal
 
 class PredictorState:
     """State of a predictor."""
 
     def __init__(
         self,
-        available_predictor_instances: int,
-        available_transformer_instances: Optional[int],
         hopsworks_inference_path: str,
         model_server_inference_path: str,
         internal_port: Optional[int],
         revision: Optional[int],
         deployed: Optional[bool],
-        condition: Optional[PredictorStateCondition],
         status: str,
+        serving_status: PredictorStateInternal,
+        candidate_status: Optional[PredictorStateInternal],
         **kwargs,
     ):
-        self._available_predictor_instances = available_predictor_instances
-        self._available_transformer_instances = available_transformer_instances
        self._hopsworks_inference_path = hopsworks_inference_path
         self._model_server_inference_path = model_server_inference_path
         self._internal_port = internal_port
         self._revision = revision
         self._deployed = deployed if deployed is not None else False
-        self._condition = condition
         self._status = status
+        self._serving_status = serving_status
+        self._candidate_status = candidate_status
 
     def describe(self):
         """Print a description of the deployment state"""
@@ -57,10 +54,6 @@ def from_response_json(cls, json_dict):
 
     @classmethod
     def extract_fields_from_json(cls, json_decamelized):
-        ai = util.extract_field_from_json(json_decamelized, "available_instances")
-        ati = util.extract_field_from_json(
-            json_decamelized, "available_transformer_instances"
-        )
         hip = util.extract_field_from_json(json_decamelized, "hopsworks_inference_path")
         msip = util.extract_field_from_json(
             json_decamelized, "model_server_inference_path"
@@ -68,46 +61,30 @@
         )
         ipt = util.extract_field_from_json(json_decamelized, "internal_port")
         r = util.extract_field_from_json(json_decamelized, "revision")
         d = util.extract_field_from_json(json_decamelized, "deployed")
-        c = util.extract_field_from_json(
-            json_decamelized, "condition", as_instance_of=PredictorStateCondition
-        )
         s = util.extract_field_from_json(json_decamelized, "status")
-
-        return ai, ati, hip, msip, ipt, r, d, c, s
+        ss = PredictorStateInternal.from_json(json_decamelized.pop("serving_status"))
+        cs = (
+            PredictorStateInternal.from_json(json_decamelized.pop("candidate_status"))
+            if "candidate_status" in json_decamelized
+            else None
+        )
+        return hip, msip, ipt, r, d, s, ss, cs
 
     def to_dict(self):
         json = {
-            "availableInstances": self._available_predictor_instances,
+            "servingStatus": self._serving_status.to_dict(),
             "hopsworksInferencePath": self._hopsworks_inference_path,
             "modelServerInferencePath": self._model_server_inference_path,
             "status": self._status,
         }
-        if self._available_transformer_instances is not None:
-            json[
-                "availableTransformerInstances"
-            ] = self._available_transformer_instances
+        if self._candidate_status is not None:
+            json["candidateStatus"] = self._candidate_status.to_dict()
         if self._internal_port is not None:
             json["internalPort"] = self._internal_port
         if self._revision is not None:
             json["revision"] = self._revision
         if self._deployed is not None:
             json["deployed"] = self._deployed
-        if self._condition is not None:
-            json = {**json, **self._condition.to_dict()}
 
         return json
 
-    @property
-    def available_predictor_instances(self):
-        """Available predicotr instances."""
-        return self._available_predictor_instances
-
-    @property
-    def available_transformer_instances(self):
-        """Available transformer instances."""
-        return self._available_transformer_instances
-
     @property
     def hopsworks_inference_path(self):
         """Inference path in the Hopsworks REST API."""
@@ -133,15 +110,20 @@ def deployed(self):
         """Whether the predictor is deployed or not."""
         return self._deployed
 
-    @property
-    def condition(self):
-        """Condition of the current state of predictor."""
-        return self._condition
-
     @property
     def status(self):
-        """Status of the predictor."""
+        """Overall status of the predictor, including the candidate if one is available"""
         return self._status
 
+    @property
+    def serving_status(self):
+        """Status of the main serving"""
+        return self._serving_status
+
+    @property
+    def candidate_status(self):
+        """Status of the candidate serving"""
+        return self._candidate_status
+
     def __repr__(self):
         return f"PredictorState(status: {self.status.capitalize()!r})"
diff --git a/python/hsml/predictor_state_internal.py b/python/hsml/predictor_state_internal.py
new file mode 100644
index 00000000..57455239
--- /dev/null
+++ b/python/hsml/predictor_state_internal.py
@@ -0,0 +1,105 @@
+#
+# Copyright 2022 Logical Clocks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import humps
+from typing import Optional
+
+from hsml import util
+from hsml.predictor_state_condition import PredictorStateCondition
+
+
+class PredictorStateInternal:
+    """State of one serving (main or candidate) of a predictor."""
+
+    def __init__(
+        self,
+        available_predictor_instances: int,
+        available_transformer_instances: Optional[int],
+        condition: Optional[PredictorStateCondition],
+        status: str,
+        **kwargs,
+    ):
+        self._available_predictor_instances = available_predictor_instances
+        self._available_transformer_instances = available_transformer_instances
+        self._condition = condition
+        self._status = status
+
+    def describe(self):
+        """Print a description of the deployment state"""
+        util.pretty_print(self)
+
+    @classmethod
+    def from_response_json(cls, json_dict):
+        json_decamelized = humps.decamelize(json_dict)
+        return cls.from_json(json_decamelized)
+
+    @classmethod
+    def from_json(cls, json_decamelized):
+        return PredictorStateInternal(*cls.extract_fields_from_json(json_decamelized))
+
+    @classmethod
+    def extract_fields_from_json(cls, json_decamelized):
+        ai = util.extract_field_from_json(json_decamelized, "available_instances")
+        ati = util.extract_field_from_json(
+            json_decamelized, "available_transformer_instances"
+        )
+        c = util.extract_field_from_json(
+            json_decamelized, "condition", as_instance_of=PredictorStateCondition
+        )
+        s = util.extract_field_from_json(json_decamelized, "status")
+
+        return ai, ati, c, s
+
+    def to_dict(self):
+        json = {
+            "availableInstances": self._available_predictor_instances,
+            "status": self._status,
+        }
+
+        if self._available_transformer_instances is not None:
+            json[
+                "availableTransformerInstances"
+            ] = self._available_transformer_instances
+        if self._condition is not None:
+            json = {**json, **self._condition.to_dict()}
+
+        return json
+
+    def json(self):
+        return json.dumps(self, cls=util.MLEncoder)
+
+    @property
+    def available_predictor_instances(self):
+        """Available predictor instances."""
+        return self._available_predictor_instances
+
+    @property
+    def available_transformer_instances(self):
+        """Available transformer instances."""
+        return self._available_transformer_instances
+
+    @property
+    def condition(self):
+        """Condition of the current state of the predictor."""
+        return self._condition
+
+    @property
+    def status(self):
+        """Status of the predictor."""
+        return self._status
+
+    def __repr__(self):
+        return f"PredictorStateInternal(status: {self.to_dict()!r})"
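
A minimal usage sketch of the specification/candidate API this diff introduces (not part of the change itself). The `hopsworks` login and model-registry calls follow the docstring examples above; constructing `Predictor` and `PredictorSpecification` directly, and the model names, versions, `artifact_version` value, and traffic split below, are illustrative assumptions.

```python
# Sketch only: exercises the new specification / candidate_specification API
# from this diff. Names and values below are illustrative, not prescriptive.
import hopsworks

from hsml.constants import MODEL, PREDICTOR
from hsml.predictor import Predictor
from hsml.predictor_specification import PredictorSpecification

project = hopsworks.login()
mr = project.get_model_registry()

main_model = mr.get_model("my_model", version=1)       # hypothetical model
candidate_model = mr.get_model("my_model", version=2)  # hypothetical candidate

# One specification per served model; resources default per serving tool
# (scale-to-zero for KServe, one instance otherwise).
main_spec = PredictorSpecification(
    model_name=main_model.name,
    model_path=main_model.model_path,
    model_version=main_model.version,
    model_framework=MODEL.FRAMEWORK_TENSORFLOW,
    artifact_version=1,  # assumption: an existing artifact version
    model_server=PREDICTOR.MODEL_SERVER_TF_SERVING,
)
candidate_spec = PredictorSpecification(
    model_name=candidate_model.name,
    model_path=candidate_model.model_path,
    model_version=candidate_model.version,
    model_framework=MODEL.FRAMEWORK_TENSORFLOW,
    artifact_version=1,  # assumption: an existing artifact version
    model_server=PREDICTOR.MODEL_SERVER_TF_SERVING,
)

predictor = Predictor(
    name="my_model",
    specification=main_spec,
    candidate_specification=candidate_spec,
    candidate_traffic_percentage=10,  # route 10% of requests to the candidate
)

deployment = predictor.deploy()
state = deployment.get_state()  # PredictorState with per-serving sub-states
print(state.serving_status.status, state.candidate_status.status)
```

Note how `requested_instances`, progress reporting, and `PredictorState.to_dict()` all aggregate over the main and candidate servings, which is why the state object now nests two `PredictorStateInternal` instances instead of exposing a single `condition`.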