diff --git a/.sqlx/query-6bbf6e17a0043b9d6c161e25e30317a4decd2c98dca9722338bee8daa249701c.json b/.sqlx/query-6bbf6e17a0043b9d6c161e25e30317a4decd2c98dca9722338bee8daa249701c.json new file mode 100644 index 00000000000..d97b900a60e --- /dev/null +++ b/.sqlx/query-6bbf6e17a0043b9d6c161e25e30317a4decd2c98dca9722338bee8daa249701c.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT dp.data_plane_name\n FROM data_plane_private_links l\n JOIN data_planes dp ON dp.id = l.data_plane_id\n WHERE l.id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "data_plane_name", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Macaddr8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "6bbf6e17a0043b9d6c161e25e30317a4decd2c98dca9722338bee8daa249701c" +} diff --git a/.sqlx/query-7c9bba86a6f18a744f01a2a797cd9c7dba5f59c83ec0ca6b25426ef9e4b3bc4a.json b/.sqlx/query-7c9bba86a6f18a744f01a2a797cd9c7dba5f59c83ec0ca6b25426ef9e4b3bc4a.json new file mode 100644 index 00000000000..d830f397a66 --- /dev/null +++ b/.sqlx/query-7c9bba86a6f18a744f01a2a797cd9c7dba5f59c83ec0ca6b25426ef9e4b3bc4a.json @@ -0,0 +1,48 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO data_plane_private_links (data_plane_id, provider, config)\n SELECT dp.id, $2, $3\n FROM data_planes dp WHERE dp.data_plane_name = $1\n RETURNING\n id as \"id: models::Id\",\n status,\n details as \"details: sqlx::types::Json\",\n error,\n observed_at as \"observed_at: chrono::DateTime\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id: models::Id", + "type_info": "Macaddr8" + }, + { + "ordinal": 1, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "details: sqlx::types::Json", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "observed_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Jsonb" + ] + }, + "nullable": [ + false, + false, + true, + true, + true + ] + }, + "hash": "7c9bba86a6f18a744f01a2a797cd9c7dba5f59c83ec0ca6b25426ef9e4b3bc4a" +} diff --git a/.sqlx/query-83e8089031d4761b394960b40feeab81829d07bcba5ee43cc672e2c6916bcf7a.json b/.sqlx/query-83e8089031d4761b394960b40feeab81829d07bcba5ee43cc672e2c6916bcf7a.json new file mode 100644 index 00000000000..7c305ff60b4 --- /dev/null +++ b/.sqlx/query-83e8089031d4761b394960b40feeab81829d07bcba5ee43cc672e2c6916bcf7a.json @@ -0,0 +1,42 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE data_plane_private_links SET\n provider = $2,\n config = $3,\n status = 'pending',\n details = NULL,\n error = NULL,\n observed_at = NULL,\n updated_at = now()\n WHERE id = $1\n RETURNING\n status,\n details as \"details: sqlx::types::Json\",\n error,\n observed_at as \"observed_at: chrono::DateTime\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "details: sqlx::types::Json", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "observed_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Macaddr8", + "Text", + "Jsonb" + ] + }, + "nullable": [ + false, + true, + true, + true + ] + }, + "hash": "83e8089031d4761b394960b40feeab81829d07bcba5ee43cc672e2c6916bcf7a" +} diff --git a/.sqlx/query-85b146f39fee0ab26b92ff4e37ef0b891a8778542d1d46e39ecc9515e2c16810.json b/.sqlx/query-85b146f39fee0ab26b92ff4e37ef0b891a8778542d1d46e39ecc9515e2c16810.json deleted file mode 100644 index 8ffa1f76aa0..00000000000 --- a/.sqlx/query-85b146f39fee0ab26b92ff4e37ef0b891a8778542d1d46e39ecc9515e2c16810.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE data_planes\n SET private_links = $2, updated_at = now()\n WHERE data_plane_name = $1\n RETURNING private_links as \"private_links!: Vec\"\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "private_links!: Vec", - "type_info": "JsonArray" - } - ], - "parameters": { - "Left": [ - "Text", - "JsonArray" - ] - }, - "nullable": [ - false - ] - }, - "hash": "85b146f39fee0ab26b92ff4e37ef0b891a8778542d1d46e39ecc9515e2c16810" -} diff --git a/.sqlx/query-a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062.json b/.sqlx/query-9dbbc797f9e651eb717f5430186ae1d15adc381c81c4a832f458bcd3171c4044.json similarity index 69% rename from .sqlx/query-a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062.json rename to .sqlx/query-9dbbc797f9e651eb717f5430186ae1d15adc381c81c4a832f458bcd3171c4044.json index f7578f37405..ad17dc79186 100644 --- a/.sqlx/query-a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062.json +++ b/.sqlx/query-9dbbc797f9e651eb717f5430186ae1d15adc381c81c4a832f458bcd3171c4044.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "select\n dp.data_plane_name,\n dp.cidr_blocks::text[] as \"cidr_blocks!: Vec\",\n dp.gcp_service_account_email,\n dp.aws_iam_user_arn,\n dp.azure_application_name,\n dp.azure_application_client_id,\n dp.private_links as \"private_links!: Vec\",\n dp.aws_link_endpoints as \"aws_link_endpoints: Vec\",\n dp.azure_link_endpoints as \"azure_link_endpoints: Vec\",\n dp.gcp_psc_endpoints as \"gcp_psc_endpoints: Vec\"\n from unnest($1::text[]) as input(name)\n join data_planes dp on dp.data_plane_name = input.name\n ", + "query": "select\n dp.data_plane_name,\n dp.cidr_blocks::text[] as \"cidr_blocks!: Vec\",\n dp.gcp_service_account_email,\n dp.aws_iam_user_arn,\n dp.azure_application_name,\n dp.azure_application_client_id,\n dp.aws_link_endpoints as \"aws_link_endpoints: Vec\",\n dp.azure_link_endpoints as \"azure_link_endpoints: Vec\",\n dp.gcp_psc_endpoints as \"gcp_psc_endpoints: Vec\"\n from unnest($1::text[]) as input(name)\n join data_planes dp on dp.data_plane_name = input.name\n ", "describe": { "columns": [ { @@ -35,21 +35,16 @@ }, { "ordinal": 6, - "name": "private_links!: Vec", - "type_info": "JsonArray" - }, - { - "ordinal": 7, "name": "aws_link_endpoints: Vec", "type_info": "JsonArray" }, { - "ordinal": 8, + "ordinal": 7, "name": "azure_link_endpoints: Vec", "type_info": "JsonArray" }, { - "ordinal": 9, + "ordinal": 8, "name": "gcp_psc_endpoints: Vec", "type_info": "JsonArray" } @@ -66,11 +61,10 @@ true, true, true, - false, true, true, true ] }, - "hash": "a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062" + "hash": "9dbbc797f9e651eb717f5430186ae1d15adc381c81c4a832f458bcd3171c4044" } diff --git a/.sqlx/query-aa3dc5334a258f854af3bc750c3f0622b48bc29f58480f76687a9e5c3f91f49a.json b/.sqlx/query-aa3dc5334a258f854af3bc750c3f0622b48bc29f58480f76687a9e5c3f91f49a.json new file mode 100644 index 00000000000..3d2c3a303d3 --- /dev/null +++ b/.sqlx/query-aa3dc5334a258f854af3bc750c3f0622b48bc29f58480f76687a9e5c3f91f49a.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH endpoints AS (\n SELECT 'aws'::text AS provider, ep ->> 'service_name' AS identity, ep AS detail\n FROM unnest($2::jsonb[]) AS ep\n UNION ALL\n SELECT 'azure'::text, ep ->> 'service_name', ep\n FROM unnest($3::jsonb[]) AS ep\n UNION ALL\n SELECT 'gcp'::text, ep ->> 'service_attachment', ep\n FROM unnest($4::jsonb[]) AS ep\n ),\n -- Providers for which this converge actually published endpoints.\n -- A link is only re-evaluated when its provider published at\n -- least one endpoint, so a transient empty or partial export\n -- cannot flip an already-`provisioned` link back to `pending`\n -- and null its details. A link's row is deleted (not emptied)\n -- when it is removed, so a genuine teardown never relies on the\n -- array going empty.\n published_providers AS (\n SELECT DISTINCT provider FROM endpoints\n )\n UPDATE data_plane_private_links l SET\n status = CASE WHEN e.identity IS NOT NULL THEN 'provisioned' ELSE 'pending' END,\n details = e.detail,\n observed_at = now(),\n updated_at = now()\n FROM data_plane_private_links l2\n LEFT JOIN endpoints e\n ON e.provider = l2.provider AND e.identity = l2.service_identity\n WHERE l.id = l2.id\n AND l2.data_plane_id = $1\n AND l2.provider IN (SELECT provider FROM published_providers)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Macaddr8", + "JsonbArray", + "JsonbArray", + "JsonbArray" + ] + }, + "nullable": [] + }, + "hash": "aa3dc5334a258f854af3bc750c3f0622b48bc29f58480f76687a9e5c3f91f49a" +} diff --git a/.sqlx/query-dc7c02435c03246d6a9b0efe1acf88b6ee10b05976bcc6f14d36b385562056f9.json b/.sqlx/query-dc7c02435c03246d6a9b0efe1acf88b6ee10b05976bcc6f14d36b385562056f9.json new file mode 100644 index 00000000000..1b853e48998 --- /dev/null +++ b/.sqlx/query-dc7c02435c03246d6a9b0efe1acf88b6ee10b05976bcc6f14d36b385562056f9.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM data_plane_private_links WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Macaddr8" + ] + }, + "nullable": [] + }, + "hash": "dc7c02435c03246d6a9b0efe1acf88b6ee10b05976bcc6f14d36b385562056f9" +} diff --git a/.sqlx/query-f3bf1074e7818356216cb6b1c8f46dd669b4b040cf0d7b98e8c0f6f86fbcb49d.json b/.sqlx/query-f3bf1074e7818356216cb6b1c8f46dd669b4b040cf0d7b98e8c0f6f86fbcb49d.json new file mode 100644 index 00000000000..c8acac3cdb9 --- /dev/null +++ b/.sqlx/query-f3bf1074e7818356216cb6b1c8f46dd669b4b040cf0d7b98e8c0f6f86fbcb49d.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id as \"id: models::Id\",\n config as \"config!: sqlx::types::Json\",\n status,\n details as \"details: sqlx::types::Json\",\n error,\n observed_at as \"observed_at: chrono::DateTime\"\n FROM data_plane_private_links\n WHERE data_plane_id = $1\n ORDER BY created_at, id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id: models::Id", + "type_info": "Macaddr8" + }, + { + "ordinal": 1, + "name": "config!: sqlx::types::Json", + "type_info": "Jsonb" + }, + { + "ordinal": 2, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "details: sqlx::types::Json", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "error", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "observed_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Macaddr8" + ] + }, + "nullable": [ + false, + false, + false, + true, + true, + true + ] + }, + "hash": "f3bf1074e7818356216cb6b1c8f46dd669b4b040cf0d7b98e8c0f6f86fbcb49d" +} diff --git a/crates/control-plane-api/src/fixtures/private_links.sql b/crates/control-plane-api/src/fixtures/private_links.sql index f3edc31be7a..c19f6dcf308 100644 --- a/crates/control-plane-api/src/fixtures/private_links.sql +++ b/crates/control-plane-api/src/fixtures/private_links.sql @@ -1,12 +1,15 @@ --- Adds a private data plane with populated private-link config and one AWS --- provisioning result row, plus a `read` grant for Alice on the private --- prefix so the GraphQL authorization layer can surface it. +-- Adds a private data plane plus three configured private links (one per +-- provider) as rows in `data_plane_private_links`: the AWS link is +-- `provisioned` and carries its endpoint details, the Azure and GCP links are +-- still `pending`. One AWS endpoint result is also set on the data_planes row +-- for the legacy `awsLinkEndpoints` field. Grants Alice `read` (plus the +-- `manage_data_plane` bundle) on the private prefix so the GraphQL +-- authorization layer can surface and mutate it. -- --- Loaded alongside `data_planes` and `alice` in tests that exercise the --- typed `privateLinks` field on the dataPlanes query and the --- `updateDataPlanePrivateLinks` mutation. Kept separate from the shared --- `data_planes.sql` so its presence does not change every other GraphQL --- test. +-- Loaded alongside `data_planes` and `alice` by tests that exercise the +-- `privateLinks` field and the per-link CRUD mutations. Kept separate from the +-- shared `data_planes.sql` so its presence does not change every other GraphQL +-- test. Link ids are explicit so snapshots are deterministic. do $$ declare alice_private_dp_id flowid := '444444444444'; @@ -38,7 +41,6 @@ begin gcp_service_account_email, azure_application_name, azure_application_client_id, - private_links, aws_link_endpoints ) values ( alice_private_dp_id, @@ -63,15 +65,36 @@ begin 'estuary-test-app-private', '44444444-4444-4444-4444-444444444444', array[ - '{"region":"us-east-1","az_ids":["use1-az1","use1-az2"],"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc123"}'::json, - '{"service_name":"/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc","location":"eastus","dns_name":"privatelink.database.windows.net"}'::json, - '{"service_attachment":"projects/p/regions/us-central1/serviceAttachments/sa","region":"us-central1","dns_zone_name":"z","dns_record_names":["r1","r2"],"all_ports":true}'::json - ], - array[ - '{"endpoint_id":"vpce-0123456789abcdef0","state":"available"}'::json + '{"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc123","dns_entries":[{"dns_name":"vpce-0123abc.vpce-svc-abc123.us-east-1.vpce.amazonaws.com","hosted_zone_id":"Z7HUB22EVRPK5"}]}'::json ] ); + insert into public.data_plane_private_links (id, data_plane_id, provider, config, status, details) values + ( + '00:00:00:00:00:00:0a:01', + alice_private_dp_id, + 'aws', + '{"region":"us-east-1","az_ids":["use1-az1","use1-az2"],"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc123"}'::jsonb, + 'provisioned', + '{"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc123","dns_entries":[{"dns_name":"vpce-0123abc.vpce-svc-abc123.us-east-1.vpce.amazonaws.com","hosted_zone_id":"Z7HUB22EVRPK5"}]}'::jsonb + ), + ( + '00:00:00:00:00:00:0a:02', + alice_private_dp_id, + 'azure', + '{"service_name":"/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc","location":"eastus","dns_name":"privatelink.database.windows.net"}'::jsonb, + 'pending', + null + ), + ( + '00:00:00:00:00:00:0a:03', + alice_private_dp_id, + 'gcp', + '{"service_attachment":"projects/p/regions/us-central1/serviceAttachments/sa","region":"us-central1","dns_zone_name":"z","dns_record_names":["r1","r2"],"all_ports":true}'::jsonb, + 'pending', + null + ); + -- Mirrors what `create_data_plane.rs` installs at provisioning time: -- legacy `read` for RLS/`user_roles()`, and the `ManageDataPlane` bundle -- for the capability bits. diff --git a/crates/control-plane-api/src/server/public/graphql/data_planes.rs b/crates/control-plane-api/src/server/public/graphql/data_planes.rs index bc1d7880410..9ecbe0fabd3 100644 --- a/crates/control-plane-api/src/server/public/graphql/data_planes.rs +++ b/crates/control-plane-api/src/server/public/graphql/data_planes.rs @@ -15,6 +15,51 @@ pub enum DataPlaneCloudProvider { Local, } +/// Controller-observed provisioning status of a configured private link. +#[derive(Debug, Clone, Copy, PartialEq, Eq, async_graphql::Enum)] +pub enum PrivateLinkProvisioningStatus { + /// No matching provisioned endpoint exists yet. + Pending, + /// A provisioned endpoint matching this link exists. + Provisioned, + /// Provisioning failed; see `error`. + Failed, +} + +impl PrivateLinkProvisioningStatus { + fn from_db(s: &str) -> async_graphql::Result { + match s { + "pending" => Ok(Self::Pending), + "provisioned" => Ok(Self::Provisioned), + "failed" => Ok(Self::Failed), + other => Err(async_graphql::Error::new(format!( + "unknown private link status {other:?}" + ))), + } + } +} + +/// A configured private link and its controller-observed provisioning status. +#[derive(Debug, Clone, SimpleObject)] +#[graphql(name = "PrivateLink")] +pub struct DataPlanePrivateLink { + /// Stable identifier of this private link. + pub id: models::Id, + /// Cloud provider of the link. + pub provider: models::PrivateLinkProvider, + /// The link configuration (AWS PrivateLink, Azure Private Link, or GCP PSC). + pub config: models::PrivateLink, + /// Controller-observed provisioning status. + pub status: PrivateLinkProvisioningStatus, + /// Provider-specific provisioning details (DNS entries, IPs) once + /// provisioned; opaque JSON exported by the data-plane controller. + pub details: Option>, + /// Failure detail when `status` is `failed`. + pub error: Option, + /// When the controller last observed this link's status. + pub observed_at: Option>, +} + /// A data plane where tasks execute and collections are stored. #[derive(Debug, Clone, SimpleObject)] #[graphql(complex)] @@ -46,13 +91,14 @@ pub struct DataPlane { pub azure_application_name: Option, /// Azure application client ID for this data-plane. pub azure_application_client_id: Option, - // The four private-networking fields below are gated behind - // `ViewDataPlanePrivateNetworking` and resolved by `ComplexObject` methods. - // They are stored as raw JSON and skipped from the derived object so the - // capability check lives with the field rather than the construction site; - // see the resolvers below. + // The private-networking fields below are gated behind + // `ViewDataPlanePrivateNetworking` and resolved by `ComplexObject` methods, + // so the capability check lives with the field rather than the construction + // site; see the resolvers below. `control_id` lets the `private_links` + // resolver query the `data_plane_private_links` table; the endpoint arrays + // are raw JSON exported by the controller. #[graphql(skip)] - raw_private_links: Vec, + control_id: models::Id, #[graphql(skip)] raw_aws_link_endpoints: Vec, #[graphql(skip)] @@ -63,15 +109,15 @@ pub struct DataPlane { #[ComplexObject] impl DataPlane { - /// Configured private link endpoints for this data-plane. Replacing this - /// list (via `updateDataPlanePrivateLinks`) triggers reconvergence by the - /// data-plane controller on its next poll. Returns an empty list to - /// callers that lack the `ViewDataPlanePrivateNetworking` capability on - /// this data plane. + /// Configured private links for this data-plane, each with its + /// controller-observed provisioning status. Mutating links (via + /// `addDataPlanePrivateLink` and friends) triggers reconvergence by the + /// data-plane controller. Returns an empty list to callers that lack the + /// `ViewDataPlanePrivateNetworking` capability on this data plane. async fn private_links( &self, ctx: &Context<'_>, - ) -> async_graphql::Result> { + ) -> async_graphql::Result> { if !super::may_access( ctx, &self.name, @@ -79,15 +125,37 @@ impl DataPlane { )? { return Ok(Vec::new()); } - self.raw_private_links - .iter() - .enumerate() - .map(|(idx, raw)| { - serde_json::from_value::(raw.clone()).map_err(|err| { - async_graphql::Error::new(format!( - "failed to parse private_links[{idx}] for data plane {}: {err}", - self.name, - )) + let env = ctx.data::()?; + + let rows = sqlx::query!( + r#" + SELECT + id as "id: models::Id", + config as "config!: sqlx::types::Json", + status, + details as "details: sqlx::types::Json", + error, + observed_at as "observed_at: chrono::DateTime" + FROM data_plane_private_links + WHERE data_plane_id = $1 + ORDER BY created_at, id + "#, + self.control_id as models::Id, + ) + .fetch_all(&env.pg_pool) + .await?; + + rows.into_iter() + .map(|row| { + let config = row.config.0; + Ok(DataPlanePrivateLink { + id: row.id, + provider: config.provider(), + config, + status: PrivateLinkProvisioningStatus::from_db(&row.status)?, + details: row.details.map(|d| async_graphql::Json(d.0)), + error: row.error, + observed_at: row.observed_at, }) }) .collect() @@ -176,7 +244,6 @@ async fn fetch_data_plane_details( dp.aws_iam_user_arn, dp.azure_application_name, dp.azure_application_client_id, - dp.private_links as "private_links!: Vec", dp.aws_link_endpoints as "aws_link_endpoints: Vec", dp.azure_link_endpoints as "azure_link_endpoints: Vec", dp.gcp_psc_endpoints as "gcp_psc_endpoints: Vec" @@ -199,7 +266,6 @@ async fn fetch_data_plane_details( aws_iam_user_arn: row.aws_iam_user_arn, azure_application_name: row.azure_application_name, azure_application_client_id: row.azure_application_client_id, - private_links: row.private_links, aws_link_endpoints: row.aws_link_endpoints.unwrap_or_default(), azure_link_endpoints: row.azure_link_endpoints.unwrap_or_default(), gcp_psc_endpoints: row.gcp_psc_endpoints.unwrap_or_default(), @@ -215,7 +281,6 @@ struct DataPlaneDetails { aws_iam_user_arn: Option, azure_application_name: Option, azure_application_client_id: Option, - private_links: Vec, aws_link_endpoints: Vec, azure_link_endpoints: Vec, gcp_psc_endpoints: Vec, @@ -420,7 +485,7 @@ impl DataPlanesQuery { azure_application_name: details.and_then(|d| d.azure_application_name.clone()), azure_application_client_id: details .and_then(|d| d.azure_application_client_id.clone()), - raw_private_links: details.map(|d| d.private_links.clone()).unwrap_or_default(), + control_id: dp.control_id, raw_aws_link_endpoints: details .map(|d| d.aws_link_endpoints.clone()) .unwrap_or_default(), @@ -444,41 +509,88 @@ impl DataPlanesQuery { #[derive(Debug, Default)] pub struct DataPlanesMutation; +/// Structural check: the name must sit under `ops/dp/private/` with at least +/// one path segment beyond it. Anything more specific (cluster suffix shape, +/// owning prefix shape) is the data plane's problem; an unknown but well-formed +/// name falls out as "not found" when no `data_planes` row matches. +fn require_private_dp_name(name: &str) -> async_graphql::Result<()> { + if name + .strip_prefix("ops/dp/private/") + .is_none_or(|rest| !rest.contains('/') || rest.starts_with('/')) + { + return Err(async_graphql::Error::new(format!( + "{name} is not a private data-plane name" + ))); + } + Ok(()) +} + +/// Maps a unique-violation on `(data_plane_id, service_identity)` to a clear +/// message; other database errors propagate unchanged. +fn map_link_db_error(err: sqlx::Error) -> async_graphql::Error { + if let sqlx::Error::Database(db) = &err { + if db.is_unique_violation() { + return async_graphql::Error::new( + "a private link with this service identity already exists on this data plane", + ); + } + } + async_graphql::Error::new(err.to_string()) +} + +/// Resolves the owning data-plane name for an id-addressed private link and +/// authorizes the caller to modify it. A link that does not exist and a link the +/// caller may not modify both return the same "not found" error, so an +/// unauthorized caller cannot probe which link ids exist. This deliberately uses +/// the visibility gate ([`super::may_access`]) rather than the hard gate +/// ([`super::verify_authorization`]) so a denial is hidden as not-found instead +/// of surfacing as a distinguishable permission-denied that names the data plane. +async fn resolve_modifiable_link_data_plane( + ctx: &Context<'_>, + id: models::Id, +) -> async_graphql::Result { + let env = ctx.data::()?; + let not_found = || async_graphql::Error::new(format!("private link '{id}' not found")); + + let Some(data_plane_name) = sqlx::query_scalar!( + r#" + SELECT dp.data_plane_name + FROM data_plane_private_links l + JOIN data_planes dp ON dp.id = l.data_plane_id + WHERE l.id = $1 + "#, + id as models::Id, + ) + .fetch_optional(&env.pg_pool) + .await? + else { + return Err(not_found()); + }; + + if !super::may_access( + ctx, + &data_plane_name, + models::authz::Capability::ModifyDataPlanePrivateNetworking, + )? { + return Err(not_found()); + } + + Ok(data_plane_name) +} + #[async_graphql::Object] impl DataPlanesMutation { - /// Replaces the configured private link endpoints on a private data plane. - /// - /// The provided list overwrites the entire `private_links` column; partial - /// updates are intentionally not supported. The data-plane controller - /// converges to the new configuration on its next poll. Returns the desired - /// private links state. The `*LinkEndpoints` provisioning results are not echoed here: - /// they lag this write until the controller converges, so callers needing them re-query `dataPlanes`. - /// - /// Requires the `ModifyDataPlanePrivateNetworking` capability on the - /// private data-plane name. - pub async fn update_data_plane_private_links( + /// Adds a private link to a private data plane. The data-plane controller + /// converges to provision it on its next poll; the returned link starts + /// `pending`. Requires `ModifyDataPlanePrivateNetworking` on the data plane. + pub async fn add_data_plane_private_link( &self, ctx: &Context<'_>, data_plane_name: String, - private_links: Vec, - ) -> async_graphql::Result> { + config: models::PrivateLink, + ) -> async_graphql::Result { let env = ctx.data::()?; - let claims = env.claims()?; - - // Structural check only: the name must sit under `ops/dp/private/` and - // have at least one path segment beyond it. Anything more specific - // (cluster suffix shape, owning prefix shape) is the data plane's - // problem, not the mutation's; an unknown name falls out as "not - // found" when the UPDATE matches zero rows. - if data_plane_name - .strip_prefix("ops/dp/private/") - .is_none_or(|rest| !rest.contains('/') || rest.starts_with('/')) - { - return Err(async_graphql::Error::new(format!( - "{data_plane_name} is not a private data-plane name" - ))); - } - + require_private_dp_name(&data_plane_name)?; super::verify_authorization( env, &data_plane_name, @@ -486,19 +598,26 @@ impl DataPlanesMutation { ) .await?; - let bound: Vec> = - private_links.iter().map(sqlx::types::Json).collect(); + let provider = config.provider(); let row = sqlx::query!( - r#"UPDATE data_planes - SET private_links = $2, updated_at = now() - WHERE data_plane_name = $1 - RETURNING private_links as "private_links!: Vec" + r#" + INSERT INTO data_plane_private_links (data_plane_id, provider, config) + SELECT dp.id, $2, $3 + FROM data_planes dp WHERE dp.data_plane_name = $1 + RETURNING + id as "id: models::Id", + status, + details as "details: sqlx::types::Json", + error, + observed_at as "observed_at: chrono::DateTime" "#, data_plane_name, - &bound as &[sqlx::types::Json<&models::PrivateLink>], + provider.as_str(), + sqlx::types::Json(&config) as sqlx::types::Json<&models::PrivateLink>, ) .fetch_optional(&env.pg_pool) - .await?; + .await + .map_err(map_link_db_error)?; let Some(row) = row else { return Err(async_graphql::Error::new(format!( @@ -506,22 +625,94 @@ impl DataPlanesMutation { ))); }; - tracing::info!( - %data_plane_name, - link_count = row.private_links.len(), - %claims.sub, - "updated data plane private links", - ); + tracing::info!(%data_plane_name, link_id = %row.id, "added data plane private link"); - row.private_links - .into_iter() - .map(serde_json::from_value::) - .collect::, _>>() - .map_err(|err| { - async_graphql::Error::new(format!( - "stored private_links for {data_plane_name} did not round-trip: {err}" - )) - }) + Ok(DataPlanePrivateLink { + id: row.id, + provider, + config, + status: PrivateLinkProvisioningStatus::from_db(&row.status)?, + details: row.details.map(|d| async_graphql::Json(d.0)), + error: row.error, + observed_at: row.observed_at, + }) + } + + /// Replaces the configuration of an existing private link by id. Changing + /// the configuration resets its observed status to `pending` until the + /// controller reconverges. Requires `ModifyDataPlanePrivateNetworking` on + /// the owning data plane. + pub async fn update_data_plane_private_link( + &self, + ctx: &Context<'_>, + id: models::Id, + config: models::PrivateLink, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let _data_plane_name = resolve_modifiable_link_data_plane(ctx, id).await?; + + let provider = config.provider(); + let row = sqlx::query!( + r#" + UPDATE data_plane_private_links SET + provider = $2, + config = $3, + status = 'pending', + details = NULL, + error = NULL, + observed_at = NULL, + updated_at = now() + WHERE id = $1 + RETURNING + status, + details as "details: sqlx::types::Json", + error, + observed_at as "observed_at: chrono::DateTime" + "#, + id as models::Id, + provider.as_str(), + sqlx::types::Json(&config) as sqlx::types::Json<&models::PrivateLink>, + ) + .fetch_optional(&env.pg_pool) + .await + .map_err(map_link_db_error)? + // The row was authorized by `resolve_modifiable_link_data_plane` above, + // but a concurrent remove (or a cascading data-plane teardown) can delete + // it before this UPDATE runs. Report the same existence-hiding not-found + // rather than leaking a raw "no rows returned" sqlx error. + .ok_or_else(|| async_graphql::Error::new(format!("private link '{id}' not found")))?; + + Ok(DataPlanePrivateLink { + id, + provider, + config, + status: PrivateLinkProvisioningStatus::from_db(&row.status)?, + details: row.details.map(|d| async_graphql::Json(d.0)), + error: row.error, + observed_at: row.observed_at, + }) + } + + /// Removes a private link by id. The controller tears down its endpoint on + /// the next converge. Requires `ModifyDataPlanePrivateNetworking` on the + /// owning data plane. Returns the removed link id. + pub async fn remove_data_plane_private_link( + &self, + ctx: &Context<'_>, + id: models::Id, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let data_plane_name = resolve_modifiable_link_data_plane(ctx, id).await?; + + _ = sqlx::query!( + "DELETE FROM data_plane_private_links WHERE id = $1", + id as models::Id, + ) + .execute(&env.pg_pool) + .await?; + + tracing::info!(link_id = %id, %data_plane_name, "removed data plane private link"); + Ok(id) } } @@ -609,24 +800,15 @@ mod tests { azureLinkEndpoints gcpPscEndpoints privateLinks { - __typename - ... on AWSPrivateLink { - region - azIds - serviceName - } - ... on AzurePrivateLink { - serviceName - location - dnsName - resourceType - } - ... on GCPPrivateServiceConnect { - serviceAttachment - region - dnsZoneName - dnsRecordNames - allPorts + id + provider + status + details + config { + __typename + ... on AWSPrivateLink { region azIds serviceName } + ... on AzurePrivateLink { serviceName location dnsName resourceType } + ... on GCPPrivateServiceConnect { serviceAttachment region dnsZoneName dnsRecordNames allPorts } } } } @@ -730,7 +912,7 @@ mod tests { // does not carry. let bob_denied: serde_json::Value = server .graphql( - &update_mutation("ops/dp/private/aliceCo/aws-us-east-1-c1", VALID_AWS_INPUT), + &add_mutation("ops/dp/private/aliceCo/aws-us-east-1-c1", VALID_AWS_INPUT), Some(&bob_token), ) .await; @@ -808,7 +990,7 @@ mod tests { // Modify is denied: ModifyDataPlanePrivateNetworking flowed only // through the now-cleared `manage_data_plane` bundle on the edge. let denied: serde_json::Value = server - .graphql(&update_mutation(dp, VALID_AWS_INPUT), Some(&alice_token)) + .graphql(&add_mutation(dp, VALID_AWS_INPUT), Some(&alice_token)) .await; assert_eq!( first_error_message(&denied), @@ -816,62 +998,11 @@ mod tests { ); } - // A malformed `private_links` row produces a field-level error that names - // the data plane and the failing index. Because `privateLinks` is declared - // `[PrivateLink!]!` (non-null), the error null-propagates up to the - // nullable root and the whole `data` field comes back as null; the error - // path locates the offending edge. - #[sqlx::test( - migrations = "../../supabase/migrations", - fixtures( - path = "../../../fixtures", - scripts("data_planes", "alice", "private_links") - ) - )] - async fn test_graphql_data_planes_malformed_private_link(pool: sqlx::PgPool) { - let _guard = test_server::init(); - - // Corrupt the private_links column for the private dp before snapshot. - sqlx::query( - r#"UPDATE data_planes - SET private_links = array['{"not":"a private link"}'::json] - WHERE data_plane_name = 'ops/dp/private/aliceCo/aws-us-east-1-c1'"#, - ) - .execute(&pool) - .await - .unwrap(); - - let server = - test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) - .await; - - let token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); - - let response: serde_json::Value = server - .graphql( - &serde_json::json!({ - "query": r#" - query { - dataPlanes { - edges { - node { - name - privateLinks { __typename } - } - } - } - } - "# - }), - Some(&token), - ) - .await; - - insta::assert_json_snapshot!("data_planes_malformed_private_link", response); - } - - // ===== updateDataPlanePrivateLinks mutation tests ===== + // ===== per-link CRUD mutation tests ===== + // The `*_INPUT` constants are `PrivateLinkConfigInput` @oneOf values. The + // AWS one matches the fixture's existing AWS link (used to exercise the + // duplicate-identity guard); `NEW_AWS_INPUT` is a distinct link to add. const VALID_AWS_INPUT: &str = r#"{ "aws": { "region": "us-east-1", @@ -879,44 +1010,80 @@ mod tests { "serviceName": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" } }"#; - const VALID_AZURE_INPUT: &str = r#"{ - "azure": { - "serviceName": "/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc", - "location": "eastus", - "dnsName": "privatelink.database.windows.net", - "resourceType": "" - } - }"#; - const VALID_GCP_INPUT: &str = r#"{ - "gcp": { - "serviceAttachment": "projects/p/regions/us-central1/serviceAttachments/sa", - "region": "us-central1", - "dnsZoneName": "z", - "dnsRecordNames": ["r1"], - "allPorts": true + const NEW_AWS_INPUT: &str = r#"{ + "aws": { + "region": "us-east-1", + "azIds": ["use1-az1"], + "serviceName": "com.amazonaws.vpce.us-east-1.vpce-svc-new999" } }"#; - fn update_mutation(name: &str, links_json: &str) -> serde_json::Value { - // The mutation echoes the stored links as the `PrivateLink` union, so - // the selection set spreads each variant's discriminating fields. + fn add_mutation(name: &str, config_json: &str) -> serde_json::Value { serde_json::json!({ "query": r#" - mutation($name: String!, $links: [PrivateLinkInput!]!) { - updateDataPlanePrivateLinks(dataPlaneName: $name, privateLinks: $links) { - __typename - ... on AWSPrivateLink { region serviceName } - ... on AzurePrivateLink { serviceName location } - ... on GCPPrivateServiceConnect { serviceAttachment region } + mutation($name: String!, $config: PrivateLinkConfigInput!) { + addDataPlanePrivateLink(dataPlaneName: $name, config: $config) { + id + provider + status + config { + __typename + ... on AWSPrivateLink { serviceName } + ... on AzurePrivateLink { serviceName } + ... on GCPPrivateServiceConnect { serviceAttachment } + } } }"#, "variables": { "name": name, - "links": serde_json::from_str::(&format!("[{links_json}]")).unwrap(), + "config": serde_json::from_str::(config_json).unwrap(), } }) } + fn update_link_mutation(id: &str, config_json: &str) -> serde_json::Value { + serde_json::json!({ + "query": r#" + mutation($id: Id!, $config: PrivateLinkConfigInput!) { + updateDataPlanePrivateLink(id: $id, config: $config) { + id status config { __typename ... on AWSPrivateLink { serviceName } } + } + }"#, + "variables": { + "id": id, + "config": serde_json::from_str::(config_json).unwrap(), + } + }) + } + + fn remove_link_mutation(id: &str) -> serde_json::Value { + serde_json::json!({ + "query": r#" + mutation($id: Id!) { removeDataPlanePrivateLink(id: $id) }"#, + "variables": { "id": id } + }) + } + + /// Extracts the first error message from a GraphQL response, or panics + /// if the response did not return an error. + fn first_error_message(response: &serde_json::Value) -> &str { + response["errors"][0]["message"] + .as_str() + .unwrap_or_else(|| panic!("expected an error, got: {response}")) + } + + async fn count_links(pool: &sqlx::PgPool, dp: &str) -> i64 { + sqlx::query_scalar( + r#"SELECT count(*) FROM data_plane_private_links l + JOIN data_planes dp ON dp.id = l.data_plane_id + WHERE dp.data_plane_name = $1"#, + ) + .bind(dp) + .fetch_one(pool) + .await + .unwrap() + } + #[sqlx::test( migrations = "../../supabase/migrations", fixtures( @@ -924,89 +1091,86 @@ mod tests { scripts("data_planes", "alice", "private_links") ) )] - async fn test_update_private_links_happy_path(pool: sqlx::PgPool) { + async fn test_add_private_link(pool: sqlx::PgPool) { let _guard = test_server::init(); - let server = test_server::TestServer::start( pool.clone(), test_server::snapshot(pool.clone(), false).await, ) .await; let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); - let dp = "ops/dp/private/aliceCo/aws-us-east-1-c1"; - let links = format!("{VALID_AWS_INPUT},{VALID_AZURE_INPUT},{VALID_GCP_INPUT}"); - - let updated_at_before: chrono::DateTime = - sqlx::query_scalar("SELECT updated_at FROM data_planes WHERE data_plane_name = $1") - .bind(dp) - .fetch_one(&pool) - .await - .unwrap(); - let response: serde_json::Value = server - .graphql(&update_mutation(dp, &links), Some(&alice_token)) + // A new link is created `pending` (no endpoint provisioned yet) as a + // fourth row alongside the three from the fixture. + let added: serde_json::Value = server + .graphql(&add_mutation(dp, NEW_AWS_INPUT), Some(&alice_token)) .await; - // The mutation echoes the three submitted links in order, one per - // union variant. - let echoed = response["data"]["updateDataPlanePrivateLinks"] - .as_array() - .unwrap_or_else(|| panic!("expected echoed links, got: {response}")); - let typenames: Vec<&str> = echoed - .iter() - .map(|l| l["__typename"].as_str().unwrap()) - .collect(); + let link = &added["data"]["addDataPlanePrivateLink"]; + assert_eq!(link["provider"], "AWS", "got: {added}"); + assert_eq!(link["status"], "PENDING"); assert_eq!( - typenames, - [ - "AWSPrivateLink", - "AzurePrivateLink", - "GCPPrivateServiceConnect" - ], - ); - assert_eq!(echoed[0]["region"], "us-east-1"); - - // Postgres `now()` is `transaction_timestamp()` at microsecond - // precision, so two distinct transactions return distinct values. - let updated_at_after: chrono::DateTime = - sqlx::query_scalar("SELECT updated_at FROM data_planes WHERE data_plane_name = $1") - .bind(dp) - .fetch_one(&pool) - .await - .unwrap(); - assert!( - updated_at_after > updated_at_before, - "updated_at must advance on a successful mutation" + link["config"]["serviceName"], + "com.amazonaws.vpce.us-east-1.vpce-svc-new999" ); + assert!(link["id"].is_string()); + assert_eq!(count_links(&pool, dp).await, 4); - // Calling again with a single AWS link replaces the entire array - // rather than merging. - let response: serde_json::Value = server - .graphql(&update_mutation(dp, VALID_AWS_INPUT), Some(&alice_token)) + // Adding a link whose service identity already exists on the data plane + // is rejected by the unique constraint. + let dup: serde_json::Value = server + .graphql(&add_mutation(dp, VALID_AWS_INPUT), Some(&alice_token)) .await; - let echoed = response["data"]["updateDataPlanePrivateLinks"] - .as_array() - .unwrap_or_else(|| panic!("expected echoed links, got: {response}")); - assert_eq!(echoed.len(), 1); - assert_eq!(echoed[0]["__typename"], "AWSPrivateLink"); + assert_eq!( + first_error_message(&dup), + "a private link with this service identity already exists on this data plane", + ); + assert_eq!(count_links(&pool, dp).await, 4); + } - // Confirm the second call replaced (rather than merged) the array. - let stored_count: i64 = sqlx::query_scalar( - "SELECT array_length(private_links, 1)::bigint FROM data_planes WHERE data_plane_name = $1", + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") ) - .bind(dp) - .fetch_one(&pool) - .await - .unwrap(); - assert_eq!(stored_count, 1); - } + )] + async fn test_update_and_remove_private_link(pool: sqlx::PgPool) { + let _guard = test_server::init(); + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), false).await, + ) + .await; + let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + let dp = "ops/dp/private/aliceCo/aws-us-east-1-c1"; - /// Extracts the first error message from a GraphQL response, or panics - /// if the response did not return an error. - fn first_error_message(response: &serde_json::Value) -> &str { - response["errors"][0]["message"] - .as_str() - .unwrap_or_else(|| panic!("expected an error, got: {response}")) + // The fixture's AWS link id; it is `provisioned`. Replacing its config + // resets the observed status to `pending`. + let aws_id = "0000000000000a01"; + let updated: serde_json::Value = server + .graphql( + &update_link_mutation(aws_id, NEW_AWS_INPUT), + Some(&alice_token), + ) + .await; + let link = &updated["data"]["updateDataPlanePrivateLink"]; + assert_eq!(link["id"], aws_id, "got: {updated}"); + assert_eq!(link["status"], "PENDING"); + assert_eq!( + link["config"]["serviceName"], + "com.amazonaws.vpce.us-east-1.vpce-svc-new999" + ); + + // Removing a link returns its id and drops the row. + let removed: serde_json::Value = server + .graphql(&remove_link_mutation(aws_id), Some(&alice_token)) + .await; + assert_eq!( + removed["data"]["removeDataPlanePrivateLink"], aws_id, + "got: {removed}" + ); + assert_eq!(count_links(&pool, dp).await, 2); } #[sqlx::test( @@ -1016,10 +1180,10 @@ mod tests { scripts("data_planes", "alice", "private_links") ) )] - async fn test_update_private_links_authorization(pool: sqlx::PgPool) { + async fn test_private_link_mutation_authorization(pool: sqlx::PgPool) { let _guard = test_server::init(); - // Create a bob who has no grants on the private dp. + // bob has no grants on the private dp. sqlx::query( "INSERT INTO auth.users (id, email) VALUES \ ('22222222-2222-2222-2222-222222222222', 'bob@example.test')", @@ -1028,34 +1192,56 @@ mod tests { .await .unwrap(); - let server = - test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) - .await; + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), false).await, + ) + .await; let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); let bob_token = server.make_access_token(uuid::Uuid::from_bytes([0x22; 16]), Some("bob@example.test")); - let dp = "ops/dp/private/aliceCo/aws-us-east-1-c1"; - // Alice has read on the private dp via the aliceCo/ -> ops/dp/private/aliceCo/ - // role grant installed by the private_links fixture. + // Alice (read + manage_data_plane bundle) can add. let alice_ok: serde_json::Value = server - .graphql(&update_mutation(dp, VALID_AWS_INPUT), Some(&alice_token)) + .graphql(&add_mutation(dp, NEW_AWS_INPUT), Some(&alice_token)) .await; - let echoed = alice_ok["data"]["updateDataPlanePrivateLinks"] - .as_array() - .unwrap_or_else(|| panic!("alice with `read` should succeed: {alice_ok}")); - assert_eq!(echoed.len(), 1); - assert_eq!(echoed[0]["__typename"], "AWSPrivateLink"); + assert_eq!( + alice_ok["data"]["addDataPlanePrivateLink"]["provider"], "AWS", + "got: {alice_ok}" + ); - // Bob has no grants and should be rejected. + // Bob is rejected for lacking ModifyDataPlanePrivateNetworking. The + // name-addressed `add` openly names the prefix, because the caller + // supplied the name and so reveals nothing they did not already know. let bob_denied: serde_json::Value = server - .graphql(&update_mutation(dp, VALID_AWS_INPUT), Some(&bob_token)) + .graphql(&add_mutation(dp, NEW_AWS_INPUT), Some(&bob_token)) .await; assert_eq!( first_error_message(&bob_denied), "PermissionDenied: bob@example.test is not authorized to access prefix or name 'ops/dp/private/aliceCo/aws-us-east-1-c1' with required capability ModifyDataPlanePrivateNetworking", ); + + // An id-addressed mutation on a link Bob may not modify must return the + // same "not found" as a missing id, never a permission error that would + // confirm the link (or its data plane) exists. `0000000000000a01` is the + // fixture's existing AWS link. + let aws_id = "0000000000000a01"; + for probe in [ + update_link_mutation(aws_id, NEW_AWS_INPUT), + remove_link_mutation(aws_id), + ] { + let response: serde_json::Value = server.graphql(&probe, Some(&bob_token)).await; + let message = first_error_message(&response); + assert!( + message.contains("not found") && !message.contains("PermissionDenied"), + "expected an existence-hiding not-found error, got: {response}" + ); + } + + // Bob's denied remove did not actually delete: Alice's added link plus + // the three from the fixture remain. + assert_eq!(count_links(&pool, dp).await, 4); } #[sqlx::test( @@ -1065,22 +1251,22 @@ mod tests { scripts("data_planes", "alice", "private_links") ) )] - async fn test_update_private_links_name_validation(pool: sqlx::PgPool) { + async fn test_add_private_link_name_validation(pool: sqlx::PgPool) { let _guard = test_server::init(); let server = test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) .await; let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); - // Names outside `ops/dp/private//...` are rejected by the - // structural check before any auth or DB work. + // Names outside `ops/dp/private//...` are rejected before any + // auth or DB work. let cases: &[&str] = &[ "ops/dp/public/aws-us-west-2-c1", "ops/dp/private/aws-us-east-1-c1", ]; for name in cases { let response: serde_json::Value = server - .graphql(&update_mutation(name, VALID_AWS_INPUT), Some(&alice_token)) + .graphql(&add_mutation(name, NEW_AWS_INPUT), Some(&alice_token)) .await; assert_eq!( first_error_message(&response), @@ -1089,13 +1275,11 @@ mod tests { ); } - // A structurally-valid name that alice is authorized for (the - // fixture's aliceCo/ -> ops/dp/private/aliceCo/ role grant covers any - // sub-prefix) but which matches no data_planes row: the UPDATE - // affects zero rows and reports not-found. + // A well-formed name alice is authorized for but with no matching + // data_planes row reports not-found. let response: serde_json::Value = server .graphql( - &update_mutation("ops/dp/private/aliceCo/aws-us-east-2-c9", VALID_AWS_INPUT), + &add_mutation("ops/dp/private/aliceCo/aws-us-east-2-c9", NEW_AWS_INPUT), Some(&alice_token), ) .await; diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_malformed_private_link.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_malformed_private_link.snap deleted file mode 100644 index 7ab623c0ddc..00000000000 --- a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_malformed_private_link.snap +++ /dev/null @@ -1,23 +0,0 @@ ---- -source: crates/control-plane-api/src/server/public/graphql/data_planes.rs -expression: response ---- -{ - "data": null, - "errors": [ - { - "locations": [ - { - "column": 37, - "line": 7 - } - ], - "message": "failed to parse private_links[0] for data plane ops/dp/private/aliceCo/aws-us-east-1-c1: data did not match any variant of untagged enum PrivateLink", - "path": [ - "dataPlanes", - "edges", - 0 - ] - } - ] -} diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap index b13edcd151f..538351ab767 100644 --- a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap @@ -10,8 +10,13 @@ expression: response "node": { "awsLinkEndpoints": [ { - "endpoint_id": "vpce-0123456789abcdef0", - "state": "available" + "dns_entries": [ + { + "dns_name": "vpce-0123abc.vpce-svc-abc123.us-east-1.vpce.amazonaws.com", + "hosted_zone_id": "Z7HUB22EVRPK5" + } + ], + "service_name": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" } ], "azureLinkEndpoints": [], @@ -19,31 +24,57 @@ expression: response "name": "ops/dp/private/aliceCo/aws-us-east-1-c1", "privateLinks": [ { - "__typename": "AWSPrivateLink", - "azIds": [ - "use1-az1", - "use1-az2" - ], - "region": "us-east-1", - "serviceName": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" + "config": { + "__typename": "AWSPrivateLink", + "azIds": [ + "use1-az1", + "use1-az2" + ], + "region": "us-east-1", + "serviceName": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" + }, + "details": { + "dns_entries": [ + { + "dns_name": "vpce-0123abc.vpce-svc-abc123.us-east-1.vpce.amazonaws.com", + "hosted_zone_id": "Z7HUB22EVRPK5" + } + ], + "service_name": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" + }, + "id": "0000000000000a01", + "provider": "AWS", + "status": "PROVISIONED" }, { - "__typename": "AzurePrivateLink", - "dnsName": "privatelink.database.windows.net", - "location": "eastus", - "resourceType": null, - "serviceName": "/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc" + "config": { + "__typename": "AzurePrivateLink", + "dnsName": "privatelink.database.windows.net", + "location": "eastus", + "resourceType": null, + "serviceName": "/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc" + }, + "details": null, + "id": "0000000000000a02", + "provider": "AZURE", + "status": "PENDING" }, { - "__typename": "GCPPrivateServiceConnect", - "allPorts": true, - "dnsRecordNames": [ - "r1", - "r2" - ], - "dnsZoneName": "z", - "region": "us-central1", - "serviceAttachment": "projects/p/regions/us-central1/serviceAttachments/sa" + "config": { + "__typename": "GCPPrivateServiceConnect", + "allPorts": true, + "dnsRecordNames": [ + "r1", + "r2" + ], + "dnsZoneName": "z", + "region": "us-central1", + "serviceAttachment": "projects/p/regions/us-central1/serviceAttachments/sa" + }, + "details": null, + "id": "0000000000000a03", + "provider": "GCP", + "status": "PENDING" } ] } diff --git a/crates/data-plane-controller/src/job/executor.rs b/crates/data-plane-controller/src/job/executor.rs index afdc4688a47..ee37328e2fe 100644 --- a/crates/data-plane-controller/src/job/executor.rs +++ b/crates/data-plane-controller/src/job/executor.rs @@ -411,6 +411,11 @@ async fn fetch_row_state( config.model.name = Some(row.data_plane_name); config.model.fqdn = Some(row.data_plane_fqdn); + // The controller reads desired links from the `private_links` column, which + // the `data_plane_private_links` trigger keeps projected from the per-link + // rows. Reading the table directly is deferred to the contract change that + // drops this column, so the controller has no deploy-ordering dependency on + // the agent-api cutover. config.model.private_links = row.private_links.into_iter().map(|link| link.0).collect(); let stack = if let Some(key) = row.pulumi_key { @@ -618,6 +623,54 @@ impl automations::Outcome for Outcome { .execute(&mut *txn) .await .context("failed to publish exports into data_planes row")?; + + // Record per-link observed status by joining each link to its + // provisioned endpoint on (provider, service_identity): present -> + // `provisioned` with the endpoint as `details`, absent -> `pending`. + // This is the temporary bridge until est-dry-dock emits a per-link + // result keyed by the link id (which will also enable `failed`). + _ = sqlx::query!( + r#" + WITH endpoints AS ( + SELECT 'aws'::text AS provider, ep ->> 'service_name' AS identity, ep AS detail + FROM unnest($2::jsonb[]) AS ep + UNION ALL + SELECT 'azure'::text, ep ->> 'service_name', ep + FROM unnest($3::jsonb[]) AS ep + UNION ALL + SELECT 'gcp'::text, ep ->> 'service_attachment', ep + FROM unnest($4::jsonb[]) AS ep + ), + -- Providers for which this converge actually published endpoints. + -- A link is only re-evaluated when its provider published at + -- least one endpoint, so a transient empty or partial export + -- cannot flip an already-`provisioned` link back to `pending` + -- and null its details. A link's row is deleted (not emptied) + -- when it is removed, so a genuine teardown never relies on the + -- array going empty. + published_providers AS ( + SELECT DISTINCT provider FROM endpoints + ) + UPDATE data_plane_private_links l SET + status = CASE WHEN e.identity IS NOT NULL THEN 'provisioned' ELSE 'pending' END, + details = e.detail, + observed_at = now(), + updated_at = now() + FROM data_plane_private_links l2 + LEFT JOIN endpoints e + ON e.provider = l2.provider AND e.identity = l2.service_identity + WHERE l.id = l2.id + AND l2.data_plane_id = $1 + AND l2.provider IN (SELECT provider FROM published_providers) + "#, + self.data_plane_id as models::Id, + &aws_link_endpoints as &[serde_json::Value], + &azure_link_endpoints as &[serde_json::Value], + &gcp_psc_endpoints as &[serde_json::Value], + ) + .execute(&mut *txn) + .await + .context("failed to update private link statuses")?; } Ok(automations::Action::Sleep(self.sleep)) diff --git a/crates/flow-client/control-plane-api.graphql b/crates/flow-client/control-plane-api.graphql index 353b01be592..cfa4bbd3724 100644 --- a/crates/flow-client/control-plane-api.graphql +++ b/crates/flow-client/control-plane-api.graphql @@ -705,11 +705,11 @@ type DataPlane { """ azureApplicationClientId: String """ - Configured private link endpoints for this data-plane. Replacing this - list (via `updateDataPlanePrivateLinks`) triggers reconvergence by the - data-plane controller on its next poll. Returns an empty list to - callers that lack the `ViewDataPlanePrivateNetworking` capability on - this data plane. + Configured private links for this data-plane, each with its + controller-observed provisioning status. Mutating links (via + `addDataPlanePrivateLink` and friends) triggers reconvergence by the + data-plane controller. Returns an empty list to callers that lack the + `ViewDataPlanePrivateNetworking` capability on this data plane. """ privateLinks: [PrivateLink!]! """ @@ -1257,18 +1257,24 @@ type MutationRoot { """ deleteInviteLink(token: UUID!): Boolean! """ - Replaces the configured private link endpoints on a private data plane. - - The provided list overwrites the entire `private_links` column; partial - updates are intentionally not supported. The data-plane controller - converges to the new configuration on its next poll. Returns the desired - private links state. The `*LinkEndpoints` provisioning results are not echoed here: - they lag this write until the controller converges, so callers needing them re-query `dataPlanes`. - - Requires the `ModifyDataPlanePrivateNetworking` capability on the - private data-plane name. + Adds a private link to a private data plane. The data-plane controller + converges to provision it on its next poll; the returned link starts + `pending`. Requires `ModifyDataPlanePrivateNetworking` on the data plane. + """ + addDataPlanePrivateLink(dataPlaneName: String!, config: PrivateLinkConfigInput!): PrivateLink! + """ + Replaces the configuration of an existing private link by id. Changing + the configuration resets its observed status to `pending` until the + controller reconverges. Requires `ModifyDataPlanePrivateNetworking` on + the owning data plane. + """ + updateDataPlanePrivateLink(id: Id!, config: PrivateLinkConfigInput!): PrivateLink! + """ + Removes a private link by id. The controller tears down its endpoint on + the next converge. Requires `ModifyDataPlanePrivateNetworking` on the + owning data plane. Returns the removed link id. """ - updateDataPlanePrivateLinks(dataPlaneName: String!, privateLinks: [PrivateLinkInput!]!): [PrivateLink!]! + removeDataPlanePrivateLink(id: Id!): Id! """ Create a refresh token for the authenticated user. """ @@ -1411,22 +1417,86 @@ input PrefixesBy { minCapability: Capability! } +""" +A configured private link and its controller-observed provisioning status. +""" +type PrivateLink { + """ + Stable identifier of this private link. + """ + id: Id! + """ + Cloud provider of the link. + """ + provider: PrivateLinkProvider! + """ + The link configuration (AWS PrivateLink, Azure Private Link, or GCP PSC). + """ + config: PrivateLinkConfig! + """ + Controller-observed provisioning status. + """ + status: PrivateLinkProvisioningStatus! + """ + Provider-specific provisioning details (DNS entries, IPs) once + provisioned; opaque JSON exported by the data-plane controller. + """ + details: JSON + """ + Failure detail when `status` is `failed`. + """ + error: String + """ + When the controller last observed this link's status. + """ + observedAt: DateTime +} + """ Private link configuration for a customer-owned data plane: AWS PrivateLink, Azure Private Link, or GCP Private Service Connect. """ -union PrivateLink = AWSPrivateLink | AzurePrivateLink | GCPPrivateServiceConnect +union PrivateLinkConfig = AWSPrivateLink | AzurePrivateLink | GCPPrivateServiceConnect """ Private link configuration for a customer-owned data plane: AWS PrivateLink, Azure Private Link, or GCP Private Service Connect. """ -input PrivateLinkInput @oneOf { +input PrivateLinkConfigInput @oneOf { aws: AWSPrivateLinkInput azure: AzurePrivateLinkInput gcp: GCPPrivateServiceConnectInput } +""" +Cloud provider of a private link. Distinct from a data plane's provider +(a private link is never local) and used to disambiguate the service +identity, since AWS and Azure links both key on `service_name`. +""" +enum PrivateLinkProvider { + AWS + AZURE + GCP +} + +""" +Controller-observed provisioning status of a configured private link. +""" +enum PrivateLinkProvisioningStatus { + """ + No matching provisioned endpoint exists yet. + """ + PENDING + """ + A provisioned endpoint matching this link exists. + """ + PROVISIONED + """ + Provisioning failed; see `error`. + """ + FAILED +} + """ Filter connectors by their protocol (capture or materialization). """ diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index b9196ff4432..f7b87273b28 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -59,7 +59,9 @@ pub use materializations::{ MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields, RecommendedDepth, TargetNamingStrategy, }; -pub use private_links::{AWSPrivateLink, AzurePrivateLink, GCPPrivateServiceConnect, PrivateLink}; +pub use private_links::{ + AWSPrivateLink, AzurePrivateLink, GCPPrivateServiceConnect, PrivateLink, PrivateLinkProvider, +}; pub use raw_value::RawValue; pub use references::{ CATALOG_PREFIX_RE, Capture, Collection, CompositeKey, Field, JsonPointer, Materialization, diff --git a/crates/models/src/private_links.rs b/crates/models/src/private_links.rs index bf5ae139070..ed31245904b 100644 --- a/crates/models/src/private_links.rs +++ b/crates/models/src/private_links.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; #[cfg_attr( feature = "async-graphql", derive(async_graphql::Union, async_graphql::OneofObject), - graphql(name = "PrivateLink", input_name = "PrivateLinkInput") + graphql(name = "PrivateLinkConfig", input_name = "PrivateLinkConfigInput") )] #[serde(untagged)] pub enum PrivateLink { @@ -19,6 +19,52 @@ pub enum PrivateLink { GCP(GCPPrivateServiceConnect), } +/// Cloud provider of a private link. Distinct from a data plane's provider +/// (a private link is never local) and used to disambiguate the service +/// identity, since AWS and Azure links both key on `service_name`. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr(feature = "async-graphql", derive(async_graphql::Enum))] +#[serde(rename_all = "lowercase")] +pub enum PrivateLinkProvider { + Aws, + Azure, + Gcp, +} + +impl PrivateLinkProvider { + /// Lowercase wire/DB representation (`aws`/`azure`/`gcp`). + pub fn as_str(&self) -> &'static str { + match self { + PrivateLinkProvider::Aws => "aws", + PrivateLinkProvider::Azure => "azure", + PrivateLinkProvider::Gcp => "gcp", + } + } +} + +impl PrivateLink { + /// The provider's service identifier for this link: `service_name` for AWS + /// and Azure, `service_attachment` for GCP. These are required fields, so + /// every link has one. It is the stable per-link identity and the join key + /// against the data plane's provisioned endpoint results. + pub fn service_identity(&self) -> &str { + match self { + PrivateLink::AWS(link) => &link.service_name, + PrivateLink::Azure(link) => &link.service_name, + PrivateLink::GCP(link) => &link.service_attachment, + } + } + + /// The cloud provider of this link, derived from its variant. + pub fn provider(&self) -> PrivateLinkProvider { + match self { + PrivateLink::AWS(_) => PrivateLinkProvider::Aws, + PrivateLink::Azure(_) => PrivateLinkProvider::Azure, + PrivateLink::GCP(_) => PrivateLinkProvider::Gcp, + } + } +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[cfg_attr( feature = "async-graphql", @@ -195,4 +241,32 @@ mod tests { // False default is skipped on serialize. assert!(!serde_json::to_string(&gcp).unwrap().contains("all_ports")); } + + #[test] + fn service_identity_and_provider_per_variant() { + let aws: PrivateLink = serde_json::from_str( + r#"{"region":"us-east-1","az_ids":["use1-az1"],"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc"}"#, + ).unwrap(); + assert_eq!( + aws.service_identity(), + "com.amazonaws.vpce.us-east-1.vpce-svc-abc" + ); + assert_eq!(aws.provider(), PrivateLinkProvider::Aws); + + let azure: PrivateLink = + serde_json::from_str(r#"{"service_name":"/subscriptions/x/svc","location":"eastus"}"#) + .unwrap(); + assert_eq!(azure.service_identity(), "/subscriptions/x/svc"); + assert_eq!(azure.provider(), PrivateLinkProvider::Azure); + + let gcp: PrivateLink = serde_json::from_str( + r#"{"service_attachment":"projects/p/regions/r/serviceAttachments/sa","region":"r","dns_zone_name":"z","dns_record_names":["n"]}"#, + ).unwrap(); + assert_eq!( + gcp.service_identity(), + "projects/p/regions/r/serviceAttachments/sa" + ); + assert_eq!(gcp.provider(), PrivateLinkProvider::Gcp); + assert_eq!(gcp.provider().as_str(), "gcp"); + } } diff --git a/supabase/migrations/20260618120000_data_plane_private_links.sql b/supabase/migrations/20260618120000_data_plane_private_links.sql new file mode 100644 index 00000000000..5eb1bccb751 --- /dev/null +++ b/supabase/migrations/20260618120000_data_plane_private_links.sql @@ -0,0 +1,180 @@ +-- Model private links as first-class rows with a stable identity and +-- data-plane-controller-owned observed status, replacing the flat +-- `data_planes.private_links` JSON array as the source of truth. +-- +-- During the transition a trigger projects rows back into the +-- `data_planes.private_links` column, so the controller (which still reads that +-- column until its own cutover) keeps working unchanged. A later migration +-- drops the projection and the legacy `private_links` / `*_link_endpoints` +-- columns once the controller reads and writes this table directly. + +begin; + +create table public.data_plane_private_links ( + id public.flowid primary key not null default internal.id_generator(), + data_plane_id public.flowid not null + references public.data_planes (id) on delete cascade, + -- Cloud provider of the link, stored so consumers need not parse `config` + -- to learn the variant and so the controller selects the matching endpoint + -- output array. AWS and Azure links both key on `service_name`, so the + -- provider is what disambiguates them. + provider text not null check (provider in ('aws', 'azure', 'gcp')), + -- The polymorphic link configuration: the same element shape as the legacy + -- `data_planes.private_links` array; round-trips `models::PrivateLink`. + config jsonb not null, + -- The provider's service identifier, used as the join key against the + -- controller's provisioned endpoint outputs and to enforce uniqueness. A + -- data plane is single-cloud, so this is unambiguous within a data plane. + service_identity text generated always as + (coalesce(config ->> 'service_name', config ->> 'service_attachment')) stored, + -- Observed state, written by the data-plane controller. `status` is + -- `pending` until a converge matches a provisioned endpoint; `failed` is + -- reserved for when est-dry-dock reports per-link errors (a later change). + status text not null default 'pending' check (status in ('pending', 'provisioned', 'failed')), + details jsonb, + error text, + observed_at timestamptz, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + constraint data_plane_private_links_unique_identity + unique (data_plane_id, service_identity) +); + +comment on table public.data_plane_private_links is + 'Per-link private networking configuration (desired) and controller-observed status for a data plane.'; + +create index data_plane_private_links_data_plane_id_idx + on public.data_plane_private_links (data_plane_id); + +-- Access mirrors `data_planes`: a user read-authorized to the parent data plane +-- may select (a `read` grant already conveys ViewDataPlanePrivateNetworking). +-- Writes go only through agent-api (service_role); the finer View/Modify +-- capability gating lives in the agent-api resolvers. +alter table public.data_plane_private_links enable row level security; + +create policy "Users must be read-authorized to the parent data plane" + on public.data_plane_private_links + for select + using (exists ( + select 1 + from public.data_planes dp + where dp.id = data_plane_private_links.data_plane_id + and exists ( + select 1 + from public.auth_roles('read'::public.grant_capability) r(role_prefix, capability) + where (dp.data_plane_name)::text ^@ (r.role_prefix)::text + ) + )); + +grant all on table public.data_plane_private_links to service_role; +grant select on table public.data_plane_private_links to reporting_user; +grant select ( + id, data_plane_id, provider, config, service_identity, + status, details, error, observed_at, created_at, updated_at +) on table public.data_plane_private_links to authenticated; + +-- Pre-flight: abort the migration on legacy `private_links` data the new +-- table's invariants cannot represent, rather than silently dropping or +-- corrupting it. A duplicate (data_plane_id, service_identity) would collide on +-- the unique constraint, and an element missing both `service_name` and +-- `service_attachment` would produce a NULL generated `service_identity` that +-- bypasses uniqueness and later fails the resolver's non-null `PrivateLink` +-- decode. Both indicate data that needs hand-correction before this migration. +do $$ +declare + v_missing bigint; + v_dupes bigint; +begin + select count(*) into v_missing + from public.data_planes dp, + lateral unnest(dp.private_links) as elem + where coalesce(elem ->> 'service_name', elem ->> 'service_attachment') is null; + + if v_missing > 0 then + raise exception + 'cannot backfill data_plane_private_links: % private_links element(s) lack a service_name/service_attachment', + v_missing; + end if; + + select count(*) into v_dupes from ( + select 1 + from public.data_planes dp, + lateral unnest(dp.private_links) as elem + group by dp.id, coalesce(elem ->> 'service_name', elem ->> 'service_attachment') + having count(*) > 1 + ) d; + + if v_dupes > 0 then + raise exception + 'cannot backfill data_plane_private_links: % data plane(s) have duplicate private_links service identities', + v_dupes; + end if; +end $$; + +-- Backfill one row per element of every existing `private_links` array. Done +-- before the trigger exists, so it does not reproject or wake anything; the +-- column already holds the source data, so column and table are consistent. No +-- `on conflict` clause: the pre-flight above has proven there are no collisions, +-- so any conflict here is an unexpected invariant break that should abort. +insert into public.data_plane_private_links (data_plane_id, provider, config) +select + dp.id, + case + when (elem ->> 'service_attachment') is not null then 'gcp' + when (elem ->> 'az_ids') is not null then 'aws' + else 'azure' + end, + elem::jsonb +from public.data_planes dp, + lateral unnest(dp.private_links) as elem; + +-- When a link's desired configuration changes (an insert, a delete, or a +-- `config`/`provider` update; see the trigger's `update of` scope below): +-- reproject the rows back into the parent's `data_planes.private_links` column +-- (the controller still reads it until its cutover), and send the parent's +-- controller task a `Converge` message so it applies the new desired +-- configuration promptly rather than waiting for the next idle poll. The +-- message must deserialize into the data-plane-controller's +-- externally-tagged `Message` enum, whose `Converge` unit variant is the JSON +-- string `"converge"` (this is not the `{"type":...}` shape the live-specs +-- controller uses). The not-idle guard on `data_planes` only blocks +-- `config`/`deploy_branch`, so projecting `private_links` is allowed mid-converge. +create function internal.on_data_plane_private_links_change() returns trigger + language plpgsql security definer + set search_path to '' + as $$ +declare + v_data_plane_id public.flowid := coalesce(new.data_plane_id, old.data_plane_id); + v_controller_task_id public.flowid; +begin + update public.data_planes dp set + private_links = coalesce(( + select array_agg(l.config::json order by l.created_at, l.id) + from public.data_plane_private_links l + where l.data_plane_id = v_data_plane_id + ), array[]::json[]) + where dp.id = v_data_plane_id + returning dp.controller_task_id into v_controller_task_id; + + if v_controller_task_id is not null then + perform internal.send_to_task( + v_controller_task_id, + '00:00:00:00:00:00:00:00'::public.flowid, + '"converge"'::json + ); + end if; + + return null; +end; +$$; + +-- Scoped to inserts, deletes, and updates that touch the user-owned desired +-- columns (`config`/`provider`). The controller's post-converge status write +-- only sets `status`/`details`/`observed_at`/`updated_at`, so it does not fire +-- this trigger; were it to, each converge would reproject, wake the controller, +-- and re-trigger itself in an unbounded reconverge loop. +create trigger on_data_plane_private_links_change + after insert or delete or update of config, provider on public.data_plane_private_links + for each row execute function internal.on_data_plane_private_links_change(); + +commit;