diff --git a/CHANGELOG.md b/CHANGELOG.md index 041e251..ba7f2ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Slack/Discord webhook sink** for state-transition push + notifications. New `[sinks.webhook]` config table with + `enabled` / `endpoint` / `endpoint_env` / `flavor` / `on_states` + / `rate_limit_secs` keys. Defaults forward only `WaitingInput` + and `Error` transitions — most agent transitions are routine + `Idle ↔ Working` and would spam. Auto-detects Slack + (`hooks.slack.com` → `{"text": "..."}`) and Discord + (`discord.com/api/webhooks` → `{"content": "..."}`) from the + URL; falls back to a generic flavor that posts the full + `Transition` JSON. Per-`(kind, session_id, state)` in-task + rate-limit (default 60s) prevents flapping permission loops + from paging the operator 30 times a minute. Best-effort by + design — failed POSTs log at WARN and drop, no on-disk queue, + no retry backoff. The webhook URL is the secret on Slack and + Discord, so prefer `endpoint_env` over inline TOML. - **`muxa logs`** — tail muxad's stdout/stderr without remembering `/tmp/muxad.log` and `/tmp/muxad.err`. Default streams the last 30 lines of both files (configurable via `-n/--lines`) then follows diff --git a/config.example.toml b/config.example.toml index 18c0acc..5403598 100644 --- a/config.example.toml +++ b/config.example.toml @@ -176,3 +176,26 @@ # device_id = "laptop-01" # batch_size = 50 # flush_interval_ms = 5000 + +# [sinks.webhook] forwards `Transition` events to a Slack or Discord +# webhook so you can be paged on your phone the moment an agent flips +# to `WaitingInput` or `Error` while you're AFK from `muxa watch`. +# Auto-detects the wire format from the URL: hooks.slack.com → +# `{"text": "..."}`, discord.com/api/webhooks → `{"content": "..."}`, +# anything else → the full `Transition` JSON. 
+# +# The webhook URL is itself the secret for Slack/Discord; prefer +# `endpoint_env` over inline `endpoint` so it never lives in TOML. +# `endpoint_env` wins when both are set. +# +# Best-effort by design: a failed POST logs and drops. No queue, no +# backoff — Slack/Discord's own rate-limit handling is good enough, +# and we'd rather lose a single page than thunder-herd 1000 buffered +# alerts when an outage clears. +[sinks.webhook] +# enabled = true +# endpoint = "https://hooks.slack.com/services/T0/B0/abc" +# endpoint_env = "MUXA_SLACK_URL" +# flavor = "slack" # slack | discord | generic +# on_states = ["WaitingInput", "Error"] +# rate_limit_secs = 60 # per (kind, session, state) diff --git a/crates/muxa/src/config.rs b/crates/muxa/src/config.rs index 0047823..0147df7 100644 --- a/crates/muxa/src/config.rs +++ b/crates/muxa/src/config.rs @@ -74,6 +74,12 @@ pub enum ConfigError { not set (no default endpoint by design)" )] OhMyPromptMissingEndpoint, + + #[error( + "sinks.webhook: enabled = true but neither [sinks.webhook].endpoint \ + nor [sinks.webhook].endpoint_env is set (one of the two is required)" + )] + WebhookMissingEndpoint, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -102,6 +108,7 @@ pub struct Config { #[serde(default, deny_unknown_fields)] pub struct SinksConfig { pub oh_my_prompt: OhMyPromptToml, + pub webhook: WebhookToml, } /// `[sinks.oh_my_prompt]` raw TOML schema. The daemon resolves these @@ -126,6 +133,36 @@ pub struct OhMyPromptToml { pub flush_interval_ms: Option, } +/// `[sinks.webhook]` raw TOML schema. The daemon resolves these fields +/// against env vars + defaults via `WebhookSink::resolve` at startup. +/// +/// Either `endpoint` (URL inline in TOML) OR `endpoint_env` (name of an +/// env var holding the URL) is required when `enabled = true`. The +/// env-var path is preferred for Slack/Discord webhooks because the URL +/// itself is the secret — committing it to a shared dotfile is a leak. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct WebhookToml { + pub enabled: Option, + /// Full webhook URL. Mutually-optional with `endpoint_env`; the env + /// var wins when both are set. + pub endpoint: Option, + /// Name of an env var holding the full webhook URL. Set this in + /// preference to `endpoint` so the secret URL never lives in TOML. + pub endpoint_env: Option, + /// Wire-format flavor: `slack` | `discord` | `generic`. Auto-detected + /// from the URL when unset. + pub flavor: Option, + /// State transitions to forward. Defaults to `["WaitingInput", + /// "Error"]` — the two states that mean "operator attention needed". + /// `PascalCase` or `snake_case` are both accepted at resolve time. + pub on_states: Option>, + /// Per-`(kind, session_id, state)` rate-limit window in seconds. + /// Defaults to 60. Set to 0 to disable (one notification per + /// transition, even if the agent flaps). + pub rate_limit_secs: Option, +} + /// `[dashboard]` config — the user-facing TOML schema for the dashboard /// HTTP server. 
All fields are `Option` so the config-file layer can /// distinguish "not set" (use default or env/flag override) from @@ -429,6 +466,7 @@ impl Config { pub fn validate_for_daemon(&self) -> std::result::Result<(), ConfigError> { validate_dashboard(&self.dashboard)?; validate_oh_my_prompt(&self.sinks.oh_my_prompt)?; + validate_webhook(&self.sinks.webhook)?; Ok(()) } @@ -514,6 +552,18 @@ fn validate_oh_my_prompt(cfg: &OhMyPromptToml) -> std::result::Result<(), Config Ok(()) } +fn validate_webhook(cfg: &WebhookToml) -> std::result::Result<(), ConfigError> { + if !cfg.enabled.unwrap_or(false) { + return Ok(()); + } + let has_endpoint = cfg.endpoint.as_deref().is_some_and(|s| !s.is_empty()); + let has_endpoint_env = cfg.endpoint_env.as_deref().is_some_and(|s| !s.is_empty()); + if !has_endpoint && !has_endpoint_env { + return Err(ConfigError::WebhookMissingEndpoint); + } + Ok(()) +} + /// Walk a `[watch.detail] template` string and yield each placeholder name /// (or pipe-fallback name) that isn't in [`WATCH_DETAIL_PLACEHOLDERS`]. /// Unbalanced `{` / missing `}` are tolerated silently — the runtime @@ -1201,6 +1251,7 @@ default_content = "nope" enabled: Some(true), ..OhMyPromptToml::default() }, + ..SinksConfig::default() }, ..Config::default() }; @@ -1224,6 +1275,7 @@ default_content = "nope" endpoint: Some("https://example.dev".into()), ..OhMyPromptToml::default() }, + ..SinksConfig::default() }, ..Config::default() }; @@ -1238,6 +1290,94 @@ default_content = "nope" assert!(cfg.validate_for_daemon().is_ok()); } + /// `[sinks.webhook] enabled = true` with neither endpoint nor + /// `endpoint_env` set must fail at load — there is no default URL, + /// and a sink that can't deliver alerts is worse than no sink at + /// all (operator thinks they're being watched). 
+ #[test] + fn validate_rejects_webhook_enabled_without_endpoint() { + let cfg = Config { + sinks: SinksConfig { + webhook: WebhookToml { + enabled: Some(true), + ..WebhookToml::default() + }, + ..SinksConfig::default() + }, + ..Config::default() + }; + let err = cfg.validate_for_daemon().unwrap_err(); + assert!( + matches!(err, ConfigError::WebhookMissingEndpoint), + "got {err:?}", + ); + } + + /// Either `endpoint` or `endpoint_env` is sufficient. We don't + /// validate the env var contents here because that's a daemon-runtime + /// concern — the env var may be populated only inside the unit + /// `Environment=` and the user's interactive shell wouldn't see it. + #[test] + fn validate_accepts_webhook_with_endpoint() { + let cfg = Config { + sinks: SinksConfig { + webhook: WebhookToml { + enabled: Some(true), + endpoint: Some("https://hooks.slack.com/services/T0/B0/x".into()), + ..WebhookToml::default() + }, + ..SinksConfig::default() + }, + ..Config::default() + }; + assert!(cfg.validate_for_daemon().is_ok()); + } + + #[test] + fn validate_accepts_webhook_with_endpoint_env_only() { + let cfg = Config { + sinks: SinksConfig { + webhook: WebhookToml { + enabled: Some(true), + endpoint_env: Some("MUXA_SLACK_URL".into()), + ..WebhookToml::default() + }, + ..SinksConfig::default() + }, + ..Config::default() + }; + assert!(cfg.validate_for_daemon().is_ok()); + } + + /// Webhook section parses with the documented field set and + /// `deny_unknown_fields` rejects typos. 
+ #[test] + fn parses_webhook_section() { + let toml = r#" +[sinks.webhook] +enabled = true +endpoint = "https://hooks.slack.com/services/T0/B0/x" +flavor = "slack" +on_states = ["WaitingInput", "Error"] +rate_limit_secs = 30 +"#; + let cfg: Config = toml::from_str(toml).unwrap(); + assert_eq!(cfg.sinks.webhook.enabled, Some(true)); + assert_eq!(cfg.sinks.webhook.flavor.as_deref(), Some("slack")); + assert_eq!(cfg.sinks.webhook.rate_limit_secs, Some(30)); + } + + #[test] + fn rejects_unknown_field_in_webhook_section() { + let toml = r#" +[sinks.webhook] +enabled = true +endpoint = "https://example.com" +typoed_field = 1 +"#; + assert!(toml::from_str::(toml).is_err()); + } + /// CLI commands like `muxa watch` only call `Config::validate()`, /// never `validate_for_daemon()`. A config that's *only* /// daemon-misconfigured (e.g. `dashboard.bind = "0.0.0.0:7878"` with @@ -1259,6 +1399,7 @@ default_content = "nope" enabled: Some(true), ..OhMyPromptToml::default() }, + ..SinksConfig::default() }, ..Config::default() }; diff --git a/crates/muxa/src/event.rs b/crates/muxa/src/event.rs index 034751c..a104b78 100644 --- a/crates/muxa/src/event.rs +++ b/crates/muxa/src/event.rs @@ -25,7 +25,7 @@ pub enum AgentKind { Unknown, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, strum::Display)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, strum::Display)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum AgentState { diff --git a/crates/muxa/src/sinks/mod.rs b/crates/muxa/src/sinks/mod.rs index 3d65cc9..998abbf 100644 --- a/crates/muxa/src/sinks/mod.rs +++ b/crates/muxa/src/sinks/mod.rs @@ -10,6 +10,9 @@ //! //! - [`ohmyprompt`] — HTTP POST to `oh-my-prompt`'s `/api/sync/upload` //! endpoint with batching, retries, and a bounded ring-buffer. +//! - [`webhook`] — single-shot HTTP POST to a Slack/Discord webhook +//! on state transitions (default: `WaitingInput`/`Error`). Best-effort, +//! 
no queue, in-task per-agent rate-limit. //! //! Each sink owns its own runtime contract (wire schema, auth header //! shape, retry policy) — there is intentionally no `Sink` trait in v1 @@ -17,5 +20,7 @@ //! shapes diverge enough that a uniform interface would be premature. pub mod ohmyprompt; +pub mod webhook; pub use ohmyprompt::{OhMyPromptError, OhMyPromptSink}; +pub use webhook::{WebhookError, WebhookSink}; diff --git a/crates/muxa/src/sinks/webhook.rs b/crates/muxa/src/sinks/webhook.rs new file mode 100644 index 0000000..4b568a8 --- /dev/null +++ b/crates/muxa/src/sinks/webhook.rs @@ -0,0 +1,799 @@ +//! Slack/Discord webhook sink — push state-transition alerts off-host. +//! +//! Subscribes to the daemon's `Transition` broadcast and POSTs a one-line +//! message to a Slack-incoming-webhook or Discord-webhook URL whenever an +//! agent transitions into a state the operator wants to be paged on +//! (defaults: `WaitingInput`, `Error`). Designed for the AFK case — push +//! a Slack message to your phone the moment Claude Code stops to ask for +//! permission, so you can either grant it from the couch or come back to +//! the desk knowing something needs you. +//! +//! ## Why this is intentionally lightweight +//! +//! - No queue, no retry backoff, no on-disk spool. A dropped notification +//! matters less than a stalled sink task: Slack/Discord both have their +//! own retry semantics on the receiving end, and we'd rather lose one +//! alert than buffer 1000 of them while Slack is down and then page the +//! operator with a thundering herd when service comes back. +//! - In-task per-agent rate limiter (`HashMap<(kind, session, state), Instant>`). +//! `WaitingInput` ↔ `Working` flap-flap is common during permission +//! loops; we don't want one flaky agent to send 30 push notifications a +//! minute. One message per `(kind, session_id, state)` per +//! `rate_limit_secs` window is plenty. +//! - Filter by `to`-state up front (`on_states`). 
//!   Most transitions are routine `Idle ↔ Working` and would spam.

use crate::config::WebhookToml;
use crate::event::{AgentKind, AgentState};
use crate::state::Transition;
use serde::Serialize;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

/// Default rate-limit window in seconds, used when
/// `[sinks.webhook].rate_limit_secs` is unset. At most one alert per
/// `(kind, session, state)` tuple is sent within this window. Picked at
/// "pageable, not spammy" — long enough to absorb a flapping
/// `Working`/`WaitingInput` loop, short enough that a real second
/// incident an hour later still pages.
pub const DEFAULT_RATE_LIMIT_SECS: u64 = 60;

/// HTTP timeout applied to every POST via the `reqwest` client builder.
///
/// Kept short on purpose: the POST is awaited inline by the sink task, so
/// while a request is stalled that task neither picks up later
/// transitions nor observes the shutdown signal. A failed or timed-out
/// request is logged and dropped, never retried.
const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum number of characters (not bytes) of the prompt/notification
/// snippet quoted in an alert; longer snippets are cut and suffixed with
/// `…`. The original author notes Slack truncates around 4 KB and Discord
/// around 2 KB — a 100-char snippet keeps the whole message far below
/// either. The goal is "is this worth looking at", not full transcript
/// review.
const PROMPT_TRUNCATE_LEN: usize = 100;

/// Errors surfaced while resolving or running the webhook sink.
#[derive(Debug, thiserror::Error)]
pub enum WebhookError {
    /// `enabled = true`, `endpoint_env` is unset/empty and `endpoint` is
    /// unset/empty.
    #[error(
        "webhook sink is enabled but neither [sinks.webhook].endpoint nor \
         [sinks.webhook].endpoint_env is set (one of the two is required)"
    )]
    MissingEndpoint,

    /// `endpoint_env` names an environment variable that is unset or
    /// holds an empty string.
    #[error(
        "webhook sink is enabled but env var {var} is empty/unset \
         (set the full webhook URL there, or use the `endpoint` field)"
    )]
    MissingEndpointEnv { var: String },

    /// The resolved endpoint failed `url::Url::parse`.
    // NOTE(review): the message echoes the raw URL, which for Slack/Discord
    // is the secret itself — confirm this error never reaches shared logs.
    #[error("invalid webhook URL {url:?}: {source}")]
    InvalidEndpoint {
        url: String,
        #[source]
        source: url::ParseError,
    },

    /// Building the `reqwest::Client` failed.
    #[error("reqwest client init failed: {0}")]
    ReqwestInit(#[source] reqwest::Error),
}

/// Wire-format flavor for the outgoing POST body.
+/// +/// Slack and Discord both accept a "minimum viable" JSON shape with one +/// string field. We auto-detect from the URL host, with an explicit +/// override for the rare case (`generic`) where the operator points at a +/// custom HTTP receiver. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WebhookFlavor { + Slack, + Discord, + /// Posts the full `Transition` JSON. Useful for n8n/Zapier-style + /// receivers that want structured data, and for our own integration + /// tests. + Generic, +} + +impl WebhookFlavor { + /// Parse a config string. Empty / unknown values fall through to + /// `None` so the caller can apply URL-based auto-detection. + fn from_str_opt(s: &str) -> Option { + match s.to_ascii_lowercase().as_str() { + "slack" => Some(Self::Slack), + "discord" => Some(Self::Discord), + "generic" => Some(Self::Generic), + _ => None, + } + } +} + +/// Pick a flavor by inspecting the URL host. We use simple substring +/// matching rather than parsing the URL because Slack/Discord both +/// publish stable, well-known hostnames and a substring check sidesteps +/// quirks like `slack.com` vs `hooks.slack.com` vs proxied URLs. +pub fn infer_flavor(url: &str) -> WebhookFlavor { + let lower = url.to_ascii_lowercase(); + if lower.contains("hooks.slack.com") { + WebhookFlavor::Slack + } else if lower.contains("discord.com/api/webhooks") + || lower.contains("discordapp.com/api/webhooks") + { + WebhookFlavor::Discord + } else { + WebhookFlavor::Generic + } +} + +/// Resolved webhook sink — runtime state derived from the TOML config. +#[derive(Debug)] +pub struct WebhookSink { + endpoint: String, + flavor: WebhookFlavor, + on_states: Vec, + rate_limit: Duration, + client: reqwest::Client, +} + +impl WebhookSink { + /// Resolve the TOML sub-table into a runtime sink, or `Ok(None)` if + /// disabled. 
Errors when `enabled = true` but the URL cannot be + /// determined or is malformed — better to fail loudly at startup + /// than silently no-op while the operator wonders why no alerts + /// arrive. + pub fn resolve(toml: &WebhookToml) -> Result, WebhookError> { + if !toml.enabled.unwrap_or(false) { + return Ok(None); + } + + // `endpoint_env` wins when both are set. The expected workflow is + // "TOML carries the harmless fields, env var carries the + // secret-bearing URL", and an env override is the natural way to + // shadow a default URL committed to a shared dotfile. + let endpoint = if let Some(var) = toml.endpoint_env.as_deref().filter(|s| !s.is_empty()) { + std::env::var(var) + .ok() + .filter(|s| !s.is_empty()) + .ok_or_else(|| WebhookError::MissingEndpointEnv { + var: var.to_string(), + })? + } else { + toml.endpoint + .as_deref() + .filter(|s| !s.is_empty()) + .ok_or(WebhookError::MissingEndpoint)? + .to_string() + }; + + // Validate the URL up front so a typo surfaces at startup rather + // than on the first transition. + url::Url::parse(&endpoint).map_err(|source| WebhookError::InvalidEndpoint { + url: endpoint.clone(), + source, + })?; + + let flavor = toml + .flavor + .as_deref() + .and_then(WebhookFlavor::from_str_opt) + .unwrap_or_else(|| infer_flavor(&endpoint)); + + let on_states = if let Some(raw) = toml.on_states.as_ref() { + raw.iter().filter_map(|s| parse_state(s)).collect() + } else { + default_on_states() + }; + + let rate_limit = + Duration::from_secs(toml.rate_limit_secs.unwrap_or(DEFAULT_RATE_LIMIT_SECS)); + + let client = reqwest::Client::builder() + .timeout(HTTP_REQUEST_TIMEOUT) + .build() + .map_err(WebhookError::ReqwestInit)?; + + Ok(Some(Self { + endpoint, + flavor, + on_states, + rate_limit, + client, + })) + } + + /// Run the sink until the shutdown channel fires or the broadcast + /// closes. Mirrors the ohmyprompt sink's shape: one task, no + /// background workers, abort-on-shutdown. 
The rate limiter lives + /// on the stack so there's no shared state with the rest of the + /// daemon. + pub async fn run( + self, + mut transition_rx: broadcast::Receiver, + mut shutdown_rx: broadcast::Receiver<()>, + ) -> Result<(), WebhookError> { + // Per-(kind, session_id, state) "last sent" timestamps. Bounded + // implicitly by the pane lifecycle — entries for stopped agents + // simply stop being looked up. We don't bother with explicit + // GC: the map is small and the daemon process restarts on + // upgrade. + let mut last_sent: HashMap<(AgentKind, String, AgentState), Instant> = HashMap::new(); + + loop { + tokio::select! { + biased; + _ = shutdown_rx.recv() => { + tracing::debug!("webhook sink received shutdown"); + return Ok(()); + } + next = transition_rx.recv() => { + match next { + Ok(transition) => { + self.handle_transition(&transition, &mut last_sent).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + // Lagged transitions can only mean we missed + // pages we'd otherwise have sent — log and + // keep going. The alternative (resyncing + // state) doesn't help: by the time we + // notice, the agent is somewhere else. + tracing::warn!( + missed = n, + "webhook sink lagged behind transition stream; continuing" + ); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("transition broadcast closed; webhook sink exiting"); + return Ok(()); + } + } + } + } + } + } + + /// Decide whether to forward, then POST. Errors are logged at WARN + /// and dropped — the run loop must never exit because of a 500. 
+ async fn handle_transition( + &self, + transition: &Transition, + last_sent: &mut HashMap<(AgentKind, String, AgentState), Instant>, + ) { + if !should_forward(transition, &self.on_states) { + return; + } + + let key = ( + transition.agent.kind, + transition.agent.session_id.clone(), + transition.to, + ); + let now = Instant::now(); + if let Some(prev) = last_sent.get(&key) { + if now.duration_since(*prev) < self.rate_limit { + tracing::debug!( + kind = %transition.agent.kind, + session = %transition.agent.session_id, + state = %transition.to, + "webhook: suppressed by rate limit" + ); + return; + } + } + last_sent.insert(key, now); + + let message = format_message(transition); + let body = build_payload(self.flavor, &message, transition); + + let result = self + .client + .post(&self.endpoint) + .json(&body) + .send() + .await + .and_then(reqwest::Response::error_for_status); + + match result { + Ok(_) => { + tracing::debug!( + kind = %transition.agent.kind, + session = %transition.agent.session_id, + state = %transition.to, + "webhook: notification posted" + ); + } + Err(e) => { + // Best-effort: drop on failure. See module docs for why + // we don't queue. + tracing::warn!( + error = %e, + kind = %transition.agent.kind, + state = %transition.to, + "webhook: post failed; dropping notification" + ); + } + } + } +} + +/// Public spawn helper mirroring `sinks::ohmyprompt::spawn` (added in +/// the daemon main wire-up). Returns the `JoinHandle` so the daemon can +/// hold/await it during shutdown. +pub fn spawn( + sink: WebhookSink, + transition_rx: broadcast::Receiver, + shutdown_rx: broadcast::Receiver<()>, +) -> JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = sink.run(transition_rx, shutdown_rx).await { + tracing::error!(error = %e, "webhook sink exited"); + } + }) +} + +/// Parse a `[sinks.webhook].on_states` entry. 
Accepts both +/// `PascalCase` (`"WaitingInput"`) and `snake_case` (`"waiting_input"`) +/// forms because TOML readers tend to type either; `AgentState`'s serde +/// is `snake_case` and its strum Display matches, but the brief calls +/// out the `PascalCase` form so we accept both. +fn parse_state(s: &str) -> Option { + match s { + "Starting" | "starting" => Some(AgentState::Starting), + "Working" | "working" => Some(AgentState::Working), + "Idle" | "idle" => Some(AgentState::Idle), + "WaitingInput" | "waiting_input" => Some(AgentState::WaitingInput), + "Error" | "error" => Some(AgentState::Error), + "Stopped" | "stopped" => Some(AgentState::Stopped), + _ => { + tracing::warn!( + value = %s, + "webhook: unknown on_states entry; ignoring" + ); + None + } + } +} + +fn default_on_states() -> Vec { + vec![AgentState::WaitingInput, AgentState::Error] +} + +/// Filter predicate. Pulled out as a free function so the unit tests can +/// drive it without instantiating a sink (which would need an HTTP +/// client). +fn should_forward(transition: &Transition, on_states: &[AgentState]) -> bool { + on_states.contains(&transition.to) +} + +/// Build the human-readable one-line message body. +/// +/// Format: `{from_glyph} → {to_glyph} {kind} [{pane}] {tag} — "{snippet}"` +/// +/// `kind` uses strum's Display impl so a kind change at the protocol +/// layer ripples here automatically. `pane` falls back to `-` when the +/// agent has no tmux pane bound (paneless agents). `snippet` prefers +/// `last_prompt` (what the user just asked) but falls back to +/// `last_notification` (the agent's own message) so an Error transition +/// without a fresh prompt still carries the failure tag. 
+fn format_message(transition: &Transition) -> String { + let agent = &transition.agent; + let pane = agent.pane.as_deref().unwrap_or("-"); + let from_glyph = state_glyph(transition.from); + let to_glyph = state_glyph(transition.to); + let tag = match transition.to { + AgentState::WaitingInput => "needs input", + AgentState::Error => "error", + AgentState::Working => "working", + AgentState::Idle => "idle", + AgentState::Starting => "starting", + AgentState::Stopped => "stopped", + }; + + let snippet = agent + .last_prompt + .as_deref() + .or(agent.last_notification.as_deref()) + .map(truncate_snippet) + .unwrap_or_default(); + + if snippet.is_empty() { + format!( + "{from_glyph} → {to_glyph} {kind} [{pane}] {tag}", + kind = agent.kind, + ) + } else { + format!( + "{from_glyph} → {to_glyph} {kind} [{pane}] {tag} — \"{snippet}\"", + kind = agent.kind, + ) + } +} + +/// Map a state to a glyph for the message header. Mirrors the watch UI's +/// glyphs so the Slack message visually matches what an operator sees on +/// their terminal — recognition memory is faster than reading. +fn state_glyph(state: AgentState) -> &'static str { + match state { + AgentState::Starting => "…", + AgentState::Working => "⚙", + AgentState::Idle => "·", + AgentState::WaitingInput => "!", + AgentState::Error => "✗", + AgentState::Stopped => "■", + } +} + +/// Truncate at char (not byte) boundaries — `last_prompt` is user input +/// and likely contains UTF-8. Append a trailing `…` to make truncation +/// obvious in the alert. +fn truncate_snippet(s: &str) -> String { + let trimmed: String = s.chars().take(PROMPT_TRUNCATE_LEN).collect(); + if trimmed.chars().count() < s.chars().count() { + format!("{trimmed}…") + } else { + trimmed + } +} + +/// Slack-flavored payload: `{ "text": "..." }`. +#[derive(Debug, Serialize)] +struct SlackPayload<'a> { + text: &'a str, +} + +/// Discord-flavored payload: `{ "content": "..." }`. 
+#[derive(Debug, Serialize)] +struct DiscordPayload<'a> { + content: &'a str, +} + +/// Build a `serde_json::Value` for the chosen flavor. We materialize +/// to `Value` (rather than returning a `Box`) so the +/// tests can assert the wire shape without re-parsing JSON, and so +/// `reqwest`'s `.json(&body)` path stays a single monomorphized call. +fn build_payload( + flavor: WebhookFlavor, + message: &str, + transition: &Transition, +) -> serde_json::Value { + match flavor { + WebhookFlavor::Slack => serde_json::to_value(SlackPayload { text: message }) + .unwrap_or_else(|_| serde_json::json!({ "text": message })), + WebhookFlavor::Discord => serde_json::to_value(DiscordPayload { content: message }) + .unwrap_or_else(|_| serde_json::json!({ "content": message })), + WebhookFlavor::Generic => { + // The full `Transition` is `Serialize` — the receiver gets + // the structured payload. If serialization fails (it + // shouldn't, every field is plain data), fall back to the + // message string so we still send *something*. 
+ serde_json::to_value(transition) + .unwrap_or_else(|_| serde_json::json!({ "text": message })) + } + } +} + +#[cfg(test)] +#[allow(clippy::field_reassign_with_default)] +mod tests { + use super::*; + use crate::event::AgentKind; + use crate::state::Agent; + use time::macros::datetime; + + fn agent(kind: AgentKind, pane: Option<&str>, prompt: Option<&str>) -> Agent { + Agent { + kind, + session_id: "sess-1".into(), + pane: pane.map(str::to_string), + cwd: None, + state: AgentState::WaitingInput, + last_prompt: prompt.map(str::to_string), + last_response: None, + last_notification: None, + model: None, + context_used_pct: None, + cost_usd: None, + rate_limit_5h_pct: None, + rate_limit_5h_resets_at: None, + rate_limit_7d_pct: None, + rate_limit_7d_resets_at: None, + rate_limited_until: None, + rate_limit_scope: None, + rate_limit_source: None, + started_at: datetime!(2026-04-30 12:00:00 UTC), + last_activity_at: datetime!(2026-04-30 12:00:00 UTC), + } + } + + fn transition(from: AgentState, to: AgentState, agent: Agent) -> Transition { + Transition { from, to, agent } + } + + #[test] + fn format_message_for_waiting_input() { + let mut a = agent(AgentKind::ClaudeCode, Some("main:2"), Some("do all")); + a.state = AgentState::WaitingInput; + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + assert_eq!( + format_message(&t), + "⚙ → ! 
claude_code [main:2] needs input — \"do all\"" + ); + } + + #[test] + fn format_message_for_error() { + let mut a = agent(AgentKind::Codex, Some("work:1"), Some("rate_limit")); + a.state = AgentState::Error; + let t = transition(AgentState::Working, AgentState::Error, a); + assert_eq!( + format_message(&t), + "⚙ → ✗ codex [work:1] error — \"rate_limit\"" + ); + } + + #[test] + fn format_message_falls_back_to_dash_for_paneless_agent() { + let a = agent(AgentKind::ClaudeCode, None, Some("hi")); + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + let msg = format_message(&t); + assert!(msg.contains("[-]"), "got {msg:?}"); + } + + #[test] + fn format_message_uses_last_notification_when_no_prompt() { + let mut a = agent(AgentKind::ClaudeCode, Some("p:0"), None); + a.last_notification = Some("permission requested".into()); + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + let msg = format_message(&t); + assert!(msg.contains("permission requested"), "got {msg:?}"); + } + + #[test] + fn format_message_truncates_long_prompts() { + let long = "x".repeat(300); + let a = agent(AgentKind::ClaudeCode, Some("p:0"), Some(&long)); + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + let msg = format_message(&t); + // 100 truncated chars + ellipsis. Easier to assert the ellipsis + // is present than to count UTF-8 bytes back out of the message. 
+ assert!(msg.contains('…'), "expected truncation marker in {msg:?}"); + assert!(msg.len() < long.len() + 50); + } + + #[test] + fn should_forward_filters_idle_transitions() { + let a = agent(AgentKind::ClaudeCode, Some("p:0"), None); + let t = transition(AgentState::Working, AgentState::Idle, a); + let on = default_on_states(); + assert!(!should_forward(&t, &on)); + } + + #[test] + fn should_forward_passes_waiting_input_and_error() { + let a = agent(AgentKind::ClaudeCode, Some("p:0"), None); + let on = default_on_states(); + let t = transition(AgentState::Working, AgentState::WaitingInput, a.clone()); + assert!(should_forward(&t, &on)); + let t = transition(AgentState::Working, AgentState::Error, a); + assert!(should_forward(&t, &on)); + } + + /// The rate-limiter logic is the same shape as `handle_transition` + /// uses internally, but we exercise it without `tokio::time::sleep` + /// — manually subtracting from the stored Instant lets us run the + /// "60 seconds later" branch in microseconds. + fn check_and_record( + last_sent: &mut HashMap<(AgentKind, String, AgentState), Instant>, + key: (AgentKind, String, AgentState), + now: Instant, + window: Duration, + ) -> bool { + if let Some(prev) = last_sent.get(&key) { + if now.duration_since(*prev) < window { + return false; + } + } + last_sent.insert(key, now); + true + } + + #[test] + fn rate_limiter_suppresses_within_window() { + let mut last = HashMap::new(); + let key = ( + AgentKind::ClaudeCode, + "sess-1".to_string(), + AgentState::WaitingInput, + ); + let t0 = Instant::now(); + let window = Duration::from_secs(60); + assert!(check_and_record(&mut last, key.clone(), t0, window)); + // Half a second later — well inside the window. 
+ let t1 = t0 + Duration::from_millis(500); + assert!(!check_and_record(&mut last, key, t1, window)); + } + + #[test] + fn rate_limiter_releases_after_window() { + let mut last = HashMap::new(); + let key = ( + AgentKind::ClaudeCode, + "sess-1".to_string(), + AgentState::WaitingInput, + ); + let window = Duration::from_secs(60); + let t0 = Instant::now(); + assert!(check_and_record(&mut last, key.clone(), t0, window)); + // 61 seconds later — outside the window. + let t1 = t0 + Duration::from_secs(61); + assert!(check_and_record(&mut last, key, t1, window)); + } + + #[test] + fn rate_limiter_keys_per_agent_and_state() { + let mut last = HashMap::new(); + let window = Duration::from_secs(60); + let now = Instant::now(); + let claude_waiting = ( + AgentKind::ClaudeCode, + "sess-1".to_string(), + AgentState::WaitingInput, + ); + let codex_waiting = ( + AgentKind::Codex, + "sess-1".to_string(), + AgentState::WaitingInput, + ); + let claude_error = ( + AgentKind::ClaudeCode, + "sess-1".to_string(), + AgentState::Error, + ); + + assert!(check_and_record(&mut last, claude_waiting, now, window)); + // Different kind / same session must NOT be suppressed. + assert!(check_and_record(&mut last, codex_waiting, now, window)); + // Same kind / different state must NOT be suppressed. 
+ assert!(check_and_record(&mut last, claude_error, now, window)); + } + + #[test] + fn flavor_inference_from_url() { + assert_eq!( + infer_flavor("https://hooks.slack.com/services/T0/B0/abc"), + WebhookFlavor::Slack + ); + assert_eq!( + infer_flavor("https://discord.com/api/webhooks/123/abc"), + WebhookFlavor::Discord + ); + assert_eq!( + infer_flavor("https://discordapp.com/api/webhooks/123/abc"), + WebhookFlavor::Discord + ); + assert_eq!( + infer_flavor("https://example.com/hook"), + WebhookFlavor::Generic + ); + } + + #[test] + fn payload_for_slack_uses_text_field() { + let a = agent(AgentKind::ClaudeCode, Some("p:0"), Some("hi")); + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + let v = build_payload(WebhookFlavor::Slack, "msg", &t); + let obj = v.as_object().expect("object payload"); + assert!(obj.contains_key("text")); + assert_eq!(obj.get("text").and_then(|v| v.as_str()), Some("msg")); + assert!(!obj.contains_key("content")); + } + + #[test] + fn payload_for_discord_uses_content_field() { + let a = agent(AgentKind::ClaudeCode, Some("p:0"), Some("hi")); + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + let v = build_payload(WebhookFlavor::Discord, "msg", &t); + let obj = v.as_object().expect("object payload"); + assert!(obj.contains_key("content")); + assert_eq!(obj.get("content").and_then(|v| v.as_str()), Some("msg")); + assert!(!obj.contains_key("text")); + } + + #[test] + fn payload_for_generic_serializes_full_transition() { + let a = agent(AgentKind::ClaudeCode, Some("p:0"), Some("hi")); + let t = transition(AgentState::Working, AgentState::WaitingInput, a); + let v = build_payload(WebhookFlavor::Generic, "msg", &t); + let obj = v.as_object().expect("object payload"); + // Generic flavor passes the full Transition through; assert the + // fields are present so a future change to `Transition` (rename + // / drop) breaks this test. 
+ assert!(obj.contains_key("from")); + assert!(obj.contains_key("to")); + assert!(obj.contains_key("agent")); + } + + #[test] + fn parse_state_accepts_both_cases() { + assert_eq!(parse_state("WaitingInput"), Some(AgentState::WaitingInput)); + assert_eq!(parse_state("waiting_input"), Some(AgentState::WaitingInput)); + assert_eq!(parse_state("Error"), Some(AgentState::Error)); + assert_eq!(parse_state("error"), Some(AgentState::Error)); + assert_eq!(parse_state("nonsense"), None); + } + + #[test] + fn resolve_returns_none_when_disabled() { + let toml = WebhookToml::default(); + let out = WebhookSink::resolve(&toml).unwrap(); + assert!(out.is_none()); + } + + #[test] + fn resolve_errors_when_enabled_without_endpoint_or_env() { + let toml = WebhookToml { + enabled: Some(true), + ..WebhookToml::default() + }; + let err = WebhookSink::resolve(&toml).unwrap_err(); + assert!(matches!(err, WebhookError::MissingEndpoint)); + } + + #[test] + fn resolve_errors_when_endpoint_env_unset() { + let var = "WEBHOOK_TEST_MISSING_URL_ENV"; + std::env::remove_var(var); + let toml = WebhookToml { + enabled: Some(true), + endpoint_env: Some(var.into()), + ..WebhookToml::default() + }; + let err = WebhookSink::resolve(&toml).unwrap_err(); + assert!(matches!(err, WebhookError::MissingEndpointEnv { .. })); + } + + #[test] + fn resolve_prefers_endpoint_env_over_endpoint() { + // Set both to distinguishable values; endpoint_env should win. 
+ let var = "WEBHOOK_TEST_PREFER_ENV_URL"; + std::env::set_var(var, "https://hooks.slack.com/services/from/env"); + let toml = WebhookToml { + enabled: Some(true), + endpoint: Some("https://example.com/from-toml".into()), + endpoint_env: Some(var.into()), + ..WebhookToml::default() + }; + let sink = WebhookSink::resolve(&toml).unwrap().expect("resolved"); + assert_eq!(sink.endpoint, "https://hooks.slack.com/services/from/env"); + assert_eq!(sink.flavor, WebhookFlavor::Slack); + std::env::remove_var(var); + } + + #[test] + fn resolve_uses_explicit_flavor_over_inference() { + let toml = WebhookToml { + enabled: Some(true), + endpoint: Some("https://example.com/hook".into()), + flavor: Some("slack".into()), + ..WebhookToml::default() + }; + let sink = WebhookSink::resolve(&toml).unwrap().expect("resolved"); + assert_eq!(sink.flavor, WebhookFlavor::Slack); + } + + #[test] + fn resolve_default_on_states_are_waiting_input_and_error() { + let toml = WebhookToml { + enabled: Some(true), + endpoint: Some("https://example.com/hook".into()), + ..WebhookToml::default() + }; + let sink = WebhookSink::resolve(&toml).unwrap().expect("resolved"); + assert_eq!(sink.on_states, default_on_states()); + } + + #[test] + fn resolve_rejects_invalid_endpoint() { + let toml = WebhookToml { + enabled: Some(true), + endpoint: Some("not-a-url".into()), + ..WebhookToml::default() + }; + let err = WebhookSink::resolve(&toml).unwrap_err(); + assert!(matches!(err, WebhookError::InvalidEndpoint { .. 
})); + } +} diff --git a/crates/muxad/src/main.rs b/crates/muxad/src/main.rs index 8f4e4f8..61fdb06 100644 --- a/crates/muxad/src/main.rs +++ b/crates/muxad/src/main.rs @@ -13,7 +13,7 @@ use muxa::history::{HistoryOptions, PromptHistory}; use muxa::ipc::{harden_permissions, Client, Server}; use muxa::notify::Notifier; use muxa::reconcile::Reconciler; -use muxa::sinks::OhMyPromptSink; +use muxa::sinks::{webhook as webhook_sink, OhMyPromptSink, WebhookSink}; use muxa::snapshot::{self, Snapshotter, SnapshotterOptions}; use muxa::tmux::scanner::PaneCache; use muxa::{discovery, paths, Config, Store}; @@ -204,6 +204,7 @@ async fn main() -> Result<()> { } spawn_oh_my_prompt_sink(&cfg, &store, &shutdown_tx)?; + spawn_webhook_sink(&cfg, &store, &shutdown_tx)?; let server = Server::new(socket.clone(), store); let handle = tokio::spawn(server.run(shutdown_tx.subscribe())); @@ -636,6 +637,33 @@ fn spawn_oh_my_prompt_sink( Ok(()) } +/// Resolve the webhook (Slack/Discord) sink config and, if enabled, +/// spawn its task. Mirrors `spawn_oh_my_prompt_sink`: failures to +/// resolve are surfaced at startup so a typo in the URL doesn't lead +/// to silent missed notifications. +fn spawn_webhook_sink( + cfg: &Config, + store: &muxa::SharedStore, + shutdown_tx: &broadcast::Sender<()>, +) -> Result<()> { + match WebhookSink::resolve(&cfg.sinks.webhook) { + Ok(Some(sink)) => { + let transition_rx = store.subscribe(); + let shutdown_rx = shutdown_tx.subscribe(); + // `webhook_sink::spawn` returns the JoinHandle; we drop it + // because the sink lifetime is bounded by the shutdown + // broadcast (matching the ohmyprompt pattern). + std::mem::drop(webhook_sink::spawn(sink, transition_rx, shutdown_rx)); + tracing::info!("webhook sink enabled"); + } + Ok(None) => {} + Err(e) => { + return Err(anyhow::Error::new(e).context("resolving webhook sink")); + } + } + Ok(()) +} + /// Decide whether to fire the one-shot discovery pass and, if so, spawn it /// onto the current tokio runtime. ///