Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ expression: schema
"secret": true
}
},
"interval": {
"title": "Minimum interval between deliveries for this trigger.",
"type": [
"string",
"null"
],
"pattern": "^\\d+(s|m|h|d)$"
},
"maxAttempts": {
"title": "Maximum number of delivery attempts (including the initial attempt).",
"description": "The task is failed if all attempts are exhausted without a successful delivery.",
Expand Down
99 changes: 99 additions & 0 deletions crates/models/src/triggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ pub struct TriggerConfig {
#[serde(default = "default_max_attempts")]
#[schemars(extend("default" = 3_u32))]
pub max_attempts: u32,
/// # Minimum interval between deliveries for this trigger.
#[schemars(schema_with = "super::duration_schema")]
#[serde(
default,
with = "humantime_serde",
skip_serializing_if = "Option::is_none"
)]
pub interval: Option<Duration>,
}

/// HTTP method for the webhook request.
Expand Down Expand Up @@ -92,6 +100,52 @@ impl TriggerVariables {
run_id: "2024-01-01T00:00:00.000Z".to_string(),
}
}

/// Merge `other` into `self`, widening this window to cover both. Used to
/// collapse a burst of debounced transactions into a single delivery.
pub fn merge(&mut self, other: &TriggerVariables) {
for name in &other.collection_names {
if !self.collection_names.contains(name) {
self.collection_names.push(name.clone());
}
}
self.collection_names.sort();

if other.flow_published_at_min < self.flow_published_at_min {
self.flow_published_at_min = other.flow_published_at_min.clone();
}
if other.flow_published_at_max > self.flow_published_at_max {
self.flow_published_at_max = other.flow_published_at_max.clone();
}
}
}

/// Persisted trigger parameters, tolerating both wire formats that have been
/// written to the runtime's durable "trigger-params" key:
///
/// - `PerConfig`: the current format — an accumulated window per trigger,
/// written by runtimes with per-config debounce.
/// - `Single`: the legacy format — one window, fired by every configured
/// trigger. Written by the V1 runtime and by pre-debounce V2 runtimes.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PersistedTriggerParams {
PerConfig(BTreeMap<String, TriggerVariables>),
Single(TriggerVariables),
}

impl PersistedTriggerParams {
/// Reduce to a single window spanning all pending deliveries (V1 style).
/// Returns None if there is nothing pending.
pub fn into_merged(self) -> Option<TriggerVariables> {
match self {
Self::Single(variables) => Some(variables),
Self::PerConfig(map) => map.into_values().reduce(|mut acc, v| {
acc.merge(&v);
acc
}),
}
}
}

/// Render a single payload template string with the given context.
Expand Down Expand Up @@ -131,6 +185,7 @@ pub struct HmacExcludedOriginals {
pub payload_template: String,
pub timeout: Duration,
pub max_attempts: u32,
pub interval: Option<Duration>,
}

/// Replace HMAC-excluded fields in each trigger config with placeholder values,
Expand All @@ -144,6 +199,7 @@ pub fn strip_hmac_excluded_fields(triggers: &mut Triggers) -> Vec<HmacExcludedOr
payload_template: std::mem::take(&mut config.payload_template),
timeout: std::mem::replace(&mut config.timeout, Duration::ZERO),
max_attempts: std::mem::replace(&mut config.max_attempts, 0),
interval: std::mem::take(&mut config.interval),
})
.collect()
}
Expand All @@ -157,6 +213,7 @@ pub fn restore_hmac_excluded_fields(
config.payload_template = orig.payload_template;
config.timeout = orig.timeout;
config.max_attempts = orig.max_attempts;
config.interval = orig.interval;
}
}

Expand All @@ -183,6 +240,47 @@ pub fn triggers_schema() -> serde_json::Value {
mod test {
use super::*;

// Both persisted wire formats decode unambiguously, and a per-config map
// merges down to the single window the V1 runtime fires with.
#[test]
fn persisted_trigger_params_decodes_both_formats() {
let single = TriggerVariables::placeholder();
let legacy_blob = serde_json::to_vec(&single).unwrap();
let decoded: PersistedTriggerParams = serde_json::from_slice(&legacy_blob).unwrap();
assert_eq!(decoded, PersistedTriggerParams::Single(single.clone()));
assert_eq!(decoded_merged(&legacy_blob), Some(single.clone()));

let mut early = single.clone();
early.flow_published_at_min = "2023-01-01T00:00:00Z".to_string();
early.collection_names = vec!["acmeCo/other".to_string()];
let map: BTreeMap<String, TriggerVariables> = [
("POST https://a".to_string(), single.clone()),
("POST https://b".to_string(), early),
]
.into();
let map_blob = serde_json::to_vec(&map).unwrap();
let decoded: PersistedTriggerParams = serde_json::from_slice(&map_blob).unwrap();
assert_eq!(decoded, PersistedTriggerParams::PerConfig(map));

let merged = decoded_merged(&map_blob).unwrap();
assert_eq!(merged.flow_published_at_min, "2023-01-01T00:00:00Z");
assert_eq!(
merged.collection_names,
vec![
"acmeCo/example/collection".to_string(),
"acmeCo/other".to_string()
],
);

assert_eq!(decoded_merged(b"{}"), None);
}

fn decoded_merged(blob: &[u8]) -> Option<TriggerVariables> {
serde_json::from_slice::<PersistedTriggerParams>(blob)
.unwrap()
.into_merged()
}

#[test]
fn trigger_config_schema_snapshot() {
let settings = schemars::generate::SchemaSettings::draft2019_09();
Expand All @@ -202,6 +300,7 @@ mod test {
payload_template: "my template".to_string(),
timeout: Duration::from_secs(45),
max_attempts: 5,
interval: Some(Duration::from_secs(1800)),
}],
sops: None,
};
Expand Down
11 changes: 7 additions & 4 deletions crates/runtime-next/src/leader/materialize/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct Actor {
Option<BoxFuture<'static, tonic::Result<(crate::Publisher, BTreeMap<String, Bytes>)>>>,
// Task being executed by this actor.
task: Task,
// Leader-lifetime trigger debounce accumulator and last-fire times.
trigger_debounce: fsm::TriggerDebounce,
// Future for an in-flight trigger dispatch, if any.
trigger_fut: Option<BoxFuture<'static, anyhow::Result<()>>>,
}
Expand All @@ -55,6 +57,7 @@ impl Actor {
shard_tx,
stats_write_fut: None,
task,
trigger_debounce: fsm::TriggerDebounce::default(),
trigger_fut: None,
}
}
Expand Down Expand Up @@ -124,6 +127,7 @@ impl Actor {
let action: fsm::Action;
let prev_kind = tail.kind();
(action, tail) = tail.step(
&self.trigger_debounce,
self.intents_write_fut.is_none(),
now,
&mut ready_shard_rx,
Expand All @@ -148,6 +152,7 @@ impl Actor {
(action, head) = head.step(
&mut binding_bytes_behind,
&mut close_requested,
&mut self.trigger_debounce,
&mut self.legacy_checkpoint,
now,
&mut ready_frontier,
Expand Down Expand Up @@ -457,15 +462,13 @@ impl Actor {
"leader",
"starting trigger execution"
);
let variables: models::TriggerVariables =
serde_json::from_slice(&trigger_params)
.context("decoding trigger_variables JSON")?;
let to_fire = super::triggers::decode_to_fire(&triggers, &trigger_params)?;
let client = self.http_client.clone();

self.trigger_fut = Some(
async move {
// TODO(johnny): Periodic writes into task ops logs if it takes a while.
super::triggers::fire_pending_triggers(&triggers, &variables, &client).await
super::triggers::fire_pending_triggers(&triggers, &to_fire, &client).await
}
.boxed(),
);
Expand Down
Loading
Loading