diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py
index 49789394c..650de345a 100644
--- a/airbyte/_util/api_util.py
+++ b/airbyte/_util/api_util.py
@@ -1825,8 +1825,8 @@ def get_connector_builder_project_for_definition_id(
     client_id: SecretString | None,
     client_secret: SecretString | None,
     bearer_token: SecretString | None,
-) -> str | None:
-    """Get the connector builder project ID for a declarative source definition.
+) -> dict[str, Any]:
+    """Get the connector builder project info for a declarative source definition.
 
     Uses the Config API endpoint:
     /v1/connector_builder_projects/get_for_definition_id
@@ -1842,9 +1842,10 @@ def get_connector_builder_project_for_definition_id(
         bearer_token: Bearer token for authentication (alternative to client credentials).
 
     Returns:
-        The builder project ID if found, None otherwise (can be null in API response)
+        A dict containing 'builderProjectId' and 'workspaceId' (the workspace that
+        owns the builder project, which may differ from the caller's workspace).
     """
-    json_result = _make_config_api_request(
+    return _make_config_api_request(
         path="/connector_builder_projects/get_for_definition_id",
         json={
             "actorDefinitionId": definition_id,
@@ -1855,7 +1856,49 @@ def get_connector_builder_project_for_definition_id(
         client_secret=client_secret,
         bearer_token=bearer_token,
     )
-    return json_result.get("builderProjectId")
+
+
+def get_connector_builder_project(
+    *,
+    workspace_id: str,
+    builder_project_id: str,
+    api_root: str,
+    client_id: SecretString | None,
+    client_secret: SecretString | None,
+    bearer_token: SecretString | None,
+) -> dict[str, Any]:
+    """Get a connector builder project, including the draft manifest if one exists.
+
+    Uses the Config API endpoint:
+    /v1/connector_builder_projects/get_with_manifest
+
+    See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1253
+
+    Args:
+        workspace_id: The workspace ID
+        builder_project_id: The connector builder project ID
+        api_root: The API root URL
+        client_id: OAuth client ID
+        client_secret: OAuth client secret
+        bearer_token: Bearer token for authentication (alternative to client credentials).
+
+    Returns:
+        A dictionary containing the builder project details. Key fields include:
+        - builderProject: The project metadata (name, hasDraft, etc.)
+        - declarativeManifest: The draft manifest data (if hasDraft is True),
+          which contains a 'manifest' field with the actual YAML manifest dict.
+    """
+    return _make_config_api_request(
+        path="/connector_builder_projects/get_with_manifest",
+        json={
+            "workspaceId": workspace_id,
+            "builderProjectId": builder_project_id,
+        },
+        api_root=api_root,
+        client_id=client_id,
+        client_secret=client_secret,
+        bearer_token=bearer_token,
+    )
 
 
 def update_connector_builder_project_testing_values(
diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py
index 094b904ce..38df5a58f 100644
--- a/airbyte/cloud/connectors.py
+++ b/airbyte/cloud/connectors.py
@@ -372,6 +372,9 @@ def __init__(
         self.definition_type: Literal["yaml", "docker"] = definition_type
         self._definition_info: api_models.DeclarativeSourceDefinitionResponse | None = None
         self._connector_builder_project_id: str | None = None
+        self._connector_builder_project_id_fetched: bool = False
+        self._builder_project_workspace_id: str | None = None
+        self._builder_project_data: dict[str, Any] | None = None
 
     def _fetch_definition_info(
         self,
@@ -461,19 +464,22 @@ def connector_builder_project_id(self) -> str | None:
         if self.definition_type != "yaml":
             return None
 
-        if self._connector_builder_project_id is not None:
+        if self._connector_builder_project_id_fetched:
             return self._connector_builder_project_id
 
-        self._connector_builder_project_id = (
-            api_util.get_connector_builder_project_for_definition_id(
-                workspace_id=self.workspace.workspace_id,
-                definition_id=self.definition_id,
-                api_root=self.workspace.api_root,
-                client_id=self.workspace.client_id,
-                client_secret=self.workspace.client_secret,
-                bearer_token=self.workspace.bearer_token,
-            )
+        result = api_util.get_connector_builder_project_for_definition_id(
+            workspace_id=self.workspace.workspace_id,
+            definition_id=self.definition_id,
+            api_root=self.workspace.api_root,
+            client_id=self.workspace.client_id,
+            client_secret=self.workspace.client_secret,
+            bearer_token=self.workspace.bearer_token,
         )
+        self._connector_builder_project_id = result.get("builderProjectId")
+        self._connector_builder_project_id_fetched = True
+        # The builder project may live in a different workspace than the caller's.
+        # We must use the project's owning workspace ID when fetching its data.
+        self._builder_project_workspace_id = result.get("workspaceId")
 
         return self._connector_builder_project_id
 
@@ -489,6 +495,109 @@ def connector_builder_project_url(self) -> str | None:
 
         return f"{self.workspace.workspace_url}/connector-builder/edit/{project_id}"
 
+    def get_builder_project_data(
+        self,
+        *,
+        use_cache: bool = True,
+    ) -> dict[str, Any]:
+        """Fetch the full connector builder project data, including draft manifest if present.
+
+        This calls the `/v1/connector_builder_projects/get_with_manifest` endpoint which returns
+        the project metadata and draft manifest (if one exists).
+
+        Args:
+            use_cache: If True, return cached data from a previous call if available.
+                Set to False to force a fresh API request. Defaults to True.
+
+        Returns:
+            A dictionary containing the builder project details. Key fields include:
+            - builderProject: The project metadata (name, hasDraft,
+              activeDeclarativeManifestVersion, etc.)
+            - declarativeManifest: The draft manifest data (if hasDraft is True),
+              which contains a 'manifest' field with the actual YAML manifest dict.
+
+        Raises:
+            NotImplementedError: If this is not a YAML custom source definition.
+            PyAirbyteInputError: If the connector builder project ID cannot be found.
+        """
+        if self.definition_type != "yaml":
+            raise NotImplementedError(
+                "Builder project data is only available for YAML custom source definitions. "
+                "Docker custom sources are not yet supported."
+            )
+
+        if use_cache and self._builder_project_data is not None:
+            return self._builder_project_data
+
+        builder_project_id = self.connector_builder_project_id
+        if not builder_project_id:
+            raise exc.PyAirbyteInputError(
+                message="Could not find connector builder project ID for this definition.",
+                context={
+                    "definition_id": self.definition_id,
+                    "workspace_id": self.workspace.workspace_id,
+                },
+            )
+
+        self._builder_project_data = api_util.get_connector_builder_project(
+            workspace_id=self._builder_project_workspace_id or self.workspace.workspace_id,
+            builder_project_id=builder_project_id,
+            api_root=self.workspace.api_root,
+            client_id=self.workspace.client_id,
+            client_secret=self.workspace.client_secret,
+            bearer_token=self.workspace.bearer_token,
+        )
+        return self._builder_project_data
+
+    @property
+    def has_draft(self) -> bool | None:
+        """Check whether this definition has an unpublished draft in Connector Builder.
+
+        Returns:
+            True if a draft exists, False if no draft exists,
+            or None if this is not a YAML connector or the project ID is unavailable.
+        """
+        if self.definition_type != "yaml":
+            return None
+
+        if not self.connector_builder_project_id:
+            return None
+
+        project_data = self.get_builder_project_data()
+        # `or {}` (not a .get default) so a key that is present but null acts like a missing key.
+        builder_project = project_data.get("builderProject") or {}
+        return bool(builder_project.get("hasDraft", False))
+
+    @property
+    def draft_manifest(self) -> dict[str, Any] | None:
+        """Get the draft (unpublished) manifest from the Connector Builder, if one exists.
+
+        This reads the working draft that has been saved in the Connector Builder UI
+        but not yet published. Returns None if no draft exists or if this is not a
+        YAML connector.
+
+        Returns:
+            The draft manifest as a dictionary, or None if no draft exists.
+        """
+        if self.definition_type != "yaml":
+            return None
+
+        if not self.connector_builder_project_id:
+            return None
+
+        project_data = self.get_builder_project_data()
+        # `or {}` guards both lookups below against keys that are present but null.
+        builder_project = project_data.get("builderProject") or {}
+        if not builder_project.get("hasDraft", False):
+            return None
+
+        declarative_manifest = project_data.get("declarativeManifest") or {}
+        manifest = declarative_manifest.get("manifest")
+        if isinstance(manifest, dict):
+            return manifest
+
+        return None
+
     @property
     def definition_url(self) -> str:
         """Get the web URL of the custom source definition.
diff --git a/airbyte/mcp/cloud.py b/airbyte/mcp/cloud.py
index 2fd26d2eb..594bee0db 100644
--- a/airbyte/mcp/cloud.py
+++ b/airbyte/mcp/cloud.py
@@ -1721,11 +1721,23 @@ def get_custom_source_definition(
             default=None,
         ),
     ],
+    include_draft: Annotated[
+        bool,
+        Field(
+            description=(
+                "Whether to include the Connector Builder draft manifest in the response. "
+                "If True, the response also includes 'has_draft' and 'draft_manifest' "
+                "('draft_manifest' is None when no draft exists). Defaults to False."
+            ),
+            default=False,
+        ),
+    ] = False,
 ) -> dict[str, Any]:
     """Get a custom YAML source definition from Airbyte Cloud, including its manifest.
 
-    Returns the full definition details including the manifest YAML content,
-    which can be used to inspect or store the connector configuration locally.
+    Returns the full definition details including the published manifest YAML content.
+    Optionally includes the Connector Builder draft manifest (unpublished changes)
+    when include_draft=True.
 
     Note: Only YAML (declarative) connectors are currently supported.
     Docker-based custom sources are not yet available.
@@ -1736,7 +1748,7 @@ def get_custom_source_definition(
         definition_type="yaml",
     )
 
-    return {
+    result: dict[str, Any] = {
         "definition_id": definition.definition_id,
         "name": definition.name,
         "version": definition.version,
@@ -1745,6 +1757,58 @@ def get_custom_source_definition(
         "manifest": definition.manifest,
     }
 
+    if include_draft:
+        result["has_draft"] = definition.has_draft
+        result["draft_manifest"] = definition.draft_manifest
+
+    return result
+
+
+@mcp_tool(
+    read_only=True,
+    idempotent=True,
+    open_world=True,
+)
+def get_connector_builder_draft_manifest(
+    ctx: Context,
+    definition_id: Annotated[
+        str,
+        Field(description="The ID of the custom source definition to retrieve the draft for."),
+    ],
+    *,
+    workspace_id: Annotated[
+        str | None,
+        Field(
+            description=WORKSPACE_ID_TIP_TEXT,
+            default=None,
+        ),
+    ],
+) -> dict[str, Any]:
+    """Get the Connector Builder draft manifest for a custom source definition.
+
+    Returns the working draft manifest that has been saved in the Connector Builder UI
+    but not yet published. This is useful for inspecting what a user is currently working
+    on before they publish their changes.
+
+    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
+    The published manifest is always included for comparison.
+    """
+    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
+    definition = workspace.get_custom_source_definition(
+        definition_id=definition_id,
+        definition_type="yaml",
+    )
+
+    return {
+        "definition_id": definition.definition_id,
+        "name": definition.name,
+        "connector_builder_project_id": definition.connector_builder_project_id,
+        "connector_builder_project_url": definition.connector_builder_project_url,
+        "has_draft": definition.has_draft,
+        "draft_manifest": definition.draft_manifest,
+        "published_manifest": definition.manifest,
+    }
+
 
 @mcp_tool(
     destructive=True,
@@ -2571,14 +2635,54 @@ def get_connection_artifact(
     return result
 
 
+def _add_defaults_for_exclude_args(
+    exclude_args: list[str],
+) -> None:
+    """Patch registered tool functions to add Python-level defaults for excluded args.
+
+    FastMCP requires that excluded args have Python-level default values, but MCP tool
+    functions should only use Field(default=...) in their Annotated type hints (not
+    Python-level `= None`). This function bridges the gap by dynamically adding Python
+    defaults to the function signatures at registration time, so the source code stays
+    clean while satisfying FastMCP's requirement.
+
+    Args:
+        exclude_args: List of argument names that will be excluded from the tool schema.
+    """
+    import inspect  # noqa: PLC0415  # Local import for optional patching logic
+
+    from fastmcp_extensions.decorators import (  # noqa: PLC0415
+        _REGISTERED_TOOLS,  # noqa: PLC2701
+    )
+
+    for func, _annotations in _REGISTERED_TOOLS:
+        sig = inspect.signature(func)
+        needs_patch = any(
+            arg_name in sig.parameters
+            and sig.parameters[arg_name].default is inspect.Parameter.empty
+            for arg_name in exclude_args
+        )
+        if needs_patch:
+            new_params = [
+                p.replace(default=None)
+                if name in exclude_args and p.default is inspect.Parameter.empty
+                else p
+                for name, p in sig.parameters.items()
+            ]
+            func.__signature__ = sig.replace(parameters=new_params)  # type: ignore[attr-defined]
+
+
 def register_cloud_tools(app: FastMCP) -> None:
     """Register cloud tools with the FastMCP app.
 
     Args:
         app: FastMCP application instance
     """
+    exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
+    if exclude_args:
+        _add_defaults_for_exclude_args(exclude_args)
     register_mcp_tools(
         app,
         mcp_module=__name__,
-        exclude_args=["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None,
+        exclude_args=exclude_args,
     )
diff --git a/tests/integration_tests/cloud/test_cloud_sync.py b/tests/integration_tests/cloud/test_cloud_sync.py
index 821208fc3..3989c147f 100644
--- a/tests/integration_tests/cloud/test_cloud_sync.py
+++ b/tests/integration_tests/cloud/test_cloud_sync.py
@@ -45,9 +45,13 @@ def test_get_previous_sync_result(
     pre_created_connection_id: str,
 ) -> None:
     """Test running a connection."""
-    sync_result: SyncResult = cloud_workspace.get_connection(
+    previous_sync_logs = cloud_workspace.get_connection(
         connection_id=pre_created_connection_id,
-    ).get_previous_sync_logs()[0]
+    ).get_previous_sync_logs()
+    if not previous_sync_logs:
+        pytest.skip("No previous sync logs found for this connection.")
+
+    sync_result: SyncResult = previous_sync_logs[0]
     assert sync_result.is_job_complete()
     assert sync_result.get_job_status()
     assert sync_result.stream_names