diff --git a/crates/models/src/snapshots/models__triggers__test__trigger-config-schema.snap b/crates/models/src/snapshots/models__triggers__test__trigger-config-schema.snap index 79e7bcb007b..9c83709ac85 100644 --- a/crates/models/src/snapshots/models__triggers__test__trigger-config-schema.snap +++ b/crates/models/src/snapshots/models__triggers__test__trigger-config-schema.snap @@ -43,6 +43,14 @@ expression: schema "secret": true } }, + "interval": { + "title": "Minimum interval between deliveries for this trigger.", + "type": [ + "string", + "null" + ], + "pattern": "^\\d+(s|m|h|d)$" + }, "maxAttempts": { "title": "Maximum number of delivery attempts (including the initial attempt).", "description": "The task is failed if all attempts are exhausted without a successful delivery.", diff --git a/crates/models/src/triggers.rs b/crates/models/src/triggers.rs index 0bc12e22b7f..f65f1f069a4 100644 --- a/crates/models/src/triggers.rs +++ b/crates/models/src/triggers.rs @@ -44,6 +44,14 @@ pub struct TriggerConfig { #[serde(default = "default_max_attempts")] #[schemars(extend("default" = 3_u32))] pub max_attempts: u32, + /// # Minimum interval between deliveries for this trigger. + #[schemars(schema_with = "super::duration_schema")] + #[serde( + default, + with = "humantime_serde", + skip_serializing_if = "Option::is_none" + )] + pub interval: Option, } /// HTTP method for the webhook request. @@ -92,6 +100,52 @@ impl TriggerVariables { run_id: "2024-01-01T00:00:00.000Z".to_string(), } } + + /// Merge `other` into `self`, widening this window to cover both. Used to + /// collapse a burst of debounced transactions into a single delivery. + pub fn merge(&mut self, other: &TriggerVariables) { + for name in &other.collection_names { + if !self.collection_names.contains(name) { + self.collection_names.push(name.clone()); + } + } + self.collection_names.sort(); + + if other.flow_published_at_min < self.flow_published_at_min { + self.flow_published_at_min = other.flow_published_at_min.clone(); + } + if other.flow_published_at_max > self.flow_published_at_max { + self.flow_published_at_max = other.flow_published_at_max.clone(); + } + } +} + +/// Persisted trigger parameters, tolerating both wire formats that have been +/// written to the runtime's durable "trigger-params" key: +/// +/// - `PerConfig`: the current format — an accumulated window per trigger, +/// written by runtimes with per-config debounce. +/// - `Single`: the legacy format — one window, fired by every configured +/// trigger. Written by the V1 runtime and by pre-debounce V2 runtimes. +#[derive(Debug, Deserialize, PartialEq)] +#[serde(untagged)] +pub enum PersistedTriggerParams { + PerConfig(BTreeMap), + Single(TriggerVariables), +} + +impl PersistedTriggerParams { + /// Reduce to a single window spanning all pending deliveries (V1 style). + /// Returns None if there is nothing pending. + pub fn into_merged(self) -> Option { + match self { + Self::Single(variables) => Some(variables), + Self::PerConfig(map) => map.into_values().reduce(|mut acc, v| { + acc.merge(&v); + acc + }), + } + } } /// Render a single payload template string with the given context. @@ -131,6 +185,7 @@ pub struct HmacExcludedOriginals { pub payload_template: String, pub timeout: Duration, pub max_attempts: u32, + pub interval: Option, } /// Replace HMAC-excluded fields in each trigger config with placeholder values, @@ -144,6 +199,7 @@ pub fn strip_hmac_excluded_fields(triggers: &mut Triggers) -> Vec serde_json::Value { mod test { use super::*; + // Both persisted wire formats decode unambiguously, and a per-config map + // merges down to the single window the V1 runtime fires with. + #[test] + fn persisted_trigger_params_decodes_both_formats() { + let single = TriggerVariables::placeholder(); + let legacy_blob = serde_json::to_vec(&single).unwrap(); + let decoded: PersistedTriggerParams = serde_json::from_slice(&legacy_blob).unwrap(); + assert_eq!(decoded, PersistedTriggerParams::Single(single.clone())); + assert_eq!(decoded_merged(&legacy_blob), Some(single.clone())); + + let mut early = single.clone(); + early.flow_published_at_min = "2023-01-01T00:00:00Z".to_string(); + early.collection_names = vec!["acmeCo/other".to_string()]; + let map: BTreeMap = [ + ("POST https://a".to_string(), single.clone()), + ("POST https://b".to_string(), early), + ] + .into(); + let map_blob = serde_json::to_vec(&map).unwrap(); + let decoded: PersistedTriggerParams = serde_json::from_slice(&map_blob).unwrap(); + assert_eq!(decoded, PersistedTriggerParams::PerConfig(map)); + + let merged = decoded_merged(&map_blob).unwrap(); + assert_eq!(merged.flow_published_at_min, "2023-01-01T00:00:00Z"); + assert_eq!( + merged.collection_names, + vec![ + "acmeCo/example/collection".to_string(), + "acmeCo/other".to_string() + ], + ); + + assert_eq!(decoded_merged(b"{}"), None); + } + + fn decoded_merged(blob: &[u8]) -> Option { + serde_json::from_slice::(blob) + .unwrap() + .into_merged() + } + #[test] fn trigger_config_schema_snapshot() { let settings = schemars::generate::SchemaSettings::draft2019_09(); @@ -202,6 +300,7 @@ mod test { payload_template: "my template".to_string(), timeout: Duration::from_secs(45), max_attempts: 5, + interval: Some(Duration::from_secs(1800)), }], sops: None, }; diff --git a/crates/runtime-next/src/leader/materialize/actor.rs b/crates/runtime-next/src/leader/materialize/actor.rs index d570ae29dab..9bdc4e909aa 100644 --- a/crates/runtime-next/src/leader/materialize/actor.rs +++ b/crates/runtime-next/src/leader/materialize/actor.rs @@ -32,6 +32,8 @@ pub struct Actor { Option)>>>, // Task being executed by this actor. task: Task, + // Leader-lifetime trigger debounce accumulator and last-fire times. + trigger_debounce: fsm::TriggerDebounce, // Future for an in-flight trigger dispatch, if any. trigger_fut: Option>>, } @@ -55,6 +57,7 @@ impl Actor { shard_tx, stats_write_fut: None, task, + trigger_debounce: fsm::TriggerDebounce::default(), trigger_fut: None, } } @@ -124,6 +127,7 @@ impl Actor { let action: fsm::Action; let prev_kind = tail.kind(); (action, tail) = tail.step( + &self.trigger_debounce, self.intents_write_fut.is_none(), now, &mut ready_shard_rx, @@ -148,6 +152,7 @@ impl Actor { (action, head) = head.step( &mut binding_bytes_behind, &mut close_requested, + &mut self.trigger_debounce, &mut self.legacy_checkpoint, now, &mut ready_frontier, @@ -457,15 +462,13 @@ impl Actor { "leader", "starting trigger execution" ); - let variables: models::TriggerVariables = - serde_json::from_slice(&trigger_params) - .context("decoding trigger_variables JSON")?; + let to_fire = super::triggers::decode_to_fire(&triggers, &trigger_params)?; let client = self.http_client.clone(); self.trigger_fut = Some( async move { // TODO(johnny): Periodic writes into task ops logs if it takes a while. - super::triggers::fire_pending_triggers(&triggers, &variables, &client).await + super::triggers::fire_pending_triggers(&triggers, &to_fire, &client).await } .boxed(), ); diff --git a/crates/runtime-next/src/leader/materialize/fsm.rs b/crates/runtime-next/src/leader/materialize/fsm.rs index 700f6a331bd..9c735c9be1d 100644 --- a/crates/runtime-next/src/leader/materialize/fsm.rs +++ b/crates/runtime-next/src/leader/materialize/fsm.rs @@ -64,7 +64,8 @@ pub struct PendingDeltas { pub persist_patches: Vec, /// Queued connector state patches for the next shards broadcast. pub shard_patches: Vec, - /// Parameters for the post-Acknowledge trigger. + /// Serialized `BTreeMap` of the trigger + /// windows due to fire post-Acknowledge, or empty if none are due pub trigger_params: bytes::Bytes, } @@ -168,6 +169,7 @@ impl Head { self, binding_bytes_behind: &mut [i64], close_requested: &mut bool, + debounce: &mut TriggerDebounce, legacy_checkpoint: &mut Option<(shuffle::Frontier, consumer::Checkpoint)>, now: uuid::Clock, ready_frontier: &mut Option, @@ -178,13 +180,23 @@ impl Head { task: &Task, ) -> (Action, Head) { match self { - Head::Idle(s) => s.step(now, close_requested, ready_frontier, stopping, tail, task), + Head::Idle(s) => s.step( + now, + close_requested, + debounce, + ready_frontier, + stopping, + tail, + task, + ), Head::Extend(s) => s.step(shard_rx), Head::Flush(s) => s.step(now, shard_rx, task), Head::Persist(s) => s.step(shard_rx), Head::Store(s) => s.step(binding_bytes_behind, shard_rx, task), Head::WriteStats(s) => s.step(legacy_checkpoint, stats_write_idle, task), - Head::StartCommit(s) => s.step(legacy_checkpoint, now, shard_rx, stopping), + Head::StartCommit(s) => { + s.step(debounce, legacy_checkpoint, now, shard_rx, stopping, task) + } Head::Stop => panic!("HeadFSM::Stop observed at step boundary"), } } @@ -207,6 +219,7 @@ impl Tail { /// Dispatch to the current sub-state's `step()`. pub fn step( self, + debounce: &TriggerDebounce, intents_write_idle: bool, now: uuid::Clock, shard_rx: &mut Option<(usize, proto::Materialize)>, @@ -218,7 +231,7 @@ impl Tail { Tail::Begin(s) => s.step(stopping, task), Tail::WriteIntents(s) => s.step(intents_write_idle), Tail::Acknowledge(s) => s.step(now, shard_rx, task), - Tail::Trigger(s) => s.step(now, trigger_call_running), + Tail::Trigger(s) => s.step(debounce, now, trigger_call_running), Tail::Persist(s) => s.step(shard_rx), Tail::Done(_) => (Action::Idle, self), } @@ -256,6 +269,7 @@ impl HeadIdle { mut self, now: uuid::Clock, close_requested: &mut bool, + debounce: &mut TriggerDebounce, ready_frontier: &mut Option, stopping: bool, tail: &mut Tail, @@ -325,8 +339,32 @@ impl HeadIdle { ); } - // Should we begin to close the transaction? + // No transaction is open. Fire any debounced windows that have come + // due while the task is quiet. if !is_open { + if let Tail::Done(done) = tail + && let Some(compiled) = &task.triggers + { + let to_fire = debounce.take_due(now, &compiled.configs); + if !to_fire.is_empty() { + let shard_patches = std::mem::take(&mut done.shard_patches); + *tail = Tail::Trigger(TailTrigger { shard_patches }); + + return ( + Action::CallTrigger { + triggers: compiled.clone(), + trigger_params: serde_json::to_vec(&to_fire) + .expect("TriggerVariables always serialize") + .into(), + }, + Head::Idle(self), + ); + } + // Nothing due: wake when the earliest pending window comes due. + if let Some(wake_after) = debounce.next_due(now, &compiled.configs) { + return (Action::Sleep { wake_after }, Head::Idle(self)); + } + } return (Action::Idle, Head::Idle(self)); } else if may_close { let Self { mut extents, .. } = self; @@ -625,9 +663,7 @@ impl HeadStore { // We've received all expected Stored responses. let Self { - extents, - mut pending, - .. + extents, pending, .. } = self; // Fold deltas from the extents Frontier into per-binding "bytes behind" gauges. @@ -638,33 +674,6 @@ impl HeadStore { *entry += jf.bytes_behind_delta; } - // Compose the trigger payload now that we have a complete txn-wide view. - if task.triggers.is_some() && !extents.bindings.is_empty() { - let collection_names: Vec = extents - .bindings - .keys() - .filter_map(|idx| task.binding_collection_names.get(*idx as usize).cloned()) - .collect(); - - let mut it = extents - .bindings - .values() - .map(|extents| (extents.min_source_clock, extents.max_source_clock)); - let init = it.next().unwrap_or_default(); - let (min, max) = it.fold(init, |(min, max), (a, b)| (min.min(a), max.max(b))); - - pending.trigger_params = serde_json::to_vec(&models::TriggerVariables { - collection_names, - connector_image: task.connector_image.clone(), - materialization_name: task.shard_ref.name.clone(), - flow_published_at_min: tokens::DateTime::from(min.to_time()).to_rfc3339(), - flow_published_at_max: tokens::DateTime::from(max.to_time()).to_rfc3339(), - run_id: tokens::DateTime::from(extents.open.to_time()).to_rfc3339(), - }) - .unwrap() - .into(); - } - let action = match build_stats_doc(task, &extents, binding_bytes_behind) { Ok(stats) => Action::WriteStats { stats }, Err(error) => Action::Error { error }, @@ -768,10 +777,12 @@ pub struct HeadStartCommit { impl HeadStartCommit { pub fn step( mut self, + debounce: &mut TriggerDebounce, legacy_checkpoint: &Option<(shuffle::Frontier, consumer::Checkpoint)>, now: uuid::Clock, shard_rx: &mut Option<(usize, proto::Materialize)>, stopping: bool, + task: &Task, ) -> (Action, Head) { // Did we receive an expected StartedCommit response? if let Some(( @@ -809,6 +820,14 @@ impl HeadStartCommit { .. } = self; + // Merge this transaction's window into the debounce accumulator, then + // persist the full accumulator — including entries about to fire + let window = compute_trigger_window(task, &extents); + if let (Some(compiled), Some(window)) = (&task.triggers, &window) { + debounce.accumulate(compiled.keys(), window); + } + let (trigger_params_json, delete_trigger_params) = debounce.to_persist(); + let Extents { close, frontier, .. } = extents; @@ -828,9 +847,10 @@ impl HeadStartCommit { committed_frontier: Some(shuffle::JournalFrontier::encode(&frontier.journals)), connector_patches_json: take_patches(&mut pending.persist_patches), delete_ack_intents: true, + delete_trigger_params, legacy_checkpoint, max_keys: std::mem::take(&mut pending.max_key_deltas), - trigger_params_json: pending.trigger_params.clone(), + trigger_params_json, ..Default::default() }; @@ -843,6 +863,17 @@ impl HeadStartCommit { // resume from Tail::Begin. (Action::Idle, Head::Stop) } else { + // Move the due subset out of the accumulator; the Tail delivers it + // post-Acknowledge and then persists the reduced accumulator. + if let Some(compiled) = &task.triggers { + let to_fire = debounce.take_due(now, &compiled.configs); + if !to_fire.is_empty() { + pending.trigger_params = serde_json::to_vec(&to_fire) + .expect("TriggerVariables always serialize") + .into(); + } + } + // Rotate to begin a next transaction. `idempotent_replay` // is one-shot — only the first transaction of a session may replay // recovered hints, so post-Rotate HeadIdle is always non-replay. @@ -1038,18 +1069,27 @@ pub struct TailTrigger { } impl TailTrigger { - pub fn step(self, now: uuid::Clock, trigger_call_running: bool) -> (Action, Tail) { + pub fn step( + self, + debounce: &TriggerDebounce, + now: uuid::Clock, + trigger_call_running: bool, + ) -> (Action, Tail) { if trigger_call_running { return (Action::Idle, Tail::Trigger(self)); } let Self { shard_patches } = self; + // The fire succeeded: durably record the accumulator with the fired + // entries removed, deleting the key outright when nothing remains pending. let seq_no = now.as_u64(); + let (trigger_params_json, delete_trigger_params) = debounce.to_persist(); let action = Action::Persist { persist: proto::Persist { seq_no, - delete_trigger_params: true, + delete_trigger_params, + trigger_params_json, ..Default::default() }, }; @@ -1103,6 +1143,128 @@ pub struct TailDone { pub shard_patches: bytes::Bytes, } +/// Leader-lifetime debounce state for materialization triggers. Accumulates +/// per-transaction windows per trigger and gates firing to at most once per +/// configured `interval` +#[derive(Debug, Default)] +pub struct TriggerDebounce { + /// Accumulated, not-yet-fired window per trigger `config_key`. Persisted. + pub pending: BTreeMap, + /// Wall-clock of the last fire per `config_key`. In-memory only + pub last_fire: BTreeMap, +} + +impl TriggerDebounce { + /// Merge one committed transaction's `window` into the accumulator of every + /// configured trigger `key`. + pub fn accumulate<'a>( + &mut self, + keys: impl Iterator, + window: &models::TriggerVariables, + ) { + for key in keys { + self.pending + .entry(key.to_string()) + .and_modify(|acc| acc.merge(window)) + .or_insert_with(|| window.clone()); + } + } + + /// Remove and return the windows due to fire now: a config is due when it + /// has a non-empty accumulated window and either has no `interval`, has + /// never fired, or its `interval` has elapsed since its last fire. + pub fn take_due( + &mut self, + now: uuid::Clock, + configs: &[models::TriggerConfig], + ) -> BTreeMap { + let mut to_fire = BTreeMap::new(); + for config in configs { + let key = super::triggers::config_key(config); + let due = match (config.interval, self.last_fire.get(&key)) { + (Some(interval), Some(last)) => uuid::Clock::delta(now, *last) >= interval, + _ => true, // No interval configured, or never fired. + }; + if !due { + continue; + } + let Some(window) = self.pending.remove(&key) else { + continue; // Nothing accumulated for this config. + }; + self.last_fire.insert(key.clone(), now); + to_fire.insert(key, window); + } + to_fire + } + + /// Duration until the earliest pending-but-not-yet-due window comes due, + /// or None when no pending window has a future deadline. + pub fn next_due( + &self, + now: uuid::Clock, + configs: &[models::TriggerConfig], + ) -> Option { + configs + .iter() + .filter_map(|config| { + let key = super::triggers::config_key(config); + if !self.pending.contains_key(&key) { + return None; + } + let (Some(interval), Some(last)) = (config.interval, self.last_fire.get(&key)) + else { + return None; + }; + Some(interval.saturating_sub(uuid::Clock::delta(now, *last))) + }) + .min() + } + + /// Encode the accumulator for a `proto::Persist`: the serialized pending + /// map and whether the durable key should instead be deleted (nothing + /// remains pending). + pub fn to_persist(&self) -> (bytes::Bytes, bool) { + if self.pending.is_empty() { + (bytes::Bytes::new(), true) + } else { + let json = + serde_json::to_vec(&self.pending).expect("TriggerVariables always serialize"); + (json.into(), false) + } + } +} + +/// Compose this transaction's trigger window from its committed `extents`, +/// or None if the task has no triggers or the transaction materialized no +/// data. +fn compute_trigger_window(task: &Task, extents: &Extents) -> Option { + if task.triggers.is_none() || extents.bindings.is_empty() { + return None; + } + + let collection_names: Vec = extents + .bindings + .keys() + .filter_map(|idx| task.binding_collection_names.get(*idx as usize).cloned()) + .collect(); + + let mut it = extents + .bindings + .values() + .map(|extents| (extents.min_source_clock, extents.max_source_clock)); + let init = it.next().unwrap_or_default(); + let (min, max) = it.fold(init, |(min, max), (a, b)| (min.min(a), max.max(b))); + + Some(models::TriggerVariables { + collection_names, + connector_image: task.connector_image.clone(), + materialization_name: task.shard_ref.name.clone(), + flow_published_at_min: tokens::DateTime::from(min.to_time()).to_rfc3339(), + flow_published_at_max: tokens::DateTime::from(max.to_time()).to_rfc3339(), + run_id: tokens::DateTime::from(extents.open.to_time()).to_rfc3339(), + }) +} + // Extend separate accrued patches for a future Persist vs future shard broadcast, // into `pending` from `src`. pub fn extend_patches(pending: &mut PendingDeltas, src: &[u8]) { @@ -1183,6 +1345,7 @@ mod tests { struct Ctx { binding_bytes_behind: Vec, close_requested: bool, + debounce: TriggerDebounce, intents_idle: bool, legacy_checkpoint: Option<(shuffle::Frontier, consumer::Checkpoint)>, now: uuid::Clock, @@ -1201,6 +1364,7 @@ mod tests { head.step( &mut self.binding_bytes_behind, &mut self.close_requested, + &mut self.debounce, &mut self.legacy_checkpoint, self.now, &mut self.ready_frontier, @@ -1215,6 +1379,7 @@ mod tests { fn step_tail(&mut self, tail: Tail) -> (Action, Tail) { self.now.tick(); tail.step( + &self.debounce, self.intents_idle, self.now, &mut self.shard_rx, @@ -1239,7 +1404,18 @@ mod tests { peers: (0..n_shards).map(|i| format!("shard-{i}")).collect(), shard_ref: ops::ShardRef::default(), triggers: Some(std::sync::Arc::new( - super::super::triggers::CompiledTriggers::compile(vec![]).unwrap(), + super::super::triggers::CompiledTriggers::compile(vec![models::TriggerConfig { + url: "https://example.com/hook".to_string(), + method: models::HttpMethod::POST, + headers: Default::default(), + payload_template: "{}".to_string(), + timeout: Duration::from_secs(30), + max_attempts: 3, + // No interval: fire every transaction that materializes data, + // matching the pre-debounce behavior this lifecycle test covers. + interval: None, + }]) + .unwrap(), )), } } @@ -1383,6 +1559,7 @@ mod tests { let mut ctx = Ctx { binding_bytes_behind: vec![0; task.binding_collection_names.len()], close_requested: false, + debounce: TriggerDebounce::default(), intents_idle: true, legacy_checkpoint: None, now: uuid::Clock::from_unix(1_700_000_000, 0), @@ -1569,7 +1746,7 @@ mod tests { @r#" ( b"[{\"phase\":\"committed\",\"shard\":0}\t,{\"phase\":\"committed\",\"shard\":1}\t]", - b"{\"collection_names\":[\"test/collection\"],\"connector_image\":\"\",\"materialization_name\":\"\",\"flow_published_at_min\":\"2023-11-14T22:13:25+00:00\",\"flow_published_at_max\":\"2023-11-14T22:13:30+00:00\",\"run_id\":\"2023-11-14T22:13:20.000001+00:00\"}", + b"{\"POST https://example.com/hook\":{\"collection_names\":[\"test/collection\"],\"connector_image\":\"\",\"materialization_name\":\"\",\"flow_published_at_min\":\"2023-11-14T22:13:25+00:00\",\"flow_published_at_max\":\"2023-11-14T22:13:30+00:00\",\"run_id\":\"2023-11-14T22:13:20.000001+00:00\"}}", ) "#); @@ -1737,12 +1914,178 @@ mod tests { } /// Verifies aggregation of L:Loaded `max_key_delta` across shards and Load cycles. + // On recovery, `handler` seeds `Tail::Begin` with the persisted trigger + // accumulator as the to_fire set. The Tail must re-fire it verbatim + // (at-least-once), and it flows independently of the live (empty) accumulator. + #[test] + fn recovery_refires_persisted_accumulator() { + let mut recovered: std::collections::BTreeMap = + Default::default(); + recovered.insert( + "POST https://a".to_string(), + models::TriggerVariables::placeholder(), + ); + recovered.insert( + "POST https://b".to_string(), + models::TriggerVariables::placeholder(), + ); + let serialized = serde_json::to_vec(&recovered).unwrap(); + + let mut ctx = Ctx { + binding_bytes_behind: vec![0; 1], + close_requested: false, + debounce: TriggerDebounce::default(), + intents_idle: true, + legacy_checkpoint: None, + now: uuid::Clock::from_unix(1_700_000_000, 0), + pending_ack_intents: BTreeMap::new(), + ready_frontier: None, + shard_rx: None, + stats_idle: false, + stopping: false, + task: mk_task(1), + trigger_running: false, + }; + + // Recovery injects the persisted accumulator as the Tail's to_fire set. + let mut tail = Tail::Begin(TailBegin { + pending: PendingDeltas { + trigger_params: Bytes::from(serialized), + ..Default::default() + }, + }); + + // Begin → Acknowledge. + let (action, t) = ctx.step_tail(tail); + tail = t; + assert!(matches!(action, Action::Acknowledge { .. })); + + // Single shard Acknowledged, no patches → WriteIntents (CallTrigger chained). + ctx.shard_rx = Some(mk_acknowledged(0, b"")); + let (action, t) = ctx.step_tail(tail); + tail = t; + assert!(matches!(action, Action::WriteIntents { .. })); + + // Intents written → CallTrigger carrying the recovered map verbatim. + let (action, _t) = ctx.step_tail(tail); + let params = match action { + Action::CallTrigger { trigger_params, .. } => trigger_params, + other => panic!("expected CallTrigger, got {other:?}"), + }; + let fired: std::collections::BTreeMap = + serde_json::from_slice(¶ms).unwrap(); + assert_eq!(fired, recovered, "recovered accumulator re-fires verbatim"); + assert!( + ctx.debounce.pending.is_empty(), + "recovery re-fire does not touch the live accumulator", + ); + } + + // A debounced window fires from Idle once its interval elapses, with no + // further transaction: HeadIdle sleeps until the deadline, then emits + // CallTrigger and rotates the Tail through its normal fire → + // Persist(reduced accumulator) → Done sequence. + #[test] + fn idle_fires_debounced_window_after_interval() { + let mut task = mk_task(1); + let config = models::TriggerConfig { + url: "https://example.com/hook".to_string(), + method: models::HttpMethod::POST, + headers: Default::default(), + payload_template: "{}".to_string(), + timeout: Duration::from_secs(30), + max_attempts: 3, + interval: Some(Duration::from_secs(600)), + }; + let key = super::super::triggers::config_key(&config); + task.triggers = Some(std::sync::Arc::new( + super::super::triggers::CompiledTriggers::compile(vec![config]).unwrap(), + )); + + let t0 = uuid::Clock::from_unix(1_700_000_000, 0); + let mut ctx = Ctx { + binding_bytes_behind: vec![0; 1], + close_requested: false, + debounce: TriggerDebounce::default(), + intents_idle: true, + legacy_checkpoint: None, + now: t0, + pending_ack_intents: BTreeMap::new(), + ready_frontier: None, + shard_rx: None, + stats_idle: false, + stopping: false, + task, + trigger_running: false, + }; + + // Seed: the config last fired at t0, and a window accumulated since. + ctx.debounce.last_fire.insert(key.clone(), t0); + ctx.debounce + .pending + .insert(key.clone(), models::TriggerVariables::placeholder()); + + let mut tail = Tail::Done(TailDone { + shard_patches: Bytes::new(), + }); + let mut head = Head::Idle(HeadIdle::default()); + + // Idle before the deadline: Head sleeps until the window comes due. + ctx.now = uuid::Clock::from_unix(1_700_000_100, 0); // t0 + 100s. + let (action, h) = ctx.step_head(head, &mut tail); + head = h; + let Action::Sleep { wake_after } = action else { + panic!("expected Sleep, got {action:?}"); + }; + assert!( + wake_after > Duration::from_secs(499) && wake_after <= Duration::from_secs(500), + "expected ~500s until due, got {wake_after:?}", + ); + + // Past the deadline: fire from Idle, with no transaction in flight. + ctx.now = uuid::Clock::from_unix(1_700_000_601, 0); // t0 + 601s. + let (action, h) = ctx.step_head(head, &mut tail); + head = h; + let params = match action { + Action::CallTrigger { trigger_params, .. } => trigger_params, + other => panic!("expected CallTrigger, got {other:?}"), + }; + let fired: BTreeMap = + serde_json::from_slice(¶ms).unwrap(); + assert_eq!(fired.into_keys().collect::>(), vec![key.clone()]); + assert!(matches!(head, Head::Idle(_))); + assert!(matches!(tail, Tail::Trigger(_))); + assert!(ctx.debounce.pending.is_empty()); + assert_eq!(ctx.debounce.last_fire.get(&key).copied(), Some(ctx.now)); + + // Trigger completes → Persist deleting the (now empty) accumulator → Done. + let (action, t) = ctx.step_tail(tail); + tail = t; + let persist = match action { + Action::Persist { persist } => persist, + other => panic!("expected Persist, got {other:?}"), + }; + assert!(persist.delete_trigger_params); + assert!(persist.trigger_params_json.is_empty()); + + ctx.shard_rx = Some(mk_tail_persisted(&tail)); + let (action, t) = ctx.step_tail(tail); + tail = t; + assert!(matches!(action, Action::Idle)); + assert!(matches!(tail, Tail::Done(_))); + + // Head is quiescent again: nothing pending, no timer to arm. + let (action, _h) = ctx.step_head(head, &mut tail); + assert!(matches!(action, Action::Idle)); + } + #[test] fn loaded_max_key_delta_reduction() { let task = mk_task(2); let mut ctx = Ctx { binding_bytes_behind: vec![0; task.binding_collection_names.len()], close_requested: false, + debounce: TriggerDebounce::default(), intents_idle: true, legacy_checkpoint: None, now: uuid::Clock::from_unix(1_700_000_000, 0), @@ -1984,6 +2327,7 @@ mod tests { let mut ctx = Ctx { binding_bytes_behind: vec![0; 3], close_requested: false, + debounce: TriggerDebounce::default(), intents_idle: false, legacy_checkpoint: None, now: uuid::Clock::from_unix(1_700_000_000, 0), @@ -2030,3 +2374,119 @@ mod tests { .quickcheck(prop as fn(u64) -> bool); } } + +#[cfg(test)] +mod debounce_tests { + use super::*; + use std::time::Duration; + + fn cfg(url: &str, interval: Option) -> models::TriggerConfig { + models::TriggerConfig { + url: url.to_string(), + method: models::HttpMethod::POST, + headers: Default::default(), + payload_template: "{}".to_string(), + timeout: Duration::from_secs(30), + max_attempts: 3, + interval, + } + } + + fn window(collection: &str, min: &str, max: &str) -> models::TriggerVariables { + models::TriggerVariables { + collection_names: vec![collection.to_string()], + connector_image: "img".to_string(), + materialization_name: "mat".to_string(), + flow_published_at_min: min.to_string(), + flow_published_at_max: max.to_string(), + run_id: min.to_string(), + } + } + + fn t(secs: u64) -> uuid::Clock { + uuid::Clock::from_unix(secs, 0) + } + + fn keys(configs: &[models::TriggerConfig]) -> Vec { + configs + .iter() + .map(super::super::triggers::config_key) + .collect() + } + + fn accumulate(d: &mut TriggerDebounce, ks: &[String], w: &models::TriggerVariables) { + d.accumulate(ks.iter().map(String::as_str), w); + } + + // A burst of transactions within one interval collapses into a single + // delivery whose window spans the union of the collapsed transactions. + #[test] + fn burst_within_interval_collapses_to_one_fire() { + let configs = vec![cfg("https://a", Some(Duration::from_secs(600)))]; + let ks = keys(&configs); + let mut d = TriggerDebounce::default(); + + // First qualifying txn: never fired, so it's due immediately. + accumulate(&mut d, &ks, &window("c", "t00", "t00")); + let f0 = d.take_due(t(0), &configs); + assert_eq!(f0.len(), 1, "first txn fires"); + + // Two more txns inside the 600s window: accumulated but suppressed. + accumulate(&mut d, &ks, &window("c", "t01", "t01")); + assert!(d.take_due(t(60), &configs).is_empty(), "debounced at t=60"); + accumulate(&mut d, &ks, &window("c", "t02", "t02")); + assert!( + d.take_due(t(120), &configs).is_empty(), + "debounced at t=120" + ); + + // Once the interval elapses, the single fire covers the merged window. + let f = d.take_due(t(600), &configs); + assert_eq!(f.len(), 1, "fires once interval elapses"); + let w = f.values().next().unwrap(); + assert_eq!(w.flow_published_at_min, "t01", "min spans the burst"); + assert_eq!(w.flow_published_at_max, "t02", "max spans the burst"); + assert!(d.pending.is_empty(), "accumulator drained after fire"); + } + + // Each config debounces on its own interval off the same transactions. + #[test] + fn per_config_intervals_are_independent() { + let configs = vec![ + cfg("https://fast", Some(Duration::from_secs(60))), + cfg("https://slow", Some(Duration::from_secs(600))), + ]; + let ks = keys(&configs); + let mut d = TriggerDebounce::default(); + + // t=0: both fire (never fired). + accumulate(&mut d, &ks, &window("c", "t00", "t00")); + assert_eq!(d.take_due(t(0), &configs).len(), 2); + + // t=90: fast (>=60s) is due; slow (<600s) is still debounced. + accumulate(&mut d, &ks, &window("c", "t90", "t90")); + let f = d.take_due(t(90), &configs); + assert_eq!(f.len(), 1); + assert!(f.contains_key(&super::super::triggers::config_key(&configs[0]))); + + // The slow config still holds its accumulated window pending. + assert!( + d.pending + .contains_key(&super::super::triggers::config_key(&configs[1])) + ); + } + + // With no interval, every qualifying transaction fires (pre-debounce behavior). + #[test] + fn no_interval_fires_every_transaction() { + let configs = vec![cfg("https://a", None)]; + let ks = keys(&configs); + let mut d = TriggerDebounce::default(); + + for i in 0..3 { + accumulate(&mut d, &ks, &window("c", "t", "t")); + assert_eq!(d.take_due(t(i), &configs).len(), 1, "fires on txn {i}"); + assert!(d.pending.is_empty()); + } + } +} diff --git a/crates/runtime-next/src/leader/materialize/triggers.rs b/crates/runtime-next/src/leader/materialize/triggers.rs index 6ca90a02e3a..49833d4f361 100644 --- a/crates/runtime-next/src/leader/materialize/triggers.rs +++ b/crates/runtime-next/src/leader/materialize/triggers.rs @@ -1,9 +1,18 @@ use anyhow::Context; use models::TriggerVariables; +use std::collections::BTreeMap; + +/// Stable identity for a trigger config, used to key debounce state so it +/// survives config reordering or additions across sessions. +pub fn config_key(config: &models::TriggerConfig) -> String { + format!("{:?} {}", config.method, config.url) +} /// Pre-compiled trigger templates and their associated configs. pub struct CompiledTriggers { pub configs: Vec, + /// Maps a config's stable `config_key` to its index in `configs`. + key_index: BTreeMap, registry: handlebars::Handlebars<'static>, } @@ -14,13 +23,45 @@ impl CompiledTriggers { registry.set_strict_mode(true); registry.register_escape_fn(handlebars::no_escape); + let mut key_index = BTreeMap::new(); for (index, config) in configs.iter().enumerate() { registry .register_template_string(&Self::template_name(index), &config.payload_template) .with_context(|| format!("compiling trigger {index} template"))?; + + // Debounce state is keyed by `config_key` (method + URL), so + // multiple configs sharing a key aren't supported: the first config + // wins and later duplicates never fire. + let key = config_key(config); + if key_index.contains_key(&key) { + service_kit::event!( + tracing::Level::WARN, + "trigger", + trigger_key = key.clone(), + trigger_index = index, + "duplicate trigger configs share a method and URL; only the first will fire", + ); + } else { + key_index.insert(key, index); + } } - Ok(Self { configs, registry }) + Ok(Self { + configs, + key_index, + registry, + }) + } + + /// Stable keys of all configured triggers, one per config. + pub fn keys(&self) -> impl Iterator { + self.key_index.keys().map(String::as_str) + } + + /// Resolve a stable `config_key` to its config index, or None if no + /// current config matches (e.g. the trigger was removed on republish). + pub fn index_for_key(&self, key: &str) -> Option { + self.key_index.get(key).copied() } /// Render the template for trigger `index` with the given context. @@ -41,27 +82,42 @@ impl std::fmt::Debug for CompiledTriggers { } } -/// Fire all configured triggers using the given variables. +/// Decode persisted trigger parameters into the per-config to_fire map. +/// A legacy single-window blob (persisted by a pre-debounce build, or by the +/// V1 runtime ahead of a migration) fans out to every configured trigger, +/// matching its original fire-all semantics. +pub fn decode_to_fire( + compiled: &CompiledTriggers, + bytes: &[u8], +) -> anyhow::Result> { + match serde_json::from_slice::(bytes) + .context("decoding trigger to_fire JSON")? + { + models::triggers::PersistedTriggerParams::PerConfig(map) => Ok(map), + models::triggers::PersistedTriggerParams::Single(variables) => Ok(compiled + .keys() + .map(|key| (key.to_string(), variables.clone())) + .collect()), + } +} + +/// Fire the due subset of triggers. `to_fire` maps a config's stable +/// `config_key` to the accumulated window to deliver for that config. pub async fn fire_pending_triggers( compiled: &CompiledTriggers, - variables: &TriggerVariables, + to_fire: &BTreeMap, client: &reqwest::Client, ) -> anyhow::Result<()> { let started_at = std::time::Instant::now(); - send_webhooks( - compiled, - variables, - client, - std::time::Duration::from_secs(1), - ) - .await - .context("trigger webhook delivery failed")?; + send_webhooks(compiled, to_fire, client, std::time::Duration::from_secs(1)) + .await + .context("trigger webhook delivery failed")?; service_kit::event!( tracing::Level::INFO, "leader", - num_triggers = compiled.configs.len(), + num_triggers = to_fire.len(), elapsed_ms = started_at.elapsed().as_millis() as u64, "trigger webhooks delivered successfully", ); @@ -69,33 +125,35 @@ pub async fn fire_pending_triggers( Ok(()) } -/// Render and send all configured trigger webhooks concurrently. +/// Render and send the due trigger webhooks concurrently, each with its own +/// accumulated window. pub async fn send_webhooks( compiled: &CompiledTriggers, - variables: &TriggerVariables, + to_fire: &BTreeMap, client: &reqwest::Client, base_backoff: std::time::Duration, ) -> anyhow::Result<()> { - if compiled.configs.is_empty() { - return Ok(()); + let mut rendered = Vec::with_capacity(to_fire.len()); + for (key, variables) in to_fire { + let Some(index) = compiled.index_for_key(key) else { + // The config was removed on a republish while a delivery was still + // pending; drop it rather than fail the task. + service_kit::event!( + tracing::Level::WARN, + "trigger", + trigger_key = key.clone(), + "pending trigger has no matching config; dropping", + ); + continue; + }; + let trigger = &compiled.configs[index]; + let context = models::build_template_context(variables, &trigger.headers); + rendered.push((index, trigger, compiled.render(index, &context)?)); } - let rendered: Vec = compiled - .configs - .iter() - .enumerate() - .map(|(index, trigger)| { - let context = models::build_template_context(variables, &trigger.headers); - compiled.render(index, &context) - }) - .collect::>()?; - - let futures: Vec<_> = compiled - .configs - .iter() - .zip(rendered) - .enumerate() - .map(|(index, (trigger, body))| { + let futures: Vec<_> = rendered + .into_iter() + .map(|(index, trigger, body)| { send_single_webhook(index, trigger, body, client, base_backoff) }) .collect(); @@ -241,9 +299,80 @@ mod test { payload_template: template.to_string(), timeout: std::time::Duration::from_secs(5), max_attempts: 3, + interval: None, } } + // A legacy single-window blob (pre-debounce persisted format) fans out to + // every configured trigger; the current map format passes through as-is. + #[test] + fn decode_to_fire_handles_both_persisted_formats() { + let cfg_a = make_trigger_with_url("https://a", "{}"); + let cfg_b = make_trigger_with_url("https://b", "{}"); + let compiled = CompiledTriggers::compile(vec![cfg_a.clone(), cfg_b.clone()]).unwrap(); + + let variables = TriggerVariables::placeholder(); + let legacy_blob = serde_json::to_vec(&variables).unwrap(); + let fanned = decode_to_fire(&compiled, &legacy_blob).unwrap(); + assert_eq!( + fanned, + [ + (config_key(&cfg_a), variables.clone()), + (config_key(&cfg_b), variables.clone()), + ] + .into(), + ); + + let map: BTreeMap = + [(config_key(&cfg_a), variables.clone())].into(); + let map_blob = serde_json::to_vec(&map).unwrap(); + assert_eq!(decode_to_fire(&compiled, &map_blob).unwrap(), map); + } + + // Two configs sharing method+URL would collide in the debounce accumulator, + // so multiple configs sharing a key aren't supported: the first wins and + // later duplicates never fire. + #[tokio::test] + async fn duplicate_config_keys_first_wins() { + let bodies = Arc::new(std::sync::Mutex::new(Vec::new())); + let app = { + let bodies = bodies.clone(); + axum::Router::new().route( + "/hook", + axum::routing::post(move |body: String| { + let bodies = bodies.clone(); + async move { + bodies.lock().unwrap().push(body); + axum::http::StatusCode::OK + } + }), + ) + }; + let (addr, _handle) = start_mock_server(app).await; + let url = format!("http://{addr}/hook"); + + let first = make_trigger_with_url(&url, r#"{"which":"first"}"#); + let second = make_trigger_with_url(&url, r#"{"which":"second"}"#); + let compiled = CompiledTriggers::compile(vec![first.clone(), second]).unwrap(); + + let to_fire: BTreeMap = + [(config_key(&first), TriggerVariables::placeholder())].into(); + send_webhooks( + &compiled, + &to_fire, + &reqwest::Client::new(), + std::time::Duration::ZERO, + ) + .await + .unwrap(); + + assert_eq!( + *bodies.lock().unwrap(), + vec![r#"{"which":"first"}"#.to_string()], + "exactly one delivery, using the first config's template", + ); + } + #[test] fn render_template() { let mut trigger = make_trigger_with_url( @@ -348,9 +477,13 @@ mod test { trigger.max_attempts = case.max_attempts; let compiled = CompiledTriggers::compile(vec![trigger]).unwrap(); + let to_fire: BTreeMap = compiled + .keys() + .map(|k| (k.to_string(), TriggerVariables::placeholder())) + .collect(); let result = send_webhooks( &compiled, - &TriggerVariables::placeholder(), + &to_fire, &reqwest::Client::new(), std::time::Duration::ZERO, ) @@ -370,4 +503,68 @@ mod test { ); } } + + // A `to_fire` map fires each configured trigger once with its own window, + // and an entry whose key matches no current config (removed/edited on + // republish) is skipped without failing the batch. + #[tokio::test] + async fn fires_map_and_skips_unknown_key() { + let hits = Arc::new(AtomicU32::new(0)); + let app = { + let (ha, hb) = (hits.clone(), hits.clone()); + axum::Router::new() + .route( + "/a", + axum::routing::post(move || { + let h = ha.clone(); + async move { + h.fetch_add(1, Ordering::SeqCst); + axum::http::StatusCode::OK + } + }), + ) + .route( + "/b", + axum::routing::post(move || { + let h = hb.clone(); + async move { + h.fetch_add(1, Ordering::SeqCst); + axum::http::StatusCode::OK + } + }), + ) + }; + let (addr, _handle) = start_mock_server(app).await; + + let cfg_a = make_trigger_with_url(&format!("http://{addr}/a"), r#"{"t":"a"}"#); + let cfg_b = make_trigger_with_url(&format!("http://{addr}/b"), r#"{"t":"b"}"#); + let compiled = CompiledTriggers::compile(vec![cfg_a.clone(), cfg_b.clone()]).unwrap(); + + let mut to_fire: BTreeMap = BTreeMap::new(); + to_fire.insert(config_key(&cfg_a), TriggerVariables::placeholder()); + to_fire.insert(config_key(&cfg_b), TriggerVariables::placeholder()); + // No matching config: must be dropped, not fired and not an error. + to_fire.insert( + "POST http://removed/x".to_string(), + TriggerVariables::placeholder(), + ); + + let result = send_webhooks( + &compiled, + &to_fire, + &reqwest::Client::new(), + std::time::Duration::ZERO, + ) + .await; + + assert!( + result.is_ok(), + "unknown key must not fail the batch: {result:?}" + ); + assert_eq!( + hits.load(Ordering::SeqCst), + 2, + "both configured triggers fire; the unknown key is skipped", + ); + } } diff --git a/crates/runtime/src/materialize/protocol.rs b/crates/runtime/src/materialize/protocol.rs index 140c58ae1a8..6aaf67b783d 100644 --- a/crates/runtime/src/materialize/protocol.rs +++ b/crates/runtime/src/materialize/protocol.rs @@ -382,9 +382,14 @@ pub async fn recv_connector_acked_or_loaded_or_flushed( queue_connector_state_update(&state, &mut wb).context("invalid Acknowledged")?; db.write_opt(wb, rocksdb::WriteOptions::default()).await?; } + // Decode via PersistedTriggerParams: a per-config map written by + // the V2 runtime ahead of a rollback merges into the one window + // that V1 fires every config with. if let Some(compiled) = compiled_triggers - && let Some(variables) = - db.load_trigger_params::().await? + && let Some(params) = db + .load_trigger_params::() + .await? + && let Some(variables) = params.into_merged() { let compiled = std::sync::Arc::clone(compiled); let http_client = http_client.clone(); @@ -690,6 +695,7 @@ mod test { payload_template: template.to_string(), timeout: std::time::Duration::from_secs(5), max_attempts: 3, + interval: None, } } diff --git a/crates/runtime/src/materialize/triggers.rs b/crates/runtime/src/materialize/triggers.rs index d2a25af1932..a00286e6c82 100644 --- a/crates/runtime/src/materialize/triggers.rs +++ b/crates/runtime/src/materialize/triggers.rs @@ -295,6 +295,7 @@ mod test { payload_template: template.to_string(), timeout: std::time::Duration::from_secs(5), max_attempts: 3, + interval: None, } } diff --git a/site/docs/concepts/materialization/materialization-triggers.md b/site/docs/concepts/materialization/materialization-triggers.md index 41b01b8bcc9..8dcec829fa6 100644 --- a/site/docs/concepts/materialization/materialization-triggers.md +++ b/site/docs/concepts/materialization/materialization-triggers.md @@ -10,8 +10,9 @@ been materialized — for example, to kick off a dbt run, send a Slack message, or call a custom API. Triggers are configured on the materialization itself and fire once per -committed transaction. Each trigger sends an HTTP request whose URL, method, -headers, and JSON body you define. The body is a +committed transaction (or at most once per `interval`, when one is set). Each +trigger sends an HTTP request whose URL, method, headers, and JSON body you +define. The body is a [Handlebars](https://handlebarsjs.com/) template that can reference transaction metadata and secret header values. @@ -91,6 +92,10 @@ materializations: # Maximum number of delivery attempts (including the initial attempt). # Optional. Default: 3. maxAttempts: 3 + # Minimum interval between deliveries. When set, bursts of + # transactions collapse into at most one delivery per interval. + # Optional. Default: unset (fire on every transaction). + interval: 30m ``` ## Properties @@ -104,6 +109,7 @@ materializations: | **`/triggers/config/*/payloadTemplate`** | Payload Template | Handlebars template that renders to the JSON request body. | string | | | **`/triggers/config/*/timeout`** | Timeout | Request timeout for each delivery attempt. Must be greater than 0. The task is failed if all attempts are exhausted without a successful delivery. | string (duration) | `30s` | | **`/triggers/config/*/maxAttempts`** | Max Attempts | Maximum number of delivery attempts (including the initial attempt). | integer | `3` | +| **`/triggers/config/*/interval`** | Interval | Minimum interval between deliveries. A burst of transactions collapses into at most one delivery per interval, covering the full span. Unset: fire on every transaction. | string (duration) | unset | ## Template variables @@ -287,6 +293,6 @@ after initial publication without re-entering all secret header values: - `method` - `headers` (keys and encrypted values) -The remaining fields (`payloadTemplate`, `timeout`, `maxAttempts`) are -excluded from the SOPS integrity check, so you can modify them freely without -needing to re-enter your secret header values. +The remaining fields (`payloadTemplate`, `timeout`, `maxAttempts`, `interval`) +are excluded from the SOPS integrity check, so you can modify them freely +without needing to re-enter your secret header values.