Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions crates/agent/src/integration_tests/harness/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,26 +211,25 @@ impl Connectors for TestConnectors {
.get("deltaUpdates")
.and_then(|d| d.as_bool())
.unwrap_or(false);
let constraints = collection
let projection_constraints = collection
.projections
.iter()
.map(|p| {
(
p.field.clone(),
.map(|p| materialize::response::validated::ProjectionConstraint {
field: p.field.clone(),
constraint: Some(
materialize::response::validated::Constraint {
r#type: 3,
reason: "all fields are recommended in tests"
.to_string(),
folded_field: String::new(),
},
)
),
})
.collect();
let resource_path = mock_resource_path(&resource_config);
materialize::response::validated::Binding {
case_insensitive_fields: false,
constraints,
projection_constraints: Vec::new(),
projection_constraints,
resource_path,
delta_updates,
ser_policy: None,
Expand Down
19 changes: 9 additions & 10 deletions crates/dekaf-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use proto_flow::{
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -135,25 +134,25 @@ where
parsed_outer_config.variant.clone()
))?;

let constraints = binding
let projection_constraints = binding
.collection
.context("collection must exist")?
.projections
.iter()
.map(|projection| {
(
projection.field.clone(),
constraint_for_projection(&projection, &parsed_inner_config),
)
.map(|projection| validated::ProjectionConstraint {
field: projection.field.clone(),
constraint: Some(constraint_for_projection(
&projection,
&parsed_inner_config,
)),
})
.collect::<BTreeMap<_, _>>();
.collect::<Vec<_>>();

Ok::<proto_flow::materialize::response::validated::Binding, anyhow::Error>(
validated::Binding {
case_insensitive_fields: false,
constraints,
delta_updates: true,
projection_constraints: Vec::new(),
projection_constraints,
resource_path: vec![resource_config.topic_name],
ser_policy: None,
},
Expand Down
8 changes: 4 additions & 4 deletions crates/flow-web/FIELD_SELECTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ const input = {
},
validated: {
resourcePath: ["users"],
constraints: {
"id": { type: "FIELD_REQUIRED", reason: "Primary key" },
"name": { type: "FIELD_OPTIONAL", reason: "User data" }
},
projectionConstraints: [
{ field: "id", constraint: { type: "FIELD_REQUIRED", reason: "Primary key" } },
{ field: "name", constraint: { type: "FIELD_OPTIONAL", reason: "User data" } }
],
caseInsensitiveFields: false
}
}
Expand Down
40 changes: 26 additions & 14 deletions crates/flow-web/tests/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,24 +122,36 @@ fn test_field_selection() {
},
"validated": {
"resourcePath": ["test_table"],
"constraints": {
"id": {
"type": "FIELD_OPTIONAL",
"reason": "Available field"
"projectionConstraints": [
{
"field": "id",
"constraint": {
"type": "FIELD_OPTIONAL",
"reason": "Available field"
}
},
"flow_published_at": {
"type": "FIELD_OPTIONAL",
"reason": "Available field"
{
"field": "flow_published_at",
"constraint": {
"type": "FIELD_OPTIONAL",
"reason": "Available field"
}
},
"value": {
"type": "FIELD_OPTIONAL",
"reason": "Available field"
{
"field": "value",
"constraint": {
"type": "FIELD_OPTIONAL",
"reason": "Available field"
}
},
"bad": {
"type": "FIELD_FORBIDDEN",
"reason": "Not today, pal."
{
"field": "bad",
"constraint": {
"type": "FIELD_FORBIDDEN",
"reason": "Not today, pal."
}
}
},
],
"deltaUpdates": false
}
}
Expand Down
24 changes: 6 additions & 18 deletions crates/proto-flow/src/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ pub mod response {
/// Nested message and enum types in `Validated`.
pub mod validated {
/// ProjectionConstraint pairs a projection field name with a single Constraint.
/// Unlike the legacy `constraints` map, a repeated list of ProjectionConstraint
/// allows multiple constraints to be expressed for the same field simultaneously.
/// A repeated list of ProjectionConstraint allows multiple constraints to be
/// expressed for the same field simultaneously.
/// For example, INCOMPATIBLE and LOCATION_REQUIRED on the same field signals that
/// the field is required but the existing destination column has an incompatible
/// type, and a backfill is needed.
Expand Down Expand Up @@ -453,17 +453,6 @@ pub mod response {
/// (for example) "myField" and "MyField" from co-existing within a table.
#[prost(bool, tag = "5")]
pub case_insensitive_fields: bool,
/// Constraints imposed by the connector, keyed by field name.
/// Projections of the CollectionSpec which are missing from
/// constraints are implicitly forbidden.
///
/// Deprecated: use projection_constraints instead. When projection_constraints
/// is non-empty it is authoritative and this map is ignored by the control
/// plane. This field is retained for backward compatibility with connectors
/// that have not yet migrated to the list form.
#[prost(btree_map = "string, message", tag = "1")]
pub constraints:
::prost::alloc::collections::BTreeMap<::prost::alloc::string::String, Constraint>,
/// Components of the resource path which fully qualify the resource
/// identified by this binding.
///
Expand Down Expand Up @@ -493,12 +482,11 @@ pub mod response {
#[prost(message, optional, tag = "4")]
pub ser_policy: ::core::option::Option<super::super::super::flow::SerPolicy>,
/// Constraints on each projection, as a list that allows multiple constraints
/// per projection field. When non-empty, this field is authoritative and the
/// legacy `constraints` map is ignored.
/// per projection field. Projections of the CollectionSpec which are missing
/// from this list are implicitly forbidden.
///
/// Connectors should populate this field instead of `constraints`. The list
/// form allows expressing compound requirements that a single constraint type
/// cannot capture. For example, emitting both INCOMPATIBLE and
/// The list form allows expressing compound requirements that a single
/// constraint type cannot capture. For example, emitting both INCOMPATIBLE and
/// LOCATION_REQUIRED for the same field signals that the field is required
/// but the existing destination column is incompatible; a backfill is required.
#[prost(message, repeated, tag = "6")]
Expand Down
19 changes: 0 additions & 19 deletions crates/proto-flow/src/materialize.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3208,9 +3208,6 @@ impl serde::Serialize for response::validated::Binding {
if self.case_insensitive_fields {
len += 1;
}
if !self.constraints.is_empty() {
len += 1;
}
if !self.resource_path.is_empty() {
len += 1;
}
Expand All @@ -3227,9 +3224,6 @@ impl serde::Serialize for response::validated::Binding {
if self.case_insensitive_fields {
struct_ser.serialize_field("caseInsensitiveFields", &self.case_insensitive_fields)?;
}
if !self.constraints.is_empty() {
struct_ser.serialize_field("constraints", &self.constraints)?;
}
if !self.resource_path.is_empty() {
struct_ser.serialize_field("resourcePath", &self.resource_path)?;
}
Expand All @@ -3254,7 +3248,6 @@ impl<'de> serde::Deserialize<'de> for response::validated::Binding {
const FIELDS: &[&str] = &[
"case_insensitive_fields",
"caseInsensitiveFields",
"constraints",
"resource_path",
"resourcePath",
"delta_updates",
Expand All @@ -3268,7 +3261,6 @@ impl<'de> serde::Deserialize<'de> for response::validated::Binding {
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
CaseInsensitiveFields,
Constraints,
ResourcePath,
DeltaUpdates,
SerPolicy,
Expand Down Expand Up @@ -3296,7 +3288,6 @@ impl<'de> serde::Deserialize<'de> for response::validated::Binding {
{
match value {
"caseInsensitiveFields" | "case_insensitive_fields" => Ok(GeneratedField::CaseInsensitiveFields),
"constraints" => Ok(GeneratedField::Constraints),
"resourcePath" | "resource_path" => Ok(GeneratedField::ResourcePath),
"deltaUpdates" | "delta_updates" => Ok(GeneratedField::DeltaUpdates),
"serPolicy" | "ser_policy" => Ok(GeneratedField::SerPolicy),
Expand All @@ -3321,7 +3312,6 @@ impl<'de> serde::Deserialize<'de> for response::validated::Binding {
V: serde::de::MapAccess<'de>,
{
let mut case_insensitive_fields__ = None;
let mut constraints__ = None;
let mut resource_path__ = None;
let mut delta_updates__ = None;
let mut ser_policy__ = None;
Expand All @@ -3334,14 +3324,6 @@ impl<'de> serde::Deserialize<'de> for response::validated::Binding {
}
case_insensitive_fields__ = Some(map_.next_value()?);
}
GeneratedField::Constraints => {
if constraints__.is_some() {
return Err(serde::de::Error::duplicate_field("constraints"));
}
constraints__ = Some(
map_.next_value::<std::collections::BTreeMap<_, _>>()?
);
}
GeneratedField::ResourcePath => {
if resource_path__.is_some() {
return Err(serde::de::Error::duplicate_field("resourcePath"));
Expand Down Expand Up @@ -3373,7 +3355,6 @@ impl<'de> serde::Deserialize<'de> for response::validated::Binding {
}
Ok(response::validated::Binding {
case_insensitive_fields: case_insensitive_fields__.unwrap_or_default(),
constraints: constraints__.unwrap_or_default(),
resource_path: resource_path__.unwrap_or_default(),
delta_updates: delta_updates__.unwrap_or_default(),
ser_policy: ser_policy__,
Expand Down
23 changes: 0 additions & 23 deletions crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,29 +668,6 @@ fn ex_materialize_response() -> materialize::Response {
bindings: vec![materialize::response::validated::Binding {
resource_path: vec!["some".to_string(), "path".to_string()],
case_insensitive_fields: true,
constraints: [
(
"req_field".to_string(),
materialize::response::validated::Constraint {
r#type:
materialize::response::validated::constraint::Type::FieldRequired
as i32,
reason: "is required".to_string(),
folded_field: "REQ_FIELD".to_string(),
},
),
(
"opt_field".to_string(),
materialize::response::validated::Constraint {
r#type:
materialize::response::validated::constraint::Type::FieldOptional
as i32,
reason: "is optional".to_string(),
folded_field: String::new(),
},
),
]
.into(),
projection_constraints: vec![
materialize::response::validated::ProjectionConstraint {
field: "flow_document".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ expression: json_test(msg)
"bindings": [
{
"caseInsensitiveFields": true,
"constraints": {
"opt_field": {
"type": "FIELD_OPTIONAL",
"reason": "is optional"
},
"req_field": {
"type": "FIELD_REQUIRED",
"reason": "is required",
"foldedField": "REQ_FIELD"
}
},
"resourcePath": [
"some",
"path"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,31 @@ expression: proto_test(msg)
|65651211 7b227265 66726573 68223a22| ee..{"refresh":" 00000100
|68647222 7d52100a 06616363 65737312| hdr"}R...access. 00000110
|06226865 7265225a 04504f53 54620450| ."here"Z.POSTb.P 00000120
|4f535412 e2010adf 010a1c0a 096f7074| OST..........opt 00000130
|5f666965 6c64120f 10041a0b 6973206f| _field......is o 00000140
|7074696f 6e616c0a 270a0972 65715f66| ptional.'..req_f 00000150
|69656c64 121a1001 1a0b6973 20726571| ield......is req 00000160
|75697265 64220952 45515f46 49454c44| uired".REQ_FIELD 00000170
|1204736f 6d651204 70617468 1801220a| ..some..path..". 00000180
|08808004 10e80718 e8072801 323b0a0d| ..........(.2;.. 00000190
|666c6f77 5f646f63 756d656e 74122a10| flow_document.*. 000001a0
|021a2674 68652072 6f6f7420 646f6375| ..&the root docu 000001b0
|6d656e74 206d7573 74206265 206d6174| ment must be mat 000001c0
|65726961 6c697a65 64323d0a 0d666c6f| erialized2=..flo 000001d0
|775f646f 63756d65 6e74122c 10061a28| w_document.,...( 000001e0
|65786973 74696e67 20636f6c 756d6e20| existing column 000001f0
|68617320 616e2069 6e636f6d 70617469| has an incompati 00000200
|626c6520 74797065 1a2a0a10 49206469| ble type.*..I di 00000210
|6420736f 6d652073 74756666 12160a12| d some stuff.... 00000220
|7b227374 61746522 3a227570 64617465| {"state":"update 00000230
|227d1001 22680a64 0a4a0a15 612f7265| "}.."h.d.J..a/re 00000240
|61642f6a 6f75726e 616c3b73 75666669| ad/journal;suffi 00000250
|78123108 b9601215 0a050309 08050712| x.1..`.......... 00000260
|0c09e321 00000000 000010d7 0812150a| ...!............ 00000270
|05070c66 2b1d120c 09350100 00000000| ...f+....5...... 00000280
|0010ae11 12160a0e 616e2f61 636b2f6a| ........an/ack/j 00000290
|6f75726e 616c1204 03040205 10012a14| ournal........*. 000002a0
|08041210 7b226c6f 61646564 223a2264| ....{"loaded":"d 000002b0
|6f63227d 32180a16 0a127b22 73746174| oc"}2.....{"stat 000002c0
|65223a22 75706461 7465227d 10013a18| e":"update"}..:. 000002d0
|0a160a12 7b227374 61746522 3a227570| ....{"state":"up 000002e0
|64617465 227d1001 42180a16 0a127b22| date"}..B.....{" 000002f0
|73746174 65223a22 75706461 7465227d| state":"update"} 00000300
|1001a206 06120248 691801| .......Hi.. 00000310
0000031b
|4f535412 9b010a98 01120473 6f6d6512| OST........some. 00000130
|04706174 68180122 0a088080 0410e807| .path.."........ 00000140
|18e80728 01323b0a 0d666c6f 775f646f| ...(.2;..flow_do 00000150
|63756d65 6e74122a 10021a26 74686520| cument.*...&the 00000160
|726f6f74 20646f63 756d656e 74206d75| root document mu 00000170
|73742062 65206d61 74657269 616c697a| st be materializ 00000180
|6564323d 0a0d666c 6f775f64 6f63756d| ed2=..flow_docum 00000190
|656e7412 2c10061a 28657869 7374696e| ent.,...(existin 000001a0
|6720636f 6c756d6e 20686173 20616e20| g column has an 000001b0
|696e636f 6d706174 69626c65 20747970| incompatible typ 000001c0
|651a2a0a 10492064 69642073 6f6d6520| e.*..I did some 000001d0
|73747566 6612160a 127b2273 74617465| stuff....{"state 000001e0
|223a2275 70646174 65227d10 0122680a| ":"update"}.."h. 000001f0
|640a4a0a 15612f72 6561642f 6a6f7572| d.J..a/read/jour 00000200
|6e616c3b 73756666 69781231 08b96012| nal;suffix.1..`. 00000210
|150a0503 09080507 120c09e3 21000000| ............!... 00000220
|00000010 d7081215 0a05070c 662b1d12| ............f+.. 00000230
|0c093501 00000000 000010ae 1112160a| ..5............. 00000240
|0e616e2f 61636b2f 6a6f7572 6e616c12| .an/ack/journal. 00000250
|04030402 0510012a 14080412 107b226c| .......*.....{"l 00000260
|6f616465 64223a22 646f6322 7d32180a| oaded":"doc"}2.. 00000270
|160a127b 22737461 7465223a 22757064| ...{"state":"upd 00000280
|61746522 7d10013a 180a160a 127b2273| ate"}..:.....{"s 00000290
|74617465 223a2275 70646174 65227d10| tate":"update"}. 000002a0
|0142180a 160a127b 22737461 7465223a| .B.....{"state": 000002b0
|22757064 61746522 7d1001a2 06061202| "update"}....... 000002c0
|48691801| Hi.. 000002d0
000002d4
Loading
Loading