diff --git a/src/codeocean/capsule.py b/src/codeocean/capsule.py index 758a670..ddff71d 100644 --- a/src/codeocean/capsule.py +++ b/src/codeocean/capsule.py @@ -5,7 +5,13 @@ from typing import Optional, Iterator from requests_toolbelt.sessions import BaseUrlSession -from codeocean.components import Ownership, SortOrder, SearchFilter, Permissions +from codeocean.components import ( + Ownership, + SortOrder, + SearchFilter, + Permissions, + AppPanel, +) from codeocean.computation import Computation from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults from codeocean.enum import StrEnum @@ -26,29 +32,6 @@ class CapsuleSortBy(StrEnum): Name = "name" -class AppPanelDataAssetKind(StrEnum): - """The kind of data asset displayed in an app panel. - - - 'Internal' → Data stored inside Code Ocean. - - 'External' → Data stored external to Code Ocean. - - 'Combined' → Data containing multiple external data assets. - - In pipelines, a data asset can only be replaced with one of the same kind. - """ - - Internal = "internal" - External = "external" - Combined = "combined" - - -class AppPanelParameterType(StrEnum): - """The type of parameter displayed in an app panel.""" - - Text = "text" - List = "list" - File = "file" - - @dataclass_json @dataclass(frozen=True) class OriginalCapsuleInfo: @@ -272,199 +255,6 @@ class CapsuleSearchResults: ) -@dataclass_json -@dataclass(frozen=True) -class AppPanelCategories: - """Categories for a capsule's App Panel parameters.""" - - id: str = dataclass_field( - metadata={"description": "Unique identifier for the category."}, - ) - name: str = dataclass_field( - metadata={"description": "Human-readable name of the category."}, - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional detailed description of the category."}, - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional help text providing guidance or additional information about the category."}, - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelParameters: - """Parameters for a capsule's App Panel.""" - - name: str = dataclass_field( - metadata={"description": "Parameter label/display name."} - ) - type: AppPanelParameterType = dataclass_field( - metadata={"description": "Type of the parameter (text, list, file)."} - ) - category: Optional[str] = dataclass_field( - default=None, - metadata={"description": "ID of category the parameter belongs to."} - ) - param_name: Optional[str] = dataclass_field( - default=None, - metadata={"description": "The parameter name/argument key"} - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Description of the parameter."} - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Help text for the parameter."} - ) - value_type: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Value type of the parameter."} - ) - default_value: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Default value of the parameter."} - ) - required: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Indicates if the parameter is required."} - ) - hidden: Optional[bool] = dataclass_field( - default=None, - metadata={"description": "Indicates if the parameter is hidden."} - ) - minimum: Optional[float] = dataclass_field( - default=None, - metadata={"description": "Minimum value for the parameter."} - ) - maximum: Optional[float] = dataclass_field( - default=None, - metadata={"description": "Maximum value for the parameter."} - ) - pattern: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Regular expression pattern for the parameter."} - ) - value_options: Optional[list[str]] = dataclass_field( - default=None, - metadata={"description": "Allowed values for the parameter."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelGeneral: - """General information about a capsule's App Panel.""" - - title: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Title of the App Panel."} - ) - instructions: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Instructions for using the App Panel."} - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Help text for the App Panel."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelDataAsset: - """Data asset parameter for the App Panel.""" - - id: str = dataclass_field( - metadata={"description": "Unique identifier for the data asset."} - ) - mount: str = dataclass_field( - metadata={"description": "Mount path of the data asset within the capsule. " - "Use this mount path to replace the currently attached data asset with your own"} - ) - name: str = dataclass_field( - metadata={"description": "Display name of the data asset."} - ) - kind: AppPanelDataAssetKind = dataclass_field( - metadata={"description": "Kind of the data asset (internal, external, combined)."} - ) - accessible: bool = dataclass_field( - metadata={"description": "Indicates if the data asset is accessible to the user."} - ) - description: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional description of the data asset parameter."} - ) - help_text: Optional[str] = dataclass_field( - default=None, - metadata={"description": "Optional help text for the data asset parameter."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelResult: - """Selected result files to display once the computation is complete.""" - - file_name: str = dataclass_field( - metadata={"description": "Name of the result file."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanelProcess: - """Pipeline process name and its corresponding app panel (for pipelines of capsules only)""" - - name: str = dataclass_field( - metadata={"description": "Name of the pipeline process."} - ) - categories: Optional[AppPanelCategories] = dataclass_field( - default=None, - metadata={"description": "Categories for the pipeline process's app panel parameters."} - ) - parameters: Optional[AppPanelParameters] = dataclass_field( - default=None, - metadata={"description": "Parameters for the pipeline process's app panel."} - ) - - -@dataclass_json -@dataclass(frozen=True) -class AppPanel: - """App Panel configuration for a capsule or pipeline, including general info, data assets, - categories, parameters, and results. - """ - - general: Optional[AppPanelGeneral] = dataclass_field( - default=None, - metadata={"description": "General information about the App Panel."} - ) - data_assets: Optional[list[AppPanelDataAsset]] = dataclass_field( - default=None, - metadata={"description": "List of data assets used in the App Panel."} - ) - categories: Optional[list[AppPanelCategories]] = dataclass_field( - default=None, - metadata={"description": "Categories for organizing App Panel parameters."} - ) - parameters: Optional[list[AppPanelParameters]] = dataclass_field( - default=None, - metadata={"description": "Parameters for the App Panel."} - ) - results: Optional[list[AppPanelResult]] = dataclass_field( - default=None, - metadata={"description": "Result files to display after computation."} - ) - processes: Optional[list[AppPanelProcess]] = dataclass_field( - default=None, - metadata={"description": "Pipeline processes and their App Panels."} - ) - - @dataclass class Capsules: """Client for interacting with Code Ocean capsule APIs.""" @@ -547,24 +337,3 @@ def search_capsules_iterator(self, search_params: CapsuleSearchParams) -> Iterat return params["next_token"] = response.next_token - - def search_pipelines(self, search_params: CapsuleSearchParams) -> CapsuleSearchResults: - """Search for pipelines with filtering, sorting, and pagination - options.""" - res = self.client.post("pipelines/search", json=search_params.to_dict()) - - return CapsuleSearchResults.from_dict(res.json()) - - def search_pipelines_iterator(self, search_params: CapsuleSearchParams) -> Iterator[Capsule]: - """Iterate through all pipelines matching search criteria with automatic pagination.""" - params = search_params.to_dict() - while True: - response = self.search_pipelines(search_params=CapsuleSearchParams(**params)) - - for result in response.results: - yield result - - if not response.has_more: - return - - params["next_token"] = response.next_token diff --git a/src/codeocean/client.py b/src/codeocean/client.py index 607c801..8caadf1 100644 --- a/src/codeocean/client.py +++ b/src/codeocean/client.py @@ -12,6 +12,7 @@ from codeocean.custom_metadata import CustomMetadataSchema from codeocean.data_asset import DataAssets from codeocean.error import Error +from codeocean.pipeline import Pipelines @dataclass @@ -55,6 +56,7 @@ def __post_init__(self): self.computations = Computations(client=self.session) self.custom_metadata = CustomMetadataSchema(client=self.session) self.data_assets = DataAssets(client=self.session) + self.pipelines = Pipelines(client=self.session) def _error_handler(self, response, *args, **kwargs): try: diff --git a/src/codeocean/components.py b/src/codeocean/components.py index f580d64..693422b 100644 --- a/src/codeocean/components.py +++ b/src/codeocean/components.py @@ -146,3 +146,219 @@ class Ownership(StrEnum): Private = "private" Shared = "shared" Created = "created" + + +class AppPanelDataAssetKind(StrEnum): + """The kind of data asset displayed in an app panel. + + - 'Internal' → Data stored inside Code Ocean. + - 'External' → Data stored external to Code Ocean. + - 'Combined' → Data containing multiple external data assets. + + In pipelines, a data asset can only be replaced with one of the same kind. + """ + + Internal = "internal" + External = "external" + Combined = "combined" + + +class AppPanelParameterType(StrEnum): + """The type of parameter displayed in an app panel.""" + + Text = "text" + List = "list" + File = "file" + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelCategories: + """Categories for a capsule's App Panel parameters.""" + + id: str = field( + metadata={"description": "Unique identifier for the category."}, + ) + name: str = field( + metadata={"description": "Human-readable name of the category."}, + ) + description: Optional[str] = field( + default=None, + metadata={"description": "Optional detailed description of the category."}, + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Optional help text providing guidance or additional information about the category."}, + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelParameters: + """Parameters for a capsule's App Panel.""" + + name: str = field( + metadata={"description": "Parameter label/display name."} + ) + type: AppPanelParameterType = field( + metadata={"description": "Type of the parameter (text, list, file)."} + ) + category: Optional[str] = field( + default=None, + metadata={"description": "ID of category the parameter belongs to."} + ) + param_name: Optional[str] = field( + default=None, + metadata={"description": "The parameter name/argument key"} + ) + description: Optional[str] = field( + default=None, + metadata={"description": "Description of the parameter."} + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Help text for the parameter."} + ) + value_type: Optional[str] = field( + default=None, + metadata={"description": "Value type of the parameter."} + ) + default_value: Optional[str] = field( + default=None, + metadata={"description": "Default value of the parameter."} + ) + required: Optional[bool] = field( + default=None, + metadata={"description": "Indicates if the parameter is required."} + ) + hidden: Optional[bool] = field( + default=None, + metadata={"description": "Indicates if the parameter is hidden."} + ) + minimum: Optional[float] = field( + default=None, + metadata={"description": "Minimum value for the parameter."} + ) + maximum: Optional[float] = field( + default=None, + metadata={"description": "Maximum value for the parameter."} + ) + pattern: Optional[str] = field( + default=None, + metadata={"description": "Regular expression pattern for the parameter."} + ) + value_options: Optional[list[str]] = field( + default=None, + metadata={"description": "Allowed values for the parameter."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelGeneral: + """General information about a capsule's App Panel.""" + + title: Optional[str] = field( + default=None, + metadata={"description": "Title of the App Panel."} + ) + instructions: Optional[str] = field( + default=None, + metadata={"description": "Instructions for using the App Panel."} + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Help text for the App Panel."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelDataAsset: + """Data asset parameter for the App Panel.""" + + id: str = field( + metadata={"description": "Unique identifier for the data asset."} + ) + mount: str = field( + metadata={"description": "Mount path of the data asset within the capsule. " + "Use this mount path to replace the currently attached data asset with your own"} + ) + name: str = field( + metadata={"description": "Display name of the data asset."} + ) + kind: AppPanelDataAssetKind = field( + metadata={"description": "Kind of the data asset (internal, external, combined)."} + ) + accessible: bool = field( + metadata={"description": "Indicates if the data asset is accessible to the user."} + ) + description: Optional[str] = field( + default=None, + metadata={"description": "Optional description of the data asset parameter."} + ) + help_text: Optional[str] = field( + default=None, + metadata={"description": "Optional help text for the data asset parameter."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelResult: + """Selected result files to display once the computation is complete.""" + + file_name: str = field( + metadata={"description": "Name of the result file."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanelProcess: + """Pipeline process name and its corresponding app panel (for pipelines of capsules only)""" + + name: str = field( + metadata={"description": "Name of the pipeline process."} + ) + categories: Optional[AppPanelCategories] = field( + default=None, + metadata={"description": "Categories for the pipeline process's app panel parameters."} + ) + parameters: Optional[AppPanelParameters] = field( + default=None, + metadata={"description": "Parameters for the pipeline process's app panel."} + ) + + +@dataclass_json +@dataclass(frozen=True) +class AppPanel: + """App Panel configuration for a capsule or pipeline, including general info, data assets, + categories, parameters, and results. + """ + + general: Optional[AppPanelGeneral] = field( + default=None, + metadata={"description": "General information about the App Panel."} + ) + data_assets: Optional[list[AppPanelDataAsset]] = field( + default=None, + metadata={"description": "List of data assets used in the App Panel."} + ) + categories: Optional[list[AppPanelCategories]] = field( + default=None, + metadata={"description": "Categories for organizing App Panel parameters."} + ) + parameters: Optional[list[AppPanelParameters]] = field( + default=None, + metadata={"description": "Parameters for the App Panel."} + ) + results: Optional[list[AppPanelResult]] = field( + default=None, + metadata={"description": "Result files to display after computation."} + ) + processes: Optional[list[AppPanelProcess]] = field( + default=None, + metadata={"description": "Pipeline processes and their App Panels."} + ) diff --git a/src/codeocean/computation.py b/src/codeocean/computation.py index b0d94eb..7506d2f 100644 --- a/src/codeocean/computation.py +++ b/src/codeocean/computation.py @@ -274,6 +274,19 @@ def run_capsule(self, run_params: RunParams) -> Computation: return Computation.from_dict(res.json()) + def run_pipeline(self, run_params: RunParams) -> Computation: + """ + Execute a pipeline with specified parameters and data assets. + + Set run_params.pipeline_id and optionally provide data_assets, + processes (with process-specific parameters), and nextflow_profile configuration. + + This is a convenience method that calls the same endpoint as run_capsule. + """ + res = self.client.post("computations", json=run_params.to_dict()) + + return Computation.from_dict(res.json()) + def wait_until_completed( self, computation: Computation, diff --git a/src/codeocean/data_asset.py b/src/codeocean/data_asset.py index 89c0a95..5066bdd 100644 --- a/src/codeocean/data_asset.py +++ b/src/codeocean/data_asset.py @@ -300,6 +300,12 @@ class AWSS3Source: "description": "The S3 bucket from which the data asset will be created", }, ) + endpoint_name: Optional[str] = field( + default=None, + metadata={ + "description": "The name of the custom S3 endpoint where the bucket is stored", + }, + ) prefix: Optional[str] = field( default=None, metadata={ @@ -318,6 +324,13 @@ class AWSS3Source: "description": "When true, Code Ocean will access the source bucket without credentials", }, ) + use_input_bucket: Optional[bool] = field( + default=None, + metadata={ + "description": "When true, Code Ocean will try to create the dataset from an internal " + "input bucket. All properties are ignored except for prefix. Only allowed to Admin users.", + }, + ) @dataclass_json @@ -396,6 +409,10 @@ class AWSS3Target: bucket: str = field( metadata={"description": "The S3 bucket where the data asset will be stored"}, ) + endpoint_name: Optional[str] = field( + default=None, + metadata={"description": "The name of the custom S3 endpoint where the bucket is stored"}, + ) prefix: Optional[str] = field( default=None, metadata={ diff --git a/src/codeocean/pipeline.py b/src/codeocean/pipeline.py new file mode 100644 index 0000000..30ab506 --- /dev/null +++ b/src/codeocean/pipeline.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +from dataclasses import dataclass, field as dataclass_field +from dataclasses_json import dataclass_json +from typing import Iterator, Optional +from requests_toolbelt.sessions import BaseUrlSession + +from codeocean.components import ( + Ownership, + SortOrder, + SearchFilter, + Permissions, + AppPanel, +) +from codeocean.computation import Computation +from codeocean.data_asset import DataAssetAttachParams, DataAssetAttachResults +from codeocean.enum import StrEnum + + +class PipelineStatus(StrEnum): + """Status of a pipeline indicating its release state.""" + + NonRelease = "non_release" + Release = "release" + + +class PipelineSortBy(StrEnum): + """Fields available for sorting pipeline search results.""" + + Created = "created" + LastAccessed = "last_accessed" + Name = "name" + + +@dataclass_json +@dataclass(frozen=True) +class OriginalPipelineInfo: + """Information about the original pipeline when this pipeline is duplicated from + another.""" + + id: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Original pipeline ID"}, + ) + major_version: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Original pipeline major version"}, + ) + minor_version: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Original pipeline minor version"}, + ) + name: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Original pipeline name"}, + ) + created: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Original pipeline creation time (int64 timestamp)"}, + ) + public: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Indicates whether the original pipeline is public"}, + ) + + +@dataclass_json +@dataclass(frozen=True) +class Pipeline: + """Represents a Code Ocean pipeline with its metadata and properties.""" + + id: str = dataclass_field( + metadata={"description": "Pipeline ID"}, + ) + created: int = dataclass_field( + metadata={"description": "Pipeline creation time (int64 timestamp)"}, + ) + name: str = dataclass_field( + metadata={"description": "Pipeline display name"}, + ) + status: PipelineStatus = dataclass_field( + metadata={"description": "Status of the pipeline (non_release or release)"}, + ) + owner: str = dataclass_field( + metadata={"description": "Pipeline owner's ID"}, + ) + slug: str = dataclass_field( + metadata={"description": "Alternate pipeline ID (URL-friendly identifier)"}, + ) + last_accessed: Optional[int] = dataclass_field( + default=None, + metadata={"description": "Pipeline last accessed time (int64 timestamp)"}, + ) + article: Optional[dict] = dataclass_field( + default=None, + metadata={ + "description": "Pipeline article info with URL, ID, DOI, " + "citation, state, name, journal_name, and " + "publish_time" + }, + ) + cloned_from_url: Optional[str] = dataclass_field( + default=None, + metadata={"description": "URL to external Git repository linked to pipeline"}, + ) + description: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Pipeline description"}, + ) + field: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Pipeline research field"}, + ) + tags: Optional[list[str]] = dataclass_field( + default=None, + metadata={"description": "List of tags associated with the pipeline"}, + ) + original_pipeline: Optional[OriginalPipelineInfo] = dataclass_field( + default=None, + metadata={ + "description": "Original pipeline info when this pipeline is duplicated from another" + }, + ) + release_pipeline: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Release pipeline ID"}, + ) + submission: Optional[dict] = dataclass_field( + default=None, + metadata={ + "description": "Submission info with timestamp, commit hash, " + "verification_pipeline, verified status, and " + "verified_timestamp" + }, + ) + versions: Optional[list[dict]] = dataclass_field( + default=None, + metadata={ + "description": "Pipeline versions with major_version, minor_version, release_time, and DOI" + }, + ) + + +@dataclass_json +@dataclass(frozen=True) +class PipelineSearchParams: + """Parameters for searching pipelines with various filters and pagination + options.""" + + query: Optional[str] = dataclass_field( + default=None, + metadata={ + "description": """Search expression supporting free text and field:value filters. + Valid fields: + - id + - name + - doi + - tag + - field + - affiliation + - journal + - article + - author + + Free text: + - Matches across weighted fields (name, tags, description, authors, etc.) + + Syntax rules: + - Same field repeated = OR + - Different fields = AND + - Quotes = exact phrase + - No explicit OR operator + - No wildcards (*) + - Not case sensitive + + Notes: + - "description" is not directly searchable; it is covered by free-text matching. + + Examples: + - name:RNA-seq tag:genomics + - name:"single cell analysis" + - Synergy + - name:Synergy + """ + }, + ) + next_token: Optional[str] = dataclass_field( + default=None, + metadata={ + "description": "Token for next page of results from previous response" + }, + ) + offset: Optional[int] = dataclass_field( + default=None, + metadata={ + "description": "Starting index for search results (ignored if next_token is set)" + }, + ) + limit: Optional[int] = dataclass_field( + default=None, + metadata={ + "description": "Number of items to return (up to 1000, defaults to 100)" + }, + ) + sort_field: Optional[PipelineSortBy] = dataclass_field( + default=None, + metadata={"description": "Field to sort by (created, name, or last_accessed)"}, + ) + sort_order: Optional[SortOrder] = dataclass_field( + default=None, + metadata={ + "description": "Sort order ('asc' or 'desc') - must be provided with a sort_field parameter as well!" + }, + ) + ownership: Optional[Ownership] = dataclass_field( + default=None, + metadata={ + "description": "Filter by ownership ('private', 'created' or 'shared') - defaults to all accessible" + }, + ) + status: Optional[PipelineStatus] = dataclass_field( + default=None, + metadata={"description": "Filter by status (release or non_release) - defaults to all"}, + ) + favorite: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Search only favorite pipelines"}, + ) + archived: Optional[bool] = dataclass_field( + default=None, + metadata={"description": "Search only archived pipelines"}, + ) + filters: Optional[list[SearchFilter]] = dataclass_field( + default=None, + metadata={ + "description": "Additional field-level filters for name, description, tags, or custom fields" + }, + ) + + +@dataclass_json +@dataclass(frozen=True) +class PipelineSearchResults: + """Results from a pipeline search operation with pagination support.""" + + has_more: bool = dataclass_field( + metadata={"description": "Indicates if there are more results available"}, + ) + results: list[Pipeline] = dataclass_field( + metadata={"description": "Array of pipelines found matching the search criteria"}, + ) + next_token: Optional[str] = dataclass_field( + default=None, + metadata={"description": "Token for fetching the next page of results"}, + ) + + +@dataclass +class Pipelines: + """Client for interacting with Code Ocean pipeline APIs.""" + + client: BaseUrlSession + + def get_pipeline(self, pipeline_id: str) -> Pipeline: + """Retrieve metadata for a specific pipeline by its ID.""" + res = self.client.get(f"pipelines/{pipeline_id}") + + return Pipeline.from_dict(res.json()) + + def delete_pipeline(self, pipeline_id: str): + """Delete a pipeline permanently.""" + self.client.delete(f"pipelines/{pipeline_id}") + + def get_pipeline_app_panel(self, pipeline_id: str, version: int | None = None) -> AppPanel: + """Retrieve app panel information for a specific pipeline by its ID.""" + res = self.client.get(f"pipelines/{pipeline_id}/app_panel", params={"version": version} if version else None) + + return AppPanel.from_dict(res.json()) + + def list_computations(self, pipeline_id: str) -> list[Computation]: + """Get all computations associated with a specific pipeline.""" + res = self.client.get(f"pipelines/{pipeline_id}/computations") + + return [Computation.from_dict(c) for c in res.json()] + + def update_permissions(self, pipeline_id: str, permissions: Permissions): + """Update permissions for a pipeline.""" + self.client.post( + f"pipelines/{pipeline_id}/permissions", + json=permissions.to_dict(), + ) + + def attach_data_assets( + self, + pipeline_id: str, + attach_params: list[DataAssetAttachParams], + ) -> list[DataAssetAttachResults]: + """Attach one or more data assets to a pipeline with optional mount paths.""" + res = self.client.post( + f"pipelines/{pipeline_id}/data_assets", + json=[j.to_dict() for j in attach_params], + ) + + return [DataAssetAttachResults.from_dict(c) for c in res.json()] + + def detach_data_assets(self, pipeline_id: str, data_assets: list[str]): + """Detach one or more data assets from a pipeline by their IDs.""" + self.client.delete( + f"pipelines/{pipeline_id}/data_assets/", + json=data_assets, + ) + + def archive_pipeline(self, pipeline_id: str, archive: bool): + """Archive or unarchive a pipeline to control its visibility and accessibility.""" + self.client.patch( + f"pipelines/{pipeline_id}/archive", + params={"archive": archive}, + ) + + def search_pipelines(self, search_params: PipelineSearchParams) -> PipelineSearchResults: + """Search for pipelines with filtering, sorting, and pagination + options.""" + res = self.client.post("pipelines/search", json=search_params.to_dict()) + + return PipelineSearchResults.from_dict(res.json()) + + def search_pipelines_iterator(self, search_params: PipelineSearchParams) -> Iterator[Pipeline]: + """Iterate through all pipelines matching search criteria with automatic pagination.""" + params = search_params.to_dict() + while True: + response = self.search_pipelines(search_params=PipelineSearchParams(**params)) + + for result in response.results: + yield result + + if not response.has_more: + return + + params["next_token"] = response.next_token