Merged
27 changes: 27 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,33 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- **`Transition.agent` is now `Arc<Agent>`** (was `Agent`). The in-process
`tokio::sync::broadcast` channel that fans out state-change events
to the notifier task, in-process sinks, and every live `muxa watch`
IPC subscriber clones the payload once per subscriber per `recv()`.
With the post-v0.5.0 dashboard SSE handler holding a long-lived
subscription on top of the existing notifier + watch CLIs, an `Agent`
carrying ~4 KB of `last_prompt` and ~4 KB of `last_response` was
showing up as a meaningful fraction of `Store::apply` wall time
under modest fanout (≥ 4 subscribers). Wrapping the agent in an
`Arc` keeps the per-subscriber cost a refcount bump instead of an
`Agent`-sized allocation + memcpy, and shrinks `Store::apply`
end-to-end time by ~30% at N = 16 subscribers on the
`crates/muxa/benches/store_apply.rs` microbenchmark.

**Wire format unchanged.** With the `rc` feature enabled, `serde`
serializes `Arc<T>` transparently (as `T`), and deserializing an
`Arc<T>` always produces a fresh `Arc` with a strong count of 1, so
the newline-delimited JSON flowing over the unix socket between
`muxad` and `muxa watch` / dashboard SSE is identical on the wire.
No protocol bump. Internal API consumers that match on
`Transition.agent` or take it by value need to go through the `Arc`
(e.g., via `&*t.agent` or implicit `Deref`), which the in-tree
consumers already do.
- Workspace `serde` now enables the `rc` feature, which provides the
`Serialize` and `Deserialize` impls for `Arc<T>`; required by the
change above.

### Added

- **Slack/Discord webhook sink** for state-transition push
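The cost difference the changelog entry describes can be illustrated with a minimal, std-only sketch. `FakeAgent` and `fan_out` are hypothetical stand-ins, not muxa types, and a plain `Vec` of clones stands in for the per-subscriber clone that `tokio::sync::broadcast` performs on `recv()`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for muxa's `Agent`: two large heap-backed strings.
#[derive(Clone)]
struct FakeAgent {
    last_prompt: String,
    last_response: String,
}

impl FakeAgent {
    /// Bytes of string data a deep clone has to copy.
    fn heap_bytes(&self) -> usize {
        self.last_prompt.len() + self.last_response.len()
    }
}

/// Simulate a fan-out to `n` subscribers by cloning the payload once each.
fn fan_out<T: Clone>(payload: &T, n: usize) -> Vec<T> {
    (0..n).map(|_| payload.clone()).collect()
}

fn main() {
    let agent = FakeAgent {
        last_prompt: "x".repeat(4096),
        last_response: "y".repeat(4096),
    };

    // Deep clone: every subscriber gets its own copy of both strings.
    let deep = fan_out(&agent, 4);
    let copied: usize = deep.iter().map(FakeAgent::heap_bytes).sum();
    println!("deep clones copied {copied} bytes of string data");
    assert!(deep
        .iter()
        .all(|a| a.last_prompt.as_ptr() != agent.last_prompt.as_ptr()));

    // Arc clone: every subscriber shares the single allocation.
    let shared = Arc::new(agent);
    let cheap = fan_out(&shared, 4);
    assert!(cheap.iter().all(|a| Arc::ptr_eq(a, &shared)));
    println!("strong count after fan-out: {}", Arc::strong_count(&shared));
}
```

The sketch only shows the ownership mechanics; the ~30% figure quoted in the changelog comes from the PR's own benchmark, not from this example.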
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -17,7 +17,7 @@ muxa = { path = "crates/muxa", version = "0.5.0" }
# external
anyhow = "1"
thiserror = "2"
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
toml = "0.8"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "sync", "signal", "fs", "time", "process"] }
8 changes: 7 additions & 1 deletion crates/muxa-cli/src/watch.rs
@@ -954,7 +954,13 @@ async fn refresh_task<F, Fut, S>(
async fn recv_transition(sub: &mut Option<muxa::ipc::TransitionStream>) -> Option<Agent> {
    let stream = sub.as_mut()?;
    match stream.recv().await {
-       Ok(Some(t)) => Some(t.agent),
+       // The wire payload deserializes as `Arc<Agent>` (the producer
+       // wraps once so that broadcast fanout costs a refcount bump per
+       // subscriber instead of a deep `Agent` clone). On the client
+       // side this is a fresh `Arc` built by `serde` with a strong
+       // count of 1, so `Arc::try_unwrap` is expected to succeed and
+       // no `Agent` clone happens; the clone fallback is defensive.
+       Ok(Some(t)) => Some(std::sync::Arc::try_unwrap(t.agent).unwrap_or_else(|a| (*a).clone())),
        Ok(None) | Err(_) => None,
    }
}
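The move-or-clone behaviour that `recv_transition` relies on can be checked in isolation. The following is a std-only sketch; `into_owned` is a hypothetical helper name, not something defined in muxa.

```rust
use std::sync::Arc;

/// Take ownership of the inner value, cloning only if the `Arc` is shared.
/// (Hypothetical helper mirroring the expression used in `recv_transition`.)
fn into_owned<T: Clone>(arc: Arc<T>) -> T {
    Arc::try_unwrap(arc).unwrap_or_else(|shared| (*shared).clone())
}

fn main() {
    // Sole owner (the freshly deserialized case): the value is moved out.
    let fresh = Arc::new(String::from("agent"));
    let buf_before = fresh.as_str().as_ptr();
    let owned = into_owned(fresh);
    // Same heap buffer as before, so no string data was copied.
    assert_eq!(owned.as_str().as_ptr(), buf_before);

    // Shared: `try_unwrap` fails, so the fallback clones the value.
    let shared = Arc::new(String::from("agent"));
    let other = Arc::clone(&shared);
    let cloned = into_owned(shared);
    assert_ne!(cloned.as_str().as_ptr(), other.as_str().as_ptr());
    assert_eq!(cloned, *other);
}
```

Newer standard libraries also offer `Arc::unwrap_or_clone`, which expresses the same logic in one call; whether muxa's minimum supported Rust version allows it is not stated in this PR.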
4 changes: 4 additions & 0 deletions crates/muxa/Cargo.toml
@@ -36,5 +36,9 @@ http-body-util = "0.1"
wiremock = "0.6"
tracing-subscriber = { workspace = true }

[[bench]]
name = "store_apply"
harness = false

[lints]
workspace = true
261 changes: 261 additions & 0 deletions crates/muxa/benches/store_apply.rs
@@ -0,0 +1,261 @@
//! Microbenchmark for `Store::apply` on the broadcast hot path.
//!
//! Hypothesis under test: `Transition` carries a full `Agent` (a struct
//! with several `String` fields, including up to ~4 KB each of
//! `last_prompt` and `last_response`), and the in-process
//! `tokio::sync::broadcast` channel clones the payload once per
//! subscriber on every `recv()`. With N broadcast subscribers, the
//! per-`apply` cost should scale roughly as N × the deep size of the
//! `Agent` (struct plus heap `String` data) in allocation + copy.
//!
//! Strategy:
//! - Pre-populate the store with one realistic `Agent` carrying a 4 KB
//! `last_prompt` and a 4 KB `last_response`.
//! - Spawn N "drainer" tasks, each holding a `broadcast::Receiver` and
//! `recv`-looping into `std::hint::black_box` so the compiler can't
//! drop the clone.
//! - In the main task, call `Store::apply(PromptSubmitted)` in a
//! tight loop and time wall-clock with `Instant`.
//!
//! Run with: `cargo bench -p muxa --bench store_apply`
//! (release mode is implicit for `cargo bench`).
//!
//! Output is a small table: subscribers, iterations, producer ms and
//! µs/apply, end-to-end ms and µs/apply, Transitions received, and
//! Transitions expected.

// Bench code is allowed to be sloppy about timing-arithmetic precision —
// we're printing human-readable µs/sec figures, not feeding the values
// into anything else. Allowing these at the file level is simpler than
// peppering the print loop with attribute noise.
#![allow(clippy::cast_precision_loss)]

use std::hint::black_box;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use muxa::event::{AgentEvent, AgentId, AgentKind};
use muxa::state::{Agent, Store};
use time::OffsetDateTime;
use tokio::runtime::Builder;
use tokio::sync::broadcast;

/// Build a realistic Agent with ~4 KB `last_prompt` and ~4 KB
/// `last_response`, plus the other String fields populated. This is the
/// worst-case payload the broadcast machinery has to clone on every
/// state change.
fn make_realistic_agent() -> Agent {
    let now = OffsetDateTime::now_utc();
    let prompt = "x".repeat(4096);
    let response = "y".repeat(4096);
    Agent {
        kind: AgentKind::ClaudeCode,
        session_id: "sess-bench-0001".into(),
        pane: Some("%42".into()),
        cwd: Some("/home/user/projects/some-large-codebase".into()),
        state: muxa::AgentState::Idle,
        last_prompt: Some(prompt),
        last_response: Some(response),
        last_notification: Some("permission needed for tool: Bash(rm -rf /tmp/cache)".into()),
        model: Some("claude-opus-4-7".into()),
        context_used_pct: Some(48.5),
        cost_usd: Some(2.34),
        rate_limit_5h_pct: Some(72.0),
        rate_limit_5h_resets_at: Some(now + time::Duration::hours(1)),
        rate_limit_7d_pct: Some(34.0),
        rate_limit_7d_resets_at: Some(now + time::Duration::days(3)),
        rate_limited_until: None,
        rate_limit_scope: None,
        rate_limit_source: None,
        started_at: now,
        last_activity_at: now,
    }
}

/// Spawn `n` drainer tasks. Each owns a `broadcast::Receiver<Transition>`
/// and pulls Transitions in a loop, black-boxing them so the optimiser
/// can't elide the clone the broadcast made. Each drainer keeps running
/// until it has observed `expected_per_sub` Transitions, then exits.
///
/// Returns the shared received-Transition counter and the join handles,
/// so the caller can wait for full end-to-end drain. That is the metric
/// that captures both producer cost AND per-subscriber clone cost, which
/// is what the Arc refactor actually changes.
fn spawn_drainers(
    store: &Arc<Store>,
    n: usize,
    expected_per_sub: u64,
) -> (Arc<AtomicU64>, Vec<tokio::task::JoinHandle<()>>) {
    let counter = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::with_capacity(n);
    for _ in 0..n {
        let mut rx = store.subscribe();
        let c = counter.clone();
        handles.push(tokio::spawn(async move {
            let mut seen: u64 = 0;
            while seen < expected_per_sub {
                match rx.recv().await {
                    Ok(t) => {
                        // black_box the whole Transition so the clone the
                        // broadcast machinery did is observable to the
                        // compiler. Without this, LLVM might decide the
                        // Transition is dead and skip the work entirely.
                        black_box(&t);
                        c.fetch_add(1, Ordering::Relaxed);
                        seen += 1;
                    }
                    Err(broadcast::error::RecvError::Lagged(missed)) => {
                        // Count lagged messages toward the per-sub quota
                        // so a slow subscriber doesn't deadlock the bench.
                        // The producer did send those Transitions; this
                        // receiver just fell too far behind to see them.
                        seen = seen.saturating_add(missed);
                    }
                    Err(broadcast::error::RecvError::Closed) => return,
                }
            }
        }));
    }
    (counter, handles)
}

/// Run a single benchmark configuration: `n_subscribers` subscribers and
/// `iters` iterations of a `PromptSubmitted` + `TurnStopped` pair.
/// Returns `(producer_elapsed, total_elapsed, observed)`: the wall-clock
/// time spent inside the apply loop, the time until every drainer has
/// finished, and the number of Transitions the drainers received.
async fn bench_one(n_subscribers: usize, iters: u64) -> (Duration, Duration, u64) {
    let store = Arc::new(Store::default());

    // Seed an agent so PromptSubmitted lands a transition (Idle → Working
    // emits one Transition, every iteration). We use `apply` itself with
    // a Started event to seed identity fields and reach Idle, then warm
    // the Working/Idle ping-pong.
    let id = AgentId {
        kind: AgentKind::ClaudeCode,
        session_id: "sess-bench-0001".into(),
        pane: Some("%42".into()),
        cwd: Some("/home/user/projects/some-large-codebase".into()),
    };
    let now = OffsetDateTime::now_utc();
    store
        .apply(&AgentEvent::Started {
            id: id.clone(),
            at: now,
        })
        .await;

    // Inflate the agent's last_prompt/last_response/last_notification so
    // the broadcast clone really has to copy 4 KB-ish of String data.
    // Easiest path: hydrate over the top with a realistic Agent. This
    // bypasses apply, which is intentional: we want the fields populated
    // before we start measuring, not as part of the measurement.
    store.hydrate(vec![make_realistic_agent()]).await;

    // Subscribers must be live before we start applying so they actually
    // hold the cursor during the bench. A broadcast receiver only sees
    // messages sent after it subscribed.
    //
    // Each drainer needs to see 2 * iters Transitions (PromptSubmitted +
    // TurnStopped per outer iter). We cap the drainer's loop at that
    // count so the bench can join them; `JoinHandle::await` on the full
    // set is the end-to-end "all subscribers caught up" boundary.
    let (counter, handles) = spawn_drainers(&store, n_subscribers, 2 * iters);

    // The receivers are already subscribed (`subscribe()` runs
    // synchronously inside `spawn_drainers`), so no message can be missed.
    // The 1 ms sleep just gives the spawned tasks a chance to be scheduled
    // and park in `recv()` before the timed loop starts.
    tokio::time::sleep(Duration::from_millis(1)).await;

    // Build one PromptSubmitted event up-front and reuse it. Cloning the
    // event itself is part of the realistic per-apply work but we want
    // to keep the iteration loop body as pure as possible: the prompt
    // String is 4 KB and we don't want String::clone of the prompt to
    // dominate the timing on the apply side.
    //
    // (apply mutates agent.last_prompt = prompt.clone() inside; that's
    // unavoidable and IS part of what we're measuring.)
    let ev = AgentEvent::PromptSubmitted {
        id: id.clone(),
        prompt: "z".repeat(4096),
        at: now,
    };
    // After a PromptSubmitted, state is Working. To get a transition every
    // iteration we alternate with TurnStopped (which flips back to Idle).
    let stop = AgentEvent::TurnStopped {
        id: id.clone(),
        response: Some("r".repeat(4096)),
        at: now,
    };

    let start = Instant::now();
    for _ in 0..iters {
        store.apply(&ev).await; // Idle -> Working, emits Transition
        store.apply(&stop).await; // Working -> Idle, emits Transition
    }
    let producer_elapsed = start.elapsed();

    // Wait for all subscribers to fully drain the broadcast. The
    // end-to-end span captures both producer cost and per-subscriber
    // clone cost, which is exactly the system-wide metric the Arc
    // refactor is meant to improve.
    for h in handles {
        let _ = h.await;
    }
    let total_elapsed = start.elapsed();

    // Each iteration emits 2 transitions, so the drainers should see
    // roughly 2 * iters * n_subscribers events. We don't assert this
    // (Lagged is allowed) but we do report it for sanity.
    let observed = counter.load(Ordering::Relaxed);
    (producer_elapsed, total_elapsed, observed)
}

fn main() {
    // Multi-threaded runtime: the drainer tasks need to be polled in
    // parallel with the producer for the benchmark to actually exercise
    // the broadcast fanout. A current-thread runtime would serialise
    // the producer behind whichever drainer happens to wake first.
    let rt = Builder::new_multi_thread()
        .worker_threads(num_cpus())
        .enable_all()
        .build()
        .expect("build tokio runtime");

    println!("# muxa Store::apply broadcast benchmark");
    println!("# Agent: 4 KB last_prompt + 4 KB last_response + small fields");
    println!("# Each iter = 2 applies (PromptSubmitted + TurnStopped) = 2 Transitions");
    println!("# 'producer' = Store::apply loop only");
    println!("# 'e2e' = producer + wait for every subscriber to fully drain");
    println!();
    println!(
        "{:>5} {:>7} {:>11} {:>11} {:>11} {:>11} {:>13} {:>13}",
        "subs", "iters", "prod ms", "us/apply", "e2e ms", "us/apply", "rx total", "expected"
    );

    // Warmup pass: warms the allocator, the runtime's worker threads,
    // and CPU caches before the measured rows. Its result is discarded.
    let _ = rt.block_on(bench_one(2, 1_000));

    for &n in &[0_usize, 1, 2, 4, 8, 16] {
        // 2 applies per iter so 5_000 iters = 10_000 applies.
        // At a few µs per apply, this is tens of ms of wall time per row,
        // well above the timer's noise floor on Linux.
        let iters: u64 = 5_000;
        let (prod_dur, e2e_dur, observed) = rt.block_on(bench_one(n, iters));
        let total_applies = (iters * 2) as f64;
        let prod_us_per_apply = (prod_dur.as_nanos() as f64) / 1000.0 / total_applies;
        let e2e_us_per_apply = (e2e_dur.as_nanos() as f64) / 1000.0 / total_applies;
        let expected = 2 * iters * n as u64;
        println!(
            "{:>5} {:>7} {:>11.2} {:>11.2} {:>11.2} {:>11.2} {:>13} {:>13}",
            n,
            iters,
            prod_dur.as_secs_f64() * 1000.0,
            prod_us_per_apply,
            e2e_dur.as_secs_f64() * 1000.0,
            e2e_us_per_apply,
            observed,
            expected,
        );
    }
}

fn num_cpus() -> usize {
    std::thread::available_parallelism().map_or(4, std::num::NonZeroUsize::get)
}
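The benchmark's "rx total" column can come out lower than "expected" because lagged messages count toward each drainer's quota but not toward the shared counter. A simplified, std-only model of that accounting is sketched below; the `Recv` enum and `drain` function are hypothetical and only mimic the three outcomes the drainer loop handles.

```rust
/// Hypothetical, simplified model of what a drainer sees from `recv()`.
#[derive(Clone, Copy)]
enum Recv {
    Item,
    Lagged(u64),
    Closed,
}

/// Replay `events` against a quota of `expected` messages.
/// Returns `(seen, received)`: `seen` counts toward the quota (delivered
/// items plus skipped messages), `received` counts delivered items only.
fn drain(events: &[Recv], expected: u64) -> (u64, u64) {
    let (mut seen, mut received) = (0_u64, 0_u64);
    for ev in events {
        if seen >= expected {
            break;
        }
        match *ev {
            Recv::Item => {
                seen += 1;
                received += 1;
            }
            // Skipped messages satisfy the quota but were never delivered.
            Recv::Lagged(missed) => seen = seen.saturating_add(missed),
            Recv::Closed => break,
        }
    }
    (seen, received)
}

fn main() {
    // Quota of 10; the subscriber fell behind once and skipped 4 messages.
    let events = [
        Recv::Item,
        Recv::Item,
        Recv::Lagged(4),
        Recv::Item,
        Recv::Item,
        Recv::Item,
        Recv::Item,
    ];
    let (seen, received) = drain(&events, 10);
    assert_eq!((seen, received), (10, 6));
    println!("seen={seen} received={received}");
}
```

In this model the drainer terminates after reaching its quota even though it only received 6 of 10 messages, which is why the bench reports the count instead of asserting on it.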
4 changes: 2 additions & 2 deletions crates/muxa/src/dashboard/server.rs
@@ -877,7 +877,7 @@ mod tests {
    tx.send(Transition {
        from: AgentState::Starting,
        to: AgentState::Idle,
-       agent: agent.clone(),
+       agent: Arc::new(agent.clone()),
    })
    .expect("subscriber alive, send must succeed");

@@ -979,7 +979,7 @@ mod tests {
    tx.send(Transition {
        from: AgentState::Starting,
        to: AgentState::Idle,
-       agent: agent.clone(),
+       agent: Arc::new(agent.clone()),
    })
    .expect("subscriber alive, send must succeed");
}
7 changes: 4 additions & 3 deletions crates/muxa/src/notify.rs
@@ -138,6 +138,7 @@ mod tests {
    use super::*;
    use crate::state::Agent;
    use crate::AgentKind;
+   use std::sync::Arc;
    use time::macros::datetime;

    fn agent(state: AgentState) -> Agent {
@@ -182,7 +183,7 @@ mod tests {
    let t = Transition {
        from,
        to,
-       agent: agent(to),
+       agent: Arc::new(agent(to)),
    };
    assert_eq!(should_notify(&t), expected, "{from:?} -> {to:?}");
}
@@ -193,7 +194,7 @@ mod tests {
    let t = Transition {
        from: AgentState::Working,
        to: AgentState::WaitingInput,
-       agent: agent(AgentState::WaitingInput),
+       agent: Arc::new(agent(AgentState::WaitingInput)),
    };
    let (title, body) = render("muxa", &t);
    assert_eq!(title, "muxa · claude_code · waiting_input");
@@ -207,7 +208,7 @@ mod tests {
    let t = Transition {
        from: AgentState::Working,
        to: AgentState::WaitingInput,
-       agent: a,
+       agent: Arc::new(a),
    };
    let (_, body) = render("muxa", &t);
    // "pane %7: " is 9 chars; body then up to BODY_TRUNCATE chars incl. ellipsis.
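The changelog notes that consumers reach the agent through the `Arc` via `&*t.agent` or implicit `Deref`. A minimal sketch of what that looks like on the reading side follows; the `Agent` and `Transition` structs here are trimmed-down hypothetical stand-ins, and `pane_label` and `describe` are illustrative names rather than muxa functions.

```rust
use std::sync::Arc;

// Hypothetical, trimmed-down stand-ins for muxa's types.
#[derive(Clone, Debug)]
struct Agent {
    session_id: String,
    pane: Option<String>,
}

struct Transition {
    agent: Arc<Agent>,
}

/// Field access goes through `Deref` implicitly; no syntax change needed.
fn pane_label(t: &Transition) -> String {
    t.agent.pane.clone().unwrap_or_else(|| "-".to_string())
}

/// A function that wants a plain `&Agent`.
fn describe(agent: &Agent) -> String {
    format!("session {}", agent.session_id)
}

fn main() {
    let t = Transition {
        agent: Arc::new(Agent {
            session_id: "sess-1".into(),
            pane: Some("%7".into()),
        }),
    };
    assert_eq!(pane_label(&t), "%7");
    // Explicit reborrow through the `Arc`...
    assert_eq!(describe(&*t.agent), "session sess-1");
    // ...or let deref coercion turn `&Arc<Agent>` into `&Agent`.
    assert_eq!(describe(&t.agent), "session sess-1");
}
```

Read-only consumers such as the test helpers in this file therefore only change where a `Transition` is constructed, which matches the shape of the diffs above.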
6 changes: 5 additions & 1 deletion crates/muxa/src/sinks/webhook.rs
@@ -500,7 +500,11 @@ mod tests {
    }

    fn transition(from: AgentState, to: AgentState, agent: Agent) -> Transition {
-       Transition { from, to, agent }
+       Transition {
+           from,
+           to,
+           agent: std::sync::Arc::new(agent),
+       }
    }

    #[test]