From 0ecb279ee6dac5cef95683734e9a679ce8ea78ad Mon Sep 17 00:00:00 2001 From: PrasadhNanjundan Date: Wed, 9 Jul 2025 12:46:22 +0530 Subject: [PATCH] MSA-14513: sdk methods for list instance and workflow details --- msa_sdk/orchestration.py | 53 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/msa_sdk/orchestration.py b/msa_sdk/orchestration.py index d129233d..7fe4c20c 100644 --- a/msa_sdk/orchestration.py +++ b/msa_sdk/orchestration.py @@ -15,21 +15,66 @@ def __init__(self, ubiqube_id): """Initialize.""" MSA_API.__init__(self) self.api_path_v1 = "/orchestration/v1" + self.api_path_v2 = "/orchestration/v2" self.api_path = "/orchestration" self.ubiqube_id = ubiqube_id - def list_service_instances(self): + def list_service_instances(self, service_name=""): """ List service instances. + Parameters + ---------- + service_name: String + Service name to filter the instances, default is empty string + Returns ------- None - """ self.action = 'List service instances' - self.path = "{}/{}/service/instance".format(self.api_path, - self.ubiqube_id) + base_path = f"{self.api_path}/{self.ubiqube_id}/service/instance" + + if service_name: + self.path = f"{base_path}?serviceName={service_name}" + else: + self.path = base_path + + self._call_get() + + + def get_workflow_details(self, service_name, defined_var_flag, status="", sort="lastupdated", sort_order="DESC", search_filter="", page=1, page_size=100): + """ + Get workflow details. + + Parameters + ---------- + service_name: String + Service name + defined_var_flag: Boolean + Flag to get only WF defined Vars or all vars + status: String + Status of the workflow, default is empty string + sort: String + Sort by field, default is lastupdated + sort_order: String + Sort order, default is DESC + search_filter: String + Search filter, default is empty string + page: Integer + Page number, default is 1 + page_size: Integer + Page size, default is 100 + + Returns + ------- + None + + """ + self.action = 'Get workflow details' + self.path = "{}/{}/workflow/details?serviceName={}&definedVarFlag={}&status={}&sort={}&sort_order={}&search_filter={}&page={}&page_size={}".format( + self.api_path_v2, self.ubiqube_id, service_name, defined_var_flag, status, sort, sort_order, search_filter, page, page_size) + self._call_get() def read_service_instance(self, service_id):