Skip to content
Merged
53 changes: 48 additions & 5 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1825,8 +1825,8 @@ def get_connector_builder_project_for_definition_id(
client_id: SecretString | None,
client_secret: SecretString | None,
bearer_token: SecretString | None,
) -> str | None:
"""Get the connector builder project ID for a declarative source definition.
) -> dict[str, Any]:
"""Get the connector builder project info for a declarative source definition.

Uses the Config API endpoint:
/v1/connector_builder_projects/get_for_definition_id
Expand All @@ -1842,9 +1842,10 @@ def get_connector_builder_project_for_definition_id(
bearer_token: Bearer token for authentication (alternative to client credentials).

Returns:
The builder project ID if found, None otherwise (can be null in API response)
A dict containing 'builderProjectId' and 'workspaceId' (the workspace that
owns the builder project, which may differ from the caller's workspace).
"""
json_result = _make_config_api_request(
return _make_config_api_request(
path="/connector_builder_projects/get_for_definition_id",
json={
"actorDefinitionId": definition_id,
Expand All @@ -1855,7 +1856,49 @@ def get_connector_builder_project_for_definition_id(
client_secret=client_secret,
bearer_token=bearer_token,
)
return json_result.get("builderProjectId")


def get_connector_builder_project(
*,
workspace_id: str,
builder_project_id: str,
api_root: str,
client_id: SecretString | None,
client_secret: SecretString | None,
bearer_token: SecretString | None,
) -> dict[str, Any]:
"""Get a connector builder project, including the draft manifest if one exists.

Uses the Config API endpoint:
/v1/connector_builder_projects/get_with_manifest

See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1253

Args:
workspace_id: The workspace ID
builder_project_id: The connector builder project ID
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret
bearer_token: Bearer token for authentication (alternative to client credentials).

Returns:
A dictionary containing the builder project details. Key fields include:
- builderProject: The project metadata (name, hasDraft, etc.)
- declarativeManifest: The draft manifest data (if hasDraft is True),
which contains a 'manifest' field with the actual YAML manifest dict.
"""
return _make_config_api_request(
path="/connector_builder_projects/get_with_manifest",
json={
"workspaceId": workspace_id,
"builderProjectId": builder_project_id,
},
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
bearer_token=bearer_token,
)


def update_connector_builder_project_testing_values(
Expand Down
127 changes: 117 additions & 10 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ def __init__(
self.definition_type: Literal["yaml", "docker"] = definition_type
self._definition_info: api_models.DeclarativeSourceDefinitionResponse | None = None
self._connector_builder_project_id: str | None = None
self._connector_builder_project_id_fetched: bool = False
self._builder_project_workspace_id: str | None = None
self._builder_project_data: dict[str, Any] | None = None

def _fetch_definition_info(
self,
Expand Down Expand Up @@ -461,19 +464,22 @@ def connector_builder_project_id(self) -> str | None:
if self.definition_type != "yaml":
return None

if self._connector_builder_project_id is not None:
if self._connector_builder_project_id_fetched:
return self._connector_builder_project_id

self._connector_builder_project_id = (
api_util.get_connector_builder_project_for_definition_id(
workspace_id=self.workspace.workspace_id,
definition_id=self.definition_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
result = api_util.get_connector_builder_project_for_definition_id(
workspace_id=self.workspace.workspace_id,
definition_id=self.definition_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
self._connector_builder_project_id = result.get("builderProjectId")
self._connector_builder_project_id_fetched = True
# The builder project may live in a different workspace than the caller's.
# We must use the project's owning workspace ID when fetching its data.
self._builder_project_workspace_id = result.get("workspaceId")

return self._connector_builder_project_id

Expand All @@ -489,6 +495,107 @@ def connector_builder_project_url(self) -> str | None:

return f"{self.workspace.workspace_url}/connector-builder/edit/{project_id}"

def get_builder_project_data(
self,
*,
use_cache: bool = True,
) -> dict[str, Any]:
"""Fetch the full connector builder project data, including draft manifest if present.

This calls the `/v1/connector_builder_projects/get_with_manifest` endpoint which returns
the project metadata and draft manifest (if one exists).

Args:
use_cache: If True, return cached data from a previous call if available.
Set to False to force a fresh API request. Defaults to True.

Returns:
A dictionary containing the builder project details. Key fields include:
- builderProject: The project metadata (name, hasDraft,
activeDeclarativeManifestVersion, etc.)
- declarativeManifest: The draft manifest data (if hasDraft is True),
which contains a 'manifest' field with the actual YAML manifest dict.

Raises:
NotImplementedError: If this is not a YAML custom source definition.
PyAirbyteInputError: If the connector builder project ID cannot be found.
"""
if self.definition_type != "yaml":
raise NotImplementedError(
"Builder project data is only available for YAML custom source definitions. "
"Docker custom sources are not yet supported."
)

if use_cache and self._builder_project_data is not None:
return self._builder_project_data

builder_project_id = self.connector_builder_project_id
if not builder_project_id:
raise exc.PyAirbyteInputError(
message="Could not find connector builder project ID for this definition.",
context={
"definition_id": self.definition_id,
"workspace_id": self.workspace.workspace_id,
},
)

self._builder_project_data = api_util.get_connector_builder_project(
workspace_id=self._builder_project_workspace_id or self.workspace.workspace_id,
builder_project_id=builder_project_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
return self._builder_project_data

@property
def has_draft(self) -> bool | None:
"""Check whether this definition has an unpublished draft in Connector Builder.

Returns:
True if a draft exists, False if no draft exists,
or None if this is not a YAML connector or the project ID is unavailable.
"""
if self.definition_type != "yaml":
return None

if not self.connector_builder_project_id:
return None

project_data = self.get_builder_project_data()
builder_project = project_data.get("builderProject", {})
return builder_project.get("hasDraft", False)

@property
def draft_manifest(self) -> dict[str, Any] | None:
"""Get the draft (unpublished) manifest from the Connector Builder, if one exists.

This reads the working draft that has been saved in the Connector Builder UI
but not yet published. Returns None if no draft exists or if this is not a
YAML connector.

Returns:
The draft manifest as a dictionary, or None if no draft exists.
"""
if self.definition_type != "yaml":
return None

if not self.connector_builder_project_id:
return None

project_data = self.get_builder_project_data()
builder_project = project_data.get("builderProject", {})
if not builder_project.get("hasDraft", False):
return None

declarative_manifest = project_data.get("declarativeManifest", {})
manifest = declarative_manifest.get("manifest")
if isinstance(manifest, dict):
return manifest

return None

@property
def definition_url(self) -> str:
"""Get the web URL of the custom source definition.
Expand Down
112 changes: 108 additions & 4 deletions airbyte/mcp/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -1721,11 +1721,23 @@ def get_custom_source_definition(
default=None,
),
],
include_draft: Annotated[
bool,
Field(
description=(
"Whether to include the Connector Builder draft manifest in the response. "
"If True and a draft exists, the response will include 'has_draft' and "
"'draft_manifest' fields. Defaults to False."
),
default=False,
),
] = False,
) -> dict[str, Any]:
"""Get a custom YAML source definition from Airbyte Cloud, including its manifest.

Returns the full definition details including the manifest YAML content,
which can be used to inspect or store the connector configuration locally.
Returns the full definition details including the published manifest YAML content.
Optionally includes the Connector Builder draft manifest (unpublished changes)
when include_draft=True.

Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.
Expand All @@ -1736,7 +1748,7 @@ def get_custom_source_definition(
definition_type="yaml",
)

return {
result: dict[str, Any] = {
"definition_id": definition.definition_id,
"name": definition.name,
"version": definition.version,
Expand All @@ -1745,6 +1757,58 @@ def get_custom_source_definition(
"manifest": definition.manifest,
}

if include_draft:
result["has_draft"] = definition.has_draft
result["draft_manifest"] = definition.draft_manifest

return result


@mcp_tool(
read_only=True,
idempotent=True,
open_world=True,
)
def get_connector_builder_draft_manifest(
ctx: Context,
definition_id: Annotated[
str,
Field(description="The ID of the custom source definition to retrieve the draft for."),
],
*,
workspace_id: Annotated[
str | None,
Field(
description=WORKSPACE_ID_TIP_TEXT,
default=None,
),
],
) -> dict[str, Any]:
"""Get the Connector Builder draft manifest for a custom source definition.

Returns the working draft manifest that has been saved in the Connector Builder UI
but not yet published. This is useful for inspecting what a user is currently working
on before they publish their changes.

If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
The published manifest is always included for comparison.
"""
workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
definition = workspace.get_custom_source_definition(
definition_id=definition_id,
definition_type="yaml",
)

return {
"definition_id": definition.definition_id,
"name": definition.name,
"connector_builder_project_id": definition.connector_builder_project_id,
"connector_builder_project_url": definition.connector_builder_project_url,
"has_draft": definition.has_draft,
"draft_manifest": definition.draft_manifest,
"published_manifest": definition.manifest,
}


@mcp_tool(
destructive=True,
Expand Down Expand Up @@ -2571,14 +2635,54 @@ def get_connection_artifact(
return result


def _add_defaults_for_exclude_args(
exclude_args: list[str],
) -> None:
"""Patch registered tool functions to add Python-level defaults for excluded args.

FastMCP requires that excluded args have Python-level default values, but MCP tool
functions should only use Field(default=...) in their Annotated type hints (not
Python-level `= None`). This function bridges the gap by dynamically adding Python
defaults to the function signatures at registration time, so the source code stays
clean while satisfying FastMCP's requirement.

Args:
exclude_args: List of argument names that will be excluded from the tool schema.
"""
import inspect # noqa: PLC0415 # Local import for optional patching logic

from fastmcp_extensions.decorators import ( # noqa: PLC0415
_REGISTERED_TOOLS, # noqa: PLC2701
)

for func, _annotations in _REGISTERED_TOOLS:
sig = inspect.signature(func)
needs_patch = any(
arg_name in sig.parameters
and sig.parameters[arg_name].default is inspect.Parameter.empty
for arg_name in exclude_args
)
if needs_patch:
new_params = [
p.replace(default=None)
if name in exclude_args and p.default is inspect.Parameter.empty
else p
for name, p in sig.parameters.items()
]
func.__signature__ = sig.replace(parameters=new_params) # type: ignore[attr-defined]


def register_cloud_tools(app: FastMCP) -> None:
"""Register cloud tools with the FastMCP app.

Args:
app: FastMCP application instance
"""
exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
if exclude_args:
_add_defaults_for_exclude_args(exclude_args)
register_mcp_tools(
app,
mcp_module=__name__,
exclude_args=["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None,
exclude_args=exclude_args,
)
8 changes: 6 additions & 2 deletions tests/integration_tests/cloud/test_cloud_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,13 @@ def test_get_previous_sync_result(
pre_created_connection_id: str,
) -> None:
"""Test running a connection."""
sync_result: SyncResult = cloud_workspace.get_connection(
previous_sync_logs = cloud_workspace.get_connection(
connection_id=pre_created_connection_id,
).get_previous_sync_logs()[0]
).get_previous_sync_logs()
if not previous_sync_logs:
pytest.skip("No previous sync logs found for this connection.")

sync_result: SyncResult = previous_sync_logs[0]
assert sync_result.is_job_complete()
assert sync_result.get_job_status()
assert sync_result.stream_names
Expand Down
Loading