From 38544a49c792e2870fef80bbef693c5a857a1bbd Mon Sep 17 00:00:00 2001 From: BlackKeyZ Date: Wed, 10 Jun 2026 20:31:21 +0800 Subject: [PATCH] feat(process): add self-contained aionui-process subprocess crate (001) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation-layer crate that spawns, supervises, and reaps the agent subprocesses it itself starts — parallel to and unaware of the existing CliAgentProcess / process registry. Depends only on aionui-common + aionui-runtime; never parses agent output, holds no session state, and never mutates std::env. Identity-gated kills (recorded start-time) + single-instance lock/epoch + startup reap give crash-recovery without ever touching a live sibling instance's processes. Unix pgid + Windows Job Object containment. Purely additive: no in-repo consumers yet (boot wiring + claude_code manager arrive with the F1 series). 41 inline unit tests + 1 doctest green; clippy -D warnings + fmt clean. Real-process lifecycle integration tests are kept out of the repo (archived in the project's external docs). 
--- Cargo.lock | 21 + Cargo.toml | 2 + crates/aionui-process/Cargo.toml | 44 ++ crates/aionui-process/src/capabilities.rs | 136 ++++ crates/aionui-process/src/containment.rs | 69 ++ crates/aionui-process/src/error.rs | 75 ++ crates/aionui-process/src/instance_lock.rs | 151 ++++ crates/aionui-process/src/lib.rs | 88 +++ crates/aionui-process/src/proc_control.rs | 678 ++++++++++++++++++ crates/aionui-process/src/process.rs | 712 +++++++++++++++++++ crates/aionui-process/src/registry_store.rs | 710 ++++++++++++++++++ crates/aionui-process/src/spawner.rs | 159 +++++ crates/aionui-process/src/supervisor/core.rs | 271 +++++++ crates/aionui-process/src/supervisor/mod.rs | 315 ++++++++ 14 files changed, 3431 insertions(+) create mode 100644 crates/aionui-process/Cargo.toml create mode 100644 crates/aionui-process/src/capabilities.rs create mode 100644 crates/aionui-process/src/containment.rs create mode 100644 crates/aionui-process/src/error.rs create mode 100644 crates/aionui-process/src/instance_lock.rs create mode 100644 crates/aionui-process/src/lib.rs create mode 100644 crates/aionui-process/src/proc_control.rs create mode 100644 crates/aionui-process/src/process.rs create mode 100644 crates/aionui-process/src/registry_store.rs create mode 100644 crates/aionui-process/src/spawner.rs create mode 100644 crates/aionui-process/src/supervisor/core.rs create mode 100644 crates/aionui-process/src/supervisor/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 8845bea9..4c955742 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -707,6 +707,26 @@ dependencies = [ "which 7.0.3", ] +[[package]] +name = "aionui-process" +version = "0.1.26" +dependencies = [ + "aionui-common", + "aionui-runtime", + "async-trait", + "fs2", + "libc", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "uuid", + "windows-sys 0.61.2", +] + [[package]] name = "aionui-realtime" version = "0.1.26" @@ -5759,6 +5779,7 @@ checksum = 
"ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" dependencies = [ "getrandom 0.4.2", "js-sys", + "serde_core", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index e315f81d..619843d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/aionui-api-types", "crates/aionui-realtime", "crates/aionui-runtime", + "crates/aionui-process", "crates/aionui-auth", "crates/aionui-system", "crates/aionui-file", @@ -36,6 +37,7 @@ aionui-db = { path = "crates/aionui-db" } aionui-api-types = { path = "crates/aionui-api-types" } aionui-realtime = { path = "crates/aionui-realtime" } aionui-runtime = { path = "crates/aionui-runtime" } +aionui-process = { path = "crates/aionui-process" } aionui-auth = { path = "crates/aionui-auth" } aionui-system = { path = "crates/aionui-system" } aionui-file = { path = "crates/aionui-file" } diff --git a/crates/aionui-process/Cargo.toml b/crates/aionui-process/Cargo.toml new file mode 100644 index 00000000..22bfc175 --- /dev/null +++ b/crates/aionui-process/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "aionui-process" +version.workspace = true +edition.workspace = true + +# Self-contained subprocess mechanism (feature 001). Foundation layer: +# depends ONLY on aionui-common + aionui-runtime. Manages ONLY the processes +# it itself spawns — never touches the existing CliAgentProcess / registry. 
+ +[dependencies] +aionui-common.workspace = true +aionui-runtime.workspace = true +async-trait.workspace = true +serde = { workspace = true } +serde_json.workspace = true +thiserror.workspace = true +tracing.workspace = true +uuid = { workspace = true, features = ["serde", "v4"] } +fs2.workspace = true +tokio = { workspace = true, features = ["process", "io-util", "sync", "rt", "macros", "time"] } +tokio-util = { version = "0.7", features = ["compat"] } + +[dev-dependencies] +tempfile.workspace = true + +[target.'cfg(unix)'.dependencies] +libc.workspace = true + +# Windows (feature 005 batch B): raw Win32 FFI (windows-sys, zero-overhead, NOT +# the heavy `windows` crate — Decision 2). Covers: +# - liveness/identity probe + cold-reap: OpenProcess / GetProcessTimes / +# WaitForSingleObject / TerminateProcess (Win32_System_Threading); +# - Job Object subtree containment (hot path): CreateJobObjectW / +# SetInformationJobObject(KILL_ON_JOB_CLOSE) / AssignProcessToJobObject / +# TerminateJobObject (Win32_System_JobObjects); the CREATE_SUSPENDED → assign +# → resume race-close needs toolhelp thread-walk (Win32_System_Diagnostics_ToolHelp). +# aionui-process owns the child's lifetime, so the Job handle lives here. +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.61", features = [ + "Win32_Foundation", + "Win32_System_Threading", + "Win32_System_JobObjects", + "Win32_System_Diagnostics_ToolHelp", +] } diff --git a/crates/aionui-process/src/capabilities.rs b/crates/aionui-process/src/capabilities.rs new file mode 100644 index 00000000..cdda1257 --- /dev/null +++ b/crates/aionui-process/src/capabilities.rs @@ -0,0 +1,136 @@ +//! Per-platform capability descriptor (feature 005, WORKFLOW discipline 7). +//! +//! Turns "what this crate can actually do on this OS" from scattered, silent +//! `cfg` branches into a single TYPED, ASSERTABLE value. The matrix in the 005 +//! design doc maps 1:1 to these fields; the `capabilities_matrix_per_platform` +//! 
test pins each platform's row, so a capability regression (e.g. someone +//! re-stubs macOS start-time to `None`) turns a test RED instead of silently +//! degrading reap safety. +//! +//! "Hot" vs "cold" kill is the load-bearing distinction (design I-9): while a +//! live `ManagedProcess` handle is held (normal exit / explicit kill / Drop) +//! the whole subtree is torn down on every platform; only the post-CRASH +//! cold-reap (reconstruct from a persisted pid, no live handle) degrades — and +//! only on Windows, where the Job handle does not survive the owner's death. + +use serde::{Deserialize, Serialize}; + +/// What kind of OS primitive contains a spawned subtree. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum ContainmentKind { + /// No subtree containment (grandchildren are not corralled). + None, + /// POSIX process group (`setpgid` + `kill(-pgid)`); a `setsid` grandchild escapes. + ProcessGroup, + /// Windows Job Object (`KILL_ON_JOB_CLOSE` + `TerminateJobObject`); stronger than a group. + JobObject, +} + +/// How well crash-recovery reap (from a persisted pid, no live handle) works. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum ReapSupport { + /// No cross-restart reap on this platform. + None, + /// Full subtree reap survives restart (Unix: persisted pgid → `kill(-pgid)`). + Full, + /// Single-process kill after identity gating, plus a best-effort `taskkill /T` + /// sweep (Windows: the Job handle does not persist across the owner's death, + /// so the subtree guarantee degrades — design I-9). + SingleProcessGated, +} + +/// The subprocess-mechanism capabilities of the platform this binary was +/// compiled for. A `const fn` per-platform value — no runtime probing. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct Capabilities { + /// Can actively kill a process we spawned. + pub can_kill: bool, + /// What contains a spawned subtree. 
+ pub subtree_containment: ContainmentKind, + /// `probe` can truthfully report liveness. + pub liveness_probe: bool, + /// `read_process_start_time` yields a real value (the reap-safety identity gate). + pub identity_gate: bool, + /// While a live handle is held (normal exit / kill / Drop), the WHOLE subtree + /// is torn down. True on every supported platform — no degradation here. + pub hot_kill_subtree: bool, + /// Crash-recovery reap quality (no live handle, from a persisted pid). + pub cold_reap: ReapSupport, + /// The kernel auto-kills our children when the parent dies (Linux + /// `PR_SET_PDEATHSIG` / Windows `KILL_ON_JOB_CLOSE`); shrinks crash orphans. + /// macOS has no equivalent. + pub parent_death_signal: bool, + /// Dropping a `ManagedProcess` reaps its subtree. + pub drop_reaps: bool, +} + +impl Capabilities { + /// The capabilities of the current compile target. + pub const fn current() -> Self { + #[cfg(target_os = "linux")] + { + Self { + can_kill: true, + subtree_containment: ContainmentKind::ProcessGroup, + liveness_probe: true, + identity_gate: true, // /proc/<pid>/stat field 22 + hot_kill_subtree: true, + cold_reap: ReapSupport::Full, // persisted pgid → kill(-pgid) + parent_death_signal: true, // PR_SET_PDEATHSIG (R9) + drop_reaps: true, + } + } + #[cfg(target_os = "macos")] + { + Self { + can_kill: true, + subtree_containment: ContainmentKind::ProcessGroup, + liveness_probe: true, + identity_gate: true, // proc_pidinfo PROC_PIDTBSDINFO (R1) + hot_kill_subtree: true, + cold_reap: ReapSupport::Full, // persisted pgid → kill(-pgid) + parent_death_signal: false, // no PDEATHSIG equivalent; reaper is load-bearing + drop_reaps: true, + } + } + #[cfg(target_os = "windows")] + { + // BATCH B implemented (feature 005). 
Windows now has real: + // - probe + identity gate: OpenProcess + WaitForSingleObject + + // GetProcessTimes creation-FILETIME (proc_control windows_impl); + // - hot-kill subtree: Job Object (CREATE_SUSPENDED → assign → + // resume) + TerminateJobObject / KILL_ON_JOB_CLOSE on Drop; + // - parent-death: KILL_ON_JOB_CLOSE (job dies with the last handle). + // cold-reap stays SingleProcessGated (I-9): the Job handle does NOT + // persist across the owner's death, so a from-disk pid is terminated + // as a single process (TerminateProcess), not the whole subtree. + // ⚠️ Verified by cross-compile (cargo-xwin) + must be run on a real + // Windows host / UTM VM (no x86 CI lane) — until then treat the + // RUNTIME behavior as LocalVerifiedOnly in spirit. + Self { + can_kill: true, // TerminateJobObject / TerminateProcess + subtree_containment: ContainmentKind::JobObject, // Job Object + liveness_probe: true, // OpenProcess + WaitForSingleObject + identity_gate: true, // GetProcessTimes creation FILETIME + hot_kill_subtree: true, // Job terminate while handle held + cold_reap: ReapSupport::SingleProcessGated, // Job doesn't persist (I-9) + parent_death_signal: true, // KILL_ON_JOB_CLOSE + drop_reaps: true, // Drop terminates the Job + } + } + #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] + { + // Unknown platform: claim nothing (safe defaults — never kill on doubt). + Self { + can_kill: false, + subtree_containment: ContainmentKind::None, + liveness_probe: false, + identity_gate: false, + hot_kill_subtree: false, + cold_reap: ReapSupport::None, + parent_death_signal: false, + drop_reaps: false, + } + } + } +} diff --git a/crates/aionui-process/src/containment.rs b/crates/aionui-process/src/containment.rs new file mode 100644 index 00000000..f42cdb6d --- /dev/null +++ b/crates/aionui-process/src/containment.rs @@ -0,0 +1,69 @@ +//! Per-platform lifecycle fence (Containment). Tears down a whole subprocess +//! 
subtree (agent CLI + grandchildren like MCP servers), not just the direct +//! child. Lifecycle-only — orthogonal to any security sandbox. +//! +//! Single tier ships: [`ProcessGroupContainment`] (best-effort Unix process +//! group). Job Object / cgroup tiers are intentionally not built (no CI lane; +//! they collapse to the process-group kill on testable platforms). The seam +//! lets them land later without touching callers. + +use crate::ProcessError; + +/// Strength of a containment's teardown guarantee. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReapGuarantee { + /// Process-group SIGKILL: reaps descendants that stay in the group; misses + /// any that escaped via `setsid` (documented gap). + BestEffort, +} + +/// Outcome of [`Containment::kill_all`] — never a bare `Ok(())` the caller can +/// misread as "tree definitely gone". +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ContainmentKillOutcome { + /// Post-kill liveness probe confirmed the group is gone. + ProbedGone, + /// Kill issued but not confirmed gone (e.g. a member escaped the group). + DegradedBestEffort, +} + +/// A lifecycle fence around a spawned subprocess subtree. +pub trait Containment: Send + Sync { + fn kill_all(&self) -> Result; + fn guarantee(&self) -> ReapGuarantee; +} + +/// Best-effort containment via the Unix process group captured at spawn. +pub struct ProcessGroupContainment { + pid: u32, + process_group_id: Option, +} + +impl ProcessGroupContainment { + pub fn new(pid: u32, process_group_id: Option) -> Self { + Self { pid, process_group_id } + } +} + +impl Containment for ProcessGroupContainment { + fn kill_all(&self) -> Result { + crate::force_kill(self.pid, self.process_group_id)?; + // SIGKILL is async; give the kernel a brief bounded settle before the + // confirmation probe, else a clean kill almost always reads alive and + // ProbedGone would be unreachable. 
Still alive after settle => honest + // Degraded (escaped grandchild) rather than a false "gone". + const ATTEMPTS: u32 = 20; + const STEP: std::time::Duration = std::time::Duration::from_millis(25); + for _ in 0..ATTEMPTS { + if !crate::process_group_alive(self.process_group_id) { + return Ok(ContainmentKillOutcome::ProbedGone); + } + std::thread::sleep(STEP); + } + Ok(ContainmentKillOutcome::DegradedBestEffort) + } + + fn guarantee(&self) -> ReapGuarantee { + ReapGuarantee::BestEffort + } +} diff --git a/crates/aionui-process/src/error.rs b/crates/aionui-process/src/error.rs new file mode 100644 index 00000000..5f21a5be --- /dev/null +++ b/crates/aionui-process/src/error.rs @@ -0,0 +1,75 @@ +//! Mechanism-layer error. This crate is Foundation-layer and must not depend +//! on any domain error type; it owns a small enum covering only what the +//! spawn / lifecycle / reap mechanism produces. + +/// Errors produced by the subprocess mechanism layer. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum ProcessError { + /// Invalid caller input (e.g. a missing / non-directory / whitespace cwd). + #[error("bad request: {0}")] + BadRequest(String), + /// Workspace path contains a whitespace segment the bundled runtime cannot handle. + #[error("workspace path contains whitespace (runtime unsupported): {0}")] + WorkspacePathContainsWhitespaceRuntimeUnsupported(String), + /// An OS / runtime failure (spawn failed, pipe capture failed, kill failed, fs error). 
+ #[error("internal error: {0}")] + Internal(String), +} + +impl ProcessError { + pub fn bad_request(message: impl Into) -> Self { + Self::BadRequest(message.into()) + } + + pub fn workspace_path_contains_whitespace_runtime_unsupported(path: impl Into) -> Self { + Self::WorkspacePathContainsWhitespaceRuntimeUnsupported(path.into()) + } + + pub fn internal(message: impl Into) -> Self { + Self::Internal(message.into()) + } +} + +impl From for ProcessError { + fn from(e: std::io::Error) -> Self { + Self::Internal(e.to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// INPUTVAL-B4: the named constructors build the matching variant and the + /// `Display` impl renders the documented prefix (callers/log scrapers rely + /// on these exact prefixes). + #[test] + fn constructors_build_matching_variant_and_render_prefix() { + let bad = ProcessError::bad_request("nope"); + assert!(matches!(bad, ProcessError::BadRequest(ref m) if m == "nope")); + assert_eq!(bad.to_string(), "bad request: nope"); + + let ws = ProcessError::workspace_path_contains_whitespace_runtime_unsupported("/a b"); + assert!(matches!( + ws, + ProcessError::WorkspacePathContainsWhitespaceRuntimeUnsupported(ref p) if p == "/a b" + )); + assert_eq!( + ws.to_string(), + "workspace path contains whitespace (runtime unsupported): /a b" + ); + + let internal = ProcessError::internal("boom"); + assert!(matches!(internal, ProcessError::Internal(ref m) if m == "boom")); + assert_eq!(internal.to_string(), "internal error: boom"); + } + + /// `From` maps to `Internal` carrying the io error's text. 
+ #[test] + fn io_error_maps_to_internal() { + let io = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "denied"); + let err: ProcessError = io.into(); + assert!(matches!(err, ProcessError::Internal(ref m) if m.contains("denied"))); + } +} diff --git a/crates/aionui-process/src/instance_lock.rs b/crates/aionui-process/src/instance_lock.rs new file mode 100644 index 00000000..c02265ed --- /dev/null +++ b/crates/aionui-process/src/instance_lock.rs @@ -0,0 +1,151 @@ +//! Single-instance advisory lock + per-run epoch (IC-1 defense / IC-3 naming). +//! +//! The lock file is a dedicated sidecar `{data_dir}/runtime/aionui-process/ +//! instance.lock` — provably disjoint from bun's `runtime.lock`, the node +//! install lock, the db migrate lock, and the builtin-skills lock (IC-3). It +//! is acquired NON-BLOCKING (try-lock) and fails fast on contention, so a +//! second overlapping instance (auto-update) never silently hangs and — more +//! importantly — never reaps the live sibling's processes (the reap pass is +//! gated on holding this lock). +//! +//! Each successful acquisition also mints a fresh per-run `instance_epoch` +//! (random v4 UUID). Equality-only: a registry row whose epoch differs from +//! the current run is a prior-run entry. No persistence, no ordering — so NTP +//! / reinstall can never make a stale epoch look current (IC-1, design Dec 1). + +use std::fs::File; +use std::path::{Path, PathBuf}; + +use fs2::FileExt; +use uuid::Uuid; + +use crate::registry_store::{LOCK_FILE, SUBDIR}; + +/// Held single-instance lock. Held for the whole process lifetime by keeping +/// this value alive; dropping it releases the advisory lock. +pub struct InstanceLock { + file: File, + path: PathBuf, +} + +impl InstanceLock { + pub fn path(&self) -> &Path { + &self.path + } +} + +impl Drop for InstanceLock { + fn drop(&mut self) { + // Explicit unlock for deterministic release. 
Closing the File would + // also release the flock, but on some platforms (macOS) the close- + // triggered release can lag; an explicit unlock() makes re-acquire by + // a successor deterministic. + let _ = fs2::FileExt::unlock(&self.file); + } +} + +/// Returned when the lock is already held by another instance. +#[derive(Debug)] +pub struct LockHeld { + pub path: PathBuf, +} + +impl std::fmt::Display for LockHeld { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "instance lock already held: {}", self.path.display()) + } +} + +/// Try to acquire the single-instance lock under `data_dir`. On success +/// returns the held lock + a freshly minted per-run epoch. On contention +/// returns `Err(LockHeld)` — the caller must NOT run any reap (IC-1: never +/// reap a live sibling instance's processes). +pub fn acquire_instance_lock(data_dir: &Path) -> Result<(InstanceLock, Uuid), LockHeld> { + let dir = data_dir.join(SUBDIR); + let path = dir.join(LOCK_FILE); + // create_dir_all is idempotent + benign under races (IC: already-isolated). + if std::fs::create_dir_all(&dir).is_err() { + return Err(LockHeld { path }); + } + // `File::create` sets `O_CLOEXEC` by default on unix (Rust std since 1.0), + // so this advisory-lock fd is NOT inherited by spawned children / MCP + // grandchildren. That matters: an inherited flock fd held by a surviving + // grandchild would keep the lock "held" after the parent dies, making the + // NEXT instance see contention and refuse to reap — defeating cold-reap. + // CLOEXEC closes that hole for free; the `cloexec_lock_fd_not_inherited` + // test guards against a future switch to a non-CLOEXEC open path. + let file = match File::create(&path) { + Ok(f) => f, + Err(_) => return Err(LockHeld { path }), + }; + // NON-BLOCKING: fail fast on contention, never block (IC-3). 
+ match file.try_lock_exclusive() { + Ok(()) => Ok((InstanceLock { file, path }, Uuid::new_v4())), + Err(_) => Err(LockHeld { path }), + } +} + +#[cfg(test)] +mod tests { + //! Single-process-safe lock tests (fresh acquire, epoch mint, path accessor, + //! LockHeld shape). True two-instance MUTUAL EXCLUSION (LOCK-12) needs a + //! second OS process — `fs2` advisory locks are per-open-file-description and + //! a single test process can re-lock its own fd — so that contention test is + //! a concurrency-harness gap (see audit), not asserted here. + + use tempfile::TempDir; + + use super::*; + + /// LOCK-1/5/6: a fresh acquire on an empty data dir succeeds, creates the + /// lock file at the namespaced path, and mints a per-run epoch. + #[test] + fn fresh_acquire_succeeds_creates_file_and_mints_epoch() { + let tmp = TempDir::new().unwrap(); + let (lock, epoch) = acquire_instance_lock(tmp.path()).expect("fresh acquire succeeds"); + + assert_eq!( + lock.path(), + tmp.path().join(SUBDIR).join(LOCK_FILE), + "lock at namespaced path" + ); + assert!(lock.path().exists(), "lock file created on disk"); + assert_ne!(epoch, Uuid::nil(), "a real v4 epoch is minted"); + assert_eq!(epoch.get_version_num(), 4, "epoch is a v4 (random) UUID"); + } + + /// LOCK-5: each successful acquisition mints a DISTINCT epoch (acquire → + /// drop → re-acquire yields a different UUID; epochs are never reused). + #[test] + fn each_acquire_mints_a_distinct_epoch() { + let tmp = TempDir::new().unwrap(); + let (lock1, e1) = acquire_instance_lock(tmp.path()).expect("first acquire"); + drop(lock1); // release so the successor can take it + let (_lock2, e2) = acquire_instance_lock(tmp.path()).expect("re-acquire after drop"); + assert_ne!(e1, e2, "a fresh epoch is minted on every acquisition"); + } + + /// LOCK-8: `LockHeld` carries the contended path and renders it in `Display` + /// (callers log/inspect which lock blocked them). 
+ #[test] + fn lock_held_carries_and_displays_the_path() { + let held = LockHeld { + path: PathBuf::from("/data/runtime/aionui-process/instance.lock"), + }; + assert_eq!(held.path, PathBuf::from("/data/runtime/aionui-process/instance.lock")); + assert!( + held.to_string().contains("instance.lock"), + "Display must surface the contended path, got {held}" + ); + } + + /// LOCK-4: `create_dir_all` is idempotent — acquiring, dropping, and + /// re-acquiring in the same dir does not error on the pre-existing dir/file. + #[test] + fn reacquire_in_existing_dir_is_benign() { + let tmp = TempDir::new().unwrap(); + let (lock1, _) = acquire_instance_lock(tmp.path()).expect("first"); + drop(lock1); + acquire_instance_lock(tmp.path()).expect("re-acquire over existing dir/file must not error"); + } +} diff --git a/crates/aionui-process/src/lib.rs b/crates/aionui-process/src/lib.rs new file mode 100644 index 00000000..a360df6d --- /dev/null +++ b/crates/aionui-process/src/lib.rs @@ -0,0 +1,88 @@ +//! `aionui-process` — self-contained subprocess mechanism (feature 001). +//! +//! A Foundation-layer crate that spawns, supervises, and reaps the agent +//! subprocesses **it itself starts** — fully parallel to and unaware of the +//! existing `CliAgentProcess` / process registry in `aionui-ai-agent`. +//! +//! "Bytes not semantics": it never parses agent output, holds no session +//! state, and never mutates `std::env`. It depends only on `aionui-common` +//! and `aionui-runtime`. +//! +//! ## Isolation contract (why two mechanisms coexist without conflict) +//! All shared resources are namespaced under `{data_dir}/runtime/aionui-process/` +//! and every kill is identity-gated against a recorded process start-time so a +//! recycled PID/PGID is never mistaken for one of ours. See the feature +//! design doc §隔离契约 (IC-1..6). +//! +//! ## Usage +//! The standard flow: mint this run's identity (single-instance lock + epoch), +//! 
build a [`RealSpawner`] over a [`FileRegistryStore`], and spawn. The returned +//! [`ManagedProcess`] owns the child's lifetime — dropping it group-kills the +//! child and deregisters its registry row, so an orphan can never outlive its +//! handle. On the next startup, [`run_startup_reap`] cleans up any rows left by +//! a previous crash (identity-gated: only a confirmed prior-run orphan is killed). +//! +//! ```no_run +//! use std::sync::Arc; +//! use std::time::Duration; +//! use aionui_common::CommandSpec; +//! use aionui_process::{ +//! acquire_instance_lock, FileRegistryStore, LockState, RealSpawner, Spawner, +//! local_machine_id, run_startup_reap, +//! }; +//! +//! # async fn example(data_dir: &std::path::Path, cache_dir: &std::path::Path) -> Result<(), Box<dyn std::error::Error>> { +//! // 1. Take the single-instance lock; it yields this run's fresh epoch. +//! // Contention (a sibling instance holds it) is NOT fatal — we just mint a +//! // fresh epoch, skip reap (never touch the live sibling's processes), and +//! // keep serving. `LockHeld` carries the contended path for logging. +//! let (lock, epoch, lock_state) = match acquire_instance_lock(data_dir) { +//! Ok((lock, epoch)) => (Some(lock), epoch, LockState::Acquired), +//! Err(_held) => (None, uuid::Uuid::new_v4(), LockState::HeldBySibling), +//! }; +//! let _lock = lock; // hold for the whole process lifetime; drop releases it +//! let machine_id = local_machine_id(cache_dir); +//! let registry = Arc::new(FileRegistryStore::new(data_dir)); +//! +//! // 2. Reap orphans left by a prior crash. Gated on the lock: when held by a +//! // sibling, reconcile emits nothing (no action taken). +//! run_startup_reap(&*registry, lock_state, epoch, &machine_id)?; +//! +//! // 3. Spawn an agent subprocess; it is recorded for crash-recovery. +//! let spawner = RealSpawner::new(Arc::clone(&registry), epoch, machine_id); +//! let spec = CommandSpec { command: "/usr/bin/my-agent".into(), args: vec![], env: vec![], cwd: None }; +//! 
let proc = spawner.spawn(spec, &[], "conversation-123").await?; +//! +//! // 4. Hand the duplex to a transport, then tear down when the turn ends. +//! if let Some((_stdin, _stdout)) = proc.take_stdio().await { /* drive I/O */ } +//! proc.kill(Duration::from_secs(2)).await?; // or just drop(proc) for fire-and-forget teardown +//! # Ok(()) +//! # } +//! ``` + +mod capabilities; +mod containment; +mod error; +mod instance_lock; +mod proc_control; +mod process; +mod registry_store; +mod spawner; +mod supervisor; + +pub use capabilities::{Capabilities, ContainmentKind, ReapSupport}; +pub use containment::{Containment, ContainmentKillOutcome, ProcessGroupContainment, ReapGuarantee}; +pub use error::ProcessError; +pub use instance_lock::{InstanceLock, LockHeld, acquire_instance_lock}; +pub use proc_control::{ + Liveness, ObservedLiveness, classify_liveness, force_kill, probe, process_group_alive, read_process_start_time, +}; +pub use process::{BoxedStdin, BoxedStdout, ManagedProcess, TerminalExit}; +pub use registry_store::{ + FileRegistryStore, LOCK_FILE, ProcessIdentity, REGISTRY_FILE, RegisteredProcess, RegistryStore, SUBDIR, +}; +pub use spawner::{RealSpawner, Spawner, local_machine_id}; +pub use supervisor::{ + Action, LockState, ObservedState, execute_actions, gather_observed, reconcile, reconcile_with_capability, + run_startup_reap, +}; diff --git a/crates/aionui-process/src/proc_control.rs b/crates/aionui-process/src/proc_control.rs new file mode 100644 index 00000000..b0df47fd --- /dev/null +++ b/crates/aionui-process/src/proc_control.rs @@ -0,0 +1,678 @@ +//! Kill + identity-gated liveness probing (IC-1 core). +//! +//! The crate spawns each child as its own process-group leader, so teardown +//! can `kill(-pgid, SIGKILL)` the whole subtree. Before reaping a *persisted* +//! pgid (which may have been recycled by the OS onto an unrelated live +//! process), we re-read the live process start-time and require it to MATCH +//! 
what we recorded — otherwise we prune the registry entry and never kill. +//! +//! `pgid > 1` blast-floor everywhere (IC-6): never `kill(-1)` / `kill(-0)`. + +use uuid::Uuid; + +use crate::ProcessError; + +/// Result of an identity-gated liveness probe (the pure decision input). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Liveness { + /// Live, recorded epoch differs from current (a prior-run orphan), and the + /// process start-time matches what we recorded — safe to reap. + Match, + /// A live process holds the pid, but its start-time differs from what we + /// recorded → the PID was recycled onto an unrelated process. Prune only. + RecycledPid, + /// Live and start-time matches, but the recorded epoch == current epoch → + /// this is one of THIS run's own processes, never a reap target. + DiffEpoch, + /// No process holds the pid (ESRCH). Prune only. + Gone, + /// Could not determine start-time on this platform (macOS/Windows fallback) + /// or the recorded start-time was absent. Prune only — never kill on doubt. + Unknown, + /// `kill(pid, 0)` returned EPERM: some process holds the id but it is not + /// provably ours. Prune only. + EpermAlive, +} + +/// Pure identity-gate decision. Reads NOTHING from the OS — the caller supplies +/// the observed start-time (from [`read_process_start_time`]). This is the +/// Tier-A-exhaustible core of IC-1. +/// +/// Reap is permitted ONLY when the process is alive, its start-time matches the +/// recorded one, and the recorded epoch is from a *prior* run. +pub fn classify_liveness( + recorded_start_ticks: Option, + observed: ObservedLiveness, + recorded_epoch: Uuid, + current_epoch: Uuid, +) -> Liveness { + match observed { + ObservedLiveness::Gone => Liveness::Gone, + ObservedLiveness::EpermAlive => Liveness::EpermAlive, + ObservedLiveness::Alive { start_ticks } => { + match (recorded_start_ticks, start_ticks) { + // We could not record or cannot observe a start-time → never kill. 
+ (None, _) | (_, None) => Liveness::Unknown, + // F57: `0` is a NON-identity sentinel. A real recorder never + // emits 0 as a legitimate discriminator (Linux starttime jiffies + // and the macOS (tvsec<<20)|tvusec packing are non-zero for any + // post-boot process); a `Some(0)` therefore means corrupt / + // zeroed data. Treating two zeros as a Match would AUTHORIZE A + // KILL on garbage — the one value where the safe-failing argument + // breaks. Refuse to gate on 0 → Unknown (prune-only, never kill). + (Some(0), _) | (_, Some(0)) => Liveness::Unknown, + (Some(rec), Some(obs)) if rec == obs => { + if recorded_epoch == current_epoch { + Liveness::DiffEpoch // our own current-run process + } else { + Liveness::Match // prior-run orphan, identity-confirmed + } + } + // Same pid, different start-time → recycled. + (Some(_), Some(_)) => Liveness::RecycledPid, + } + } + } +} + +/// What an OS probe observed about a pid, fed into [`classify_liveness`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ObservedLiveness { + /// The pid is live; `start_ticks` is its kernel start-time if obtainable. + Alive { start_ticks: Option }, + /// No process holds the pid (ESRCH). + Gone, + /// `kill(pid, 0)` returned EPERM. + EpermAlive, +} + +/// Probe a pid's liveness + start-time via the OS (the impure shell around +/// [`classify_liveness`]). +pub fn probe(pid: u32) -> ObservedLiveness { + #[cfg(unix)] + { + // Refuse to wrap: a pid that doesn't fit positive i32 would, as a + // negative target, probe a GROUP instead of the process (F50). Treat as + // not-observable → Gone (prune-only; never authorizes a kill). 
/// Return the kernel start-time token of `pid`, or `None` when it cannot be
/// obtained.
///
/// The token is only ever compared for equality by the identity gate, so its
/// unit is platform-specific. `None` makes the gate fall back to `Unknown`
/// (prune-only, never kill).
pub fn read_process_start_time(pid: u32) -> Option<u64> {
    #[cfg(target_os = "linux")]
    {
        // `/proc/<pid>/stat`: the second field `(comm)` may itself contain
        // spaces or parentheses, so everything up to the LAST ')' is skipped
        // before splitting on whitespace.
        let stat_path = format!("/proc/{pid}/stat");
        let contents = std::fs::read_to_string(stat_path).ok()?;
        let (_, tail) = contents.rsplit_once(')')?;
        // Field 22 (starttime) is the 20th whitespace-separated token after
        // the comm field, i.e. zero-based index 19.
        let mut fields = tail.split_whitespace();
        let raw_starttime = fields.nth(19)?;
        raw_starttime.parse::<u64>().ok()
    }
    #[cfg(target_os = "macos")]
    {
        read_start_time_macos(pid)
    }
    #[cfg(target_os = "windows")]
    {
        windows_impl::read_start_time(pid)
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
    {
        // Unknown platform: no cheap accessor → None keeps the identity gate
        // conservative (prune, never kill on doubt).
        let _ = pid;
        None
    }
}
/// The `libc::kill` target for SIGNALLING A SINGLE PID: the pid as a strictly
/// positive `i32`.
///
/// Returns `None` for pid `0` and for any pid that does not fit in a positive
/// `i32` (≥ 2^31). A wrapping `pid as i32` cast would turn such a value
/// NEGATIVE, and a negative `kill` target addresses a process GROUP instead of
/// a single process (F50). Pure; unit-tested.
#[cfg(unix)]
pub(crate) fn pid_signal_target(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(target) if target > 0 => Some(target),
        _ => None,
    }
}
+/// Returns `None` if pgid ≤ 1 (IC-6 blast-floor) OR pgid ≥ 2^31 — in the latter +/// case `pgid as i32` wraps negative and `-(negative)` becomes POSITIVE, so the +/// group-kill would silently collapse into a single-PID kill and leak the +/// grandchild subtree (F50). Pure; unit-tested. +#[cfg(unix)] +pub(crate) fn group_kill_target(pgid: u32) -> Option { + if pgid <= 1 { + return None; + } + // Must fit positive i32 so that negation is a well-defined negative target. + i32::try_from(pgid).ok().map(|p| -p) +} + +/// Force-kill a process group (or the bare pid if no group), best-effort. +/// `ESRCH` (already gone) is treated as success. Honors the `pgid > 1` +/// blast-floor (IC-6) and refuses to wrap large ids (F50). +pub fn force_kill(pid: u32, process_group_id: Option) -> Result<(), ProcessError> { + #[cfg(unix)] + { + if let Some(target) = process_group_id.and_then(group_kill_target) { + return kill_target(target).or_else(|e| { + // Group leader may already be gone; fall back to the bare pid. + if e.already_gone { + match pid_signal_target(pid) { + Some(t) => kill_target(t).map_err(|e| e.into()), + None => Ok(()), // pid out of i32 range — nothing safe to do + } + } else { + Err(e.into()) + } + }); + } + match pid_signal_target(pid) { + Some(t) => kill_target(t).map_err(Into::into), + None => Err(ProcessError::internal(format!("pid {pid} out of signalable i32 range"))), + } + } + #[cfg(windows)] + { + // Windows COLD-REAP path (from a persisted registry pid, no live Job + // handle): single-process TerminateProcess. The Job Object does NOT + // persist across the owner's death, so a recycled-from-disk pid can only + // be terminated as a single process — grandchildren are not reachable + // here (I-9, documented). HOT kill (live ManagedProcess in hand) goes + // through the Job in process.rs and DOES kill the whole subtree. 
+ let _ = process_group_id; // no pgid concept on Windows + windows_impl::terminate_process(pid) + } + #[cfg(not(any(unix, windows)))] + { + let _ = (pid, process_group_id); + Err(ProcessError::internal("force_kill not supported on this platform")) + } +} + +#[cfg(unix)] +struct KillErr { + already_gone: bool, + msg: String, +} + +#[cfg(unix)] +impl From for ProcessError { + fn from(e: KillErr) -> Self { + ProcessError::internal(e.msg) + } +} + +#[cfg(unix)] +fn kill_target(target: i32) -> Result<(), KillErr> { + let rc = unsafe { libc::kill(target, libc::SIGKILL) }; + if rc == 0 { + return Ok(()); + } + let err = std::io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ESRCH) { + return Ok(()); // already gone == success + } + Err(KillErr { + already_gone: false, + msg: format!("SIGKILL to {target} failed: {err}"), + }) +} + +/// Liveness of a process group by signal-0 to the negative pgid (Unix). +/// `true` if the group still has any member. Non-Unix / no pgid → false. +pub fn process_group_alive(process_group_id: Option) -> bool { + #[cfg(unix)] + { + match process_group_id.and_then(group_kill_target) { + Some(target) => { + let rc = unsafe { libc::kill(target, 0) }; + if rc == 0 { + return true; + } + !matches!(std::io::Error::last_os_error().raw_os_error(), Some(libc::ESRCH)) + } + None => false, + } + } + #[cfg(not(unix))] + { + // Windows has no process-group concept (containment is a Job Object, and + // `process_group_id` is always None here). Group-liveness is therefore + // not meaningful; callers needing Windows liveness use `probe(pid)` + // instead. Returns false (no group to be alive). + let _ = process_group_id; + false + } +} + +/// Windows Win32 FFI: liveness/identity probe + single-process termination +/// (cold-reap). Job Object subtree containment lives in aionui-runtime's +/// spawn.rs (it owns the Command). windows-sys raw FFI (Decision 2). 
+#[cfg(windows)] +mod windows_impl { + use windows_sys::Win32::Foundation::{ + CloseHandle, ERROR_ACCESS_DENIED, FILETIME, GetLastError, WAIT_OBJECT_0, WAIT_TIMEOUT, + }; + use windows_sys::Win32::System::Threading::{ + GetProcessTimes, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, PROCESS_TERMINATE, TerminateProcess, + WaitForSingleObject, + }; + + use super::{ObservedLiveness, ProcessError}; + + /// `SYNCHRONIZE` access right (0x0010_0000). Required for `WaitForSingleObject` + /// on a process handle. windows-sys only re-exports the const under + /// `Storage::FileSystem` (typed `FILE_ACCESS_RIGHTS`), so we use the literal — + /// it is the standard-rights bit, identical across object types. + const SYNCHRONIZE: u32 = 0x0010_0000; + + /// Open a handle with the given access. `Ok(Some(h))` = opened (caller MUST + /// CloseHandle it — it is OUR handle, NOT a tokio-owned Child handle). + /// `Ok(None)` = the pid is genuinely gone (ERROR-not-access-denied). + /// `Err(())` = access denied (a process holds the id but we may not query it). + fn open(pid: u32, access: u32) -> Result, ()> { + // SAFETY: OpenProcess returns a handle or null on failure. + let h = unsafe { OpenProcess(access, 0, pid) }; + if !h.is_null() { + return Ok(Some(h)); + } + // SAFETY: GetLastError reads the calling thread's last error. + let err = unsafe { GetLastError() }; + if err == ERROR_ACCESS_DENIED { Err(()) } else { Ok(None) } + } + + /// Liveness + identity (creation-time) probe. + /// + /// The handle MUST be opened with `SYNCHRONIZE` in addition to + /// `PROCESS_QUERY_LIMITED_INFORMATION` — `WaitForSingleObject` requires + /// SYNCHRONIZE, and without it the wait returns `WAIT_FAILED`, which would + /// make a LIVE process look gone (the real-Windows bug this fixes: a fresh + /// child probed as `Gone`). + /// + /// `OpenProcess` null + access-denied ⇒ `EpermAlive` (something holds the id, + /// not provably ours). `OpenProcess` null + other ⇒ `Gone`. 
+ /// `WaitForSingleObject == WAIT_TIMEOUT` ⇒ still running (Alive); + /// `WAIT_OBJECT_0` (signaled) ⇒ exited (Gone). Any other wait result + /// (incl. WAIT_FAILED) is treated conservatively as Gone but only AFTER we + /// confirmed the handle opened — a real failure here is logged by the caller. + pub(super) fn probe(pid: u32) -> ObservedLiveness { + let h = match open(pid, PROCESS_QUERY_LIMITED_INFORMATION | SYNCHRONIZE) { + Ok(Some(h)) => h, + Ok(None) => return ObservedLiveness::Gone, + Err(()) => return ObservedLiveness::EpermAlive, + }; + // SAFETY: h is a valid handle (opened with SYNCHRONIZE) we own; closed below. + let wait = unsafe { WaitForSingleObject(h, 0) }; + let start = read_creation_token(h); + // SAFETY: close the handle WE opened (not a tokio Child handle). + unsafe { CloseHandle(h) }; + if wait == WAIT_TIMEOUT { + // Still running. Avoids the GetExitCodeProcess STILL_ACTIVE(259) + // ambiguity by using WaitForSingleObject as the liveness oracle. + ObservedLiveness::Alive { start_ticks: start } + } else if wait == WAIT_OBJECT_0 { + ObservedLiveness::Gone // signaled = exited + } else { + // WAIT_FAILED or unexpected — conservative Gone (prune-only, never + // authorizes a kill). With SYNCHRONIZE present this should not occur + // for a live process. + ObservedLiveness::Gone + } + } + + /// Creation-time identity token: combine the FILETIME's two u32 halves into + /// a u64 (100ns ticks since 1601). Stable for the process's lifetime; a + /// recycled PID gets a different creation time → the identity gate's + /// `(pid, token)` pair defeats PID reuse. `None` on any failure. + pub(super) fn read_start_time(pid: u32) -> Option { + // GetProcessTimes needs only QUERY_LIMITED (no SYNCHRONIZE). open() returns + // Ok(Some)=opened / Ok(None)=gone / Err=access-denied; only Some yields a token. 
+ let h = match open(pid, PROCESS_QUERY_LIMITED_INFORMATION) { + Ok(Some(h)) => h, + Ok(None) | Err(()) => return None, + }; + let token = read_creation_token(h); + // SAFETY: close the handle we opened. + unsafe { CloseHandle(h) }; + token + } + + fn read_creation_token(h: *mut core::ffi::c_void) -> Option { + let mut creation = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut exit = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut kernel = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut user = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + // SAFETY: all four out-params are valid; h is a live handle. + let ok = unsafe { GetProcessTimes(h, &mut creation, &mut exit, &mut kernel, &mut user) }; + if ok == 0 { + return None; + } + let token = ((creation.dwHighDateTime as u64) << 32) | (creation.dwLowDateTime as u64); + // A zeroed creation time is not a trustworthy identity (F57: 0 is the + // non-identity sentinel the pure gate refuses); treat as unobtainable. + if token == 0 { None } else { Some(token) } + } + + /// Cold-reap single-process termination (I-9) via `OpenProcess(PROCESS_TERMINATE)` + /// then `TerminateProcess`. A pid that is already gone (OpenProcess fails) is + /// success. Does NOT reach grandchildren — that needs the live Job handle + /// (hot path), which a cold reap from disk does not have. + pub(super) fn terminate_process(pid: u32) -> Result<(), ProcessError> { + let h = match open(pid, PROCESS_TERMINATE) { + Ok(Some(h)) => h, + // already gone == success (like ESRCH); access-denied → nothing safe + // to do, also treat as success (best-effort cold reap). + Ok(None) | Err(()) => return Ok(()), + }; + // SAFETY: h is a valid handle with PROCESS_TERMINATE; closed below. + let ok = unsafe { TerminateProcess(h, 1) }; + let err = if ok == 0 { + Some(std::io::Error::last_os_error()) + } else { + None + }; + // SAFETY: close the handle we opened. 
+ unsafe { CloseHandle(h) }; + match err { + None => Ok(()), + Some(e) => Err(ProcessError::internal(format!("TerminateProcess({pid}) failed: {e}"))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Two distinct, stable epochs for the identity gate. + fn run_a() -> Uuid { + Uuid::from_u128(0xA) + } + fn run_b() -> Uuid { + Uuid::from_u128(0xB) + } + + /// classify_liveness is the KILL-SAFETY core (3-defense identity gate). A + /// table over every decision branch — a kill is authorized ONLY on + /// `Liveness::Match`, so every other row must NOT be Match. This crate had + /// ZERO tests for it (PROC-G1); the logic was correct-but-unverified. + /// (name, recorded_start, observed, recorded_epoch, current_epoch, expect). + type Case = (&'static str, Option, ObservedLiveness, Uuid, Uuid, Liveness); + + #[test] + fn classify_liveness_decision_table() { + let cases: &[Case] = &[ + // (name, recorded_start, observed, recorded_epoch, current_epoch, expect) + ( + "prior-run orphan, start-time matches → Match (the ONLY reapable)", + Some(111), + ObservedLiveness::Alive { start_ticks: Some(111) }, + run_a(), + run_b(), + Liveness::Match, + ), + ( + "our own current run (epoch equal) → DiffEpoch, never reap", + Some(111), + ObservedLiveness::Alive { start_ticks: Some(111) }, + run_a(), + run_a(), + Liveness::DiffEpoch, + ), + ( + "same pid, different start-time → RecycledPid, never reap", + Some(111), + ObservedLiveness::Alive { start_ticks: Some(222) }, + run_a(), + run_b(), + Liveness::RecycledPid, + ), + ( + "process gone (ESRCH) → Gone", + Some(111), + ObservedLiveness::Gone, + run_a(), + run_b(), + Liveness::Gone, + ), + ( + "EPERM (alive but unprovable) → EpermAlive, never reap", + Some(111), + ObservedLiveness::EpermAlive, + run_a(), + run_b(), + Liveness::EpermAlive, + ), + ( + "could not record start-time → Unknown, never reap", + None, + ObservedLiveness::Alive { start_ticks: Some(111) }, + run_a(), + run_b(), + Liveness::Unknown, + ), + ( + "could 
not observe start-time → Unknown, never reap", + Some(111), + ObservedLiveness::Alive { start_ticks: None }, + run_a(), + run_b(), + Liveness::Unknown, + ), + ( + "F57: recorded start-time 0 (corrupt sentinel) → Unknown, refuse to gate on garbage", + Some(0), + ObservedLiveness::Alive { start_ticks: Some(0) }, + run_a(), + run_b(), + Liveness::Unknown, + ), + ( + "F57: observed start-time 0 → Unknown even if recorded matches non-zero", + Some(111), + ObservedLiveness::Alive { start_ticks: Some(0) }, + run_a(), + run_b(), + Liveness::Unknown, + ), + ]; + for (name, rec, obs, re, ce, expect) in cases { + let got = classify_liveness(*rec, *obs, *re, *ce); + assert_eq!(got, *expect, "case: {name}"); + } + } + + /// The ONLY value that authorizes a kill is Match — assert no other input + /// shape produces it (the negative-space guard the kill path relies on). + #[test] + fn only_prior_run_matching_start_time_is_reapable() { + // recycled pid must never be reapable even across epochs + assert_ne!( + classify_liveness( + Some(5), + ObservedLiveness::Alive { start_ticks: Some(9) }, + run_a(), + run_b() + ), + Liveness::Match, + "recycled PID must not be Match" + ); + // zero sentinel must never be reapable + assert_ne!( + classify_liveness( + Some(0), + ObservedLiveness::Alive { start_ticks: Some(0) }, + run_a(), + run_b() + ), + Liveness::Match, + "zero sentinel must not be Match" + ); + } + + /// F50 / IC-6: group-kill target guards. pgid<=1 (blast-floor) and pgid that + /// would wrap a negative i32 must return None (never collapse a group-kill + /// into a stray single-pid signal or kill pgid 0/-1 = "every process"). 
+ #[test] + fn group_kill_target_blast_floor_and_overflow() { + assert_eq!(group_kill_target(0), None, "pgid 0 = whole-world, refuse"); + assert_eq!(group_kill_target(1), None, "pgid 1 = init group, refuse (IC-6 floor)"); + assert_eq!(group_kill_target(2), Some(-2), "normal pgid → negative group target"); + assert_eq!(group_kill_target(12345), Some(-12345)); + // > i32::MAX would wrap; must refuse (F50) + assert_eq!( + group_kill_target(u32::MAX), + None, + "pgid > i32::MAX must not wrap to a positive single-pid kill" + ); + assert_eq!(group_kill_target(0x8000_0000), None, "exactly i32::MAX+1 → refuse"); + } + + /// F50: pid signal target must be a positive i32 or None (never wrap). + #[test] + fn pid_signal_target_positive_or_none() { + assert_eq!(pid_signal_target(0), None, "pid 0 has special signal semantics; refuse"); + assert_eq!(pid_signal_target(1), Some(1)); + assert_eq!(pid_signal_target(99999), Some(99999)); + assert_eq!( + pid_signal_target(u32::MAX), + None, + "pid > i32::MAX must not wrap negative" + ); + assert_eq!(pid_signal_target(0x8000_0000), None); + } + + /// LIVENESS-C7: `current_process_group` (the reaper's-own-pgid source for the + /// self-group kill guard) is crate-internal, so it is tested here, not from + /// `tests/`. On Unix it must equal `getpgrp()` and be a sane (>1) value; on + /// non-Unix it is `None`. + #[test] + fn current_process_group_matches_syscall_on_unix() { + #[cfg(unix)] + { + let got = current_process_group().expect("Unix process always has a pgid"); + let expected = u32::try_from(unsafe { libc::getpgrp() }).unwrap(); + assert_eq!(got, expected, "must reflect the live getpgrp()"); + assert!(got > 1, "a real pgid is never the 0/1 kernel sentinels"); + } + #[cfg(not(unix))] + assert_eq!(current_process_group(), None, "no pgid concept off-Unix"); + } + + /// LIVENESS-C6: on platforms with a start-time accessor, reading our OWN + /// process's start-time yields a stable, non-zero token (the identity-gate + /// value). 
On macOS the token packs `(secs << 20) | usecs`; assert the + /// unpacked fields are a plausible wall-clock instant. Stability across two + /// reads is the load-bearing property (the gate compares recorded vs live). + #[test] + fn read_process_start_time_is_stable_and_well_formed() { + let me = std::process::id(); + let first = read_process_start_time(me); + #[cfg(any(target_os = "linux", target_os = "macos"))] + { + let t = first.expect("Linux/macOS expose a start-time for self"); + assert!(t > 0, "start-time token is non-zero (zero is the F57 corrupt sentinel)"); + assert_eq!( + read_process_start_time(me), + Some(t), + "stable across reads (gate relies on this)" + ); + #[cfg(target_os = "macos")] + { + // macOS packing: secs in the high bits, usecs (< 1e6) in low 20. + let usecs = t & 0xF_FFFF; + let secs = t >> 20; + assert!(usecs < 1_000_000, "unpacked usecs field is a valid microsecond count"); + assert!(secs > 1_000_000_000, "unpacked secs is a plausible UNIX timestamp"); + } + } + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + let _ = first; // other platforms may legitimately return None + } +} diff --git a/crates/aionui-process/src/process.rs b/crates/aionui-process/src/process.rs new file mode 100644 index 00000000..d5a6def7 --- /dev/null +++ b/crates/aionui-process/src/process.rs @@ -0,0 +1,712 @@ +//! `ManagedProcess` — the live handle for a subprocess THIS crate spawned. +//! +//! Freshly written (not moved from the existing CliAgentProcess). Owns the +//! child's raw stdio for byte-duplex handoff (IC: bytes not semantics — it +//! never parses output), a watch latch that flips to the exit status, a +//! bounded stderr ring buffer for diagnostics, and the background tasks that +//! drain stderr + monitor exit. Reaping is EXCLUSIVELY via tokio's per-Child +//! wait() (IC-2: no waitpid(-1) / no SIGCHLD handler / no raw Command). 
/// Terminal exit state carried by the exit watch.
///
/// The watch holds an optional `TerminalExit`: `None` means STILL RUNNING,
/// `Some(_)` means terminally gone. The two variants separate "we have a real
/// status" from "`wait()` itself failed" — a distinction the old bare
/// optional-status payload could not express (F45). With that payload a
/// `wait()` error left the value at `None`, so a process that was terminally
/// gone still looked "running" to `exit_status()`/`kill()` and caused a
/// spurious 5s "did not exit" timeout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TerminalExit {
    /// `wait()` returned a status (the normal case).
    Exited(ExitStatus),
    /// `wait()` itself errored (e.g. ECHILD — another reaper got there first).
    /// The process is terminally gone, but no status can be obtained for it.
    WaitErrored,
}
+type OnDropHook = Box; + +/// A live subprocess this crate owns. +pub struct ManagedProcess { + stdin: Mutex>, + stdout: Mutex>, + pid: u32, + process_group_id: Option, + exit_rx: watch::Receiver>, + stderr_buffer: Arc>>, + stderr_task: JoinHandle<()>, + /// The exit-monitor task. Held only to keep it attached for the process's + /// lifetime; on Drop it is DETACHED (not aborted) so it completes the single + /// `child.wait()` reap of the direct child after the group-kill — one reaper, + /// no manual `waitpid`, no blocking sleep (F43/F44). `Option` so Drop can + /// `take()` it and explicitly detach (documents intent + silences "unread"). + exit_task: Option>, + /// Fired once on Drop (registry dereg, F38). `Mutex` so `&self` Drop can take it. + on_drop: std::sync::Mutex>, + /// Windows Job Object holding this child + all its descendants (batch B). + /// Held for the child's whole lifetime; on Drop it terminates the whole + /// subtree (KILL_ON_JOB_CLOSE). `None` only if assignment failed (the spawn + /// then errors before this struct is built, so in practice always `Some`). + #[cfg(windows)] + job: Option, +} + +impl Drop for ManagedProcess { + /// Owning-handle contract: dropping a `ManagedProcess` tears down the + /// subprocess subtree it owns — WITHOUT blocking the calling thread (which, + /// in the consumer, is a tokio worker; F43) and WITHOUT a second reaper + /// racing the exit task (F44). + /// + /// Steps: (1) group-SIGKILL the whole subtree if not already terminally gone + /// (covers grandchildren in the group; `force_kill` maps already-gone→Ok so a + /// normal post-exit drop is a no-op); (2) abort the stderr task; (3) detach + /// the exit-monitor task by dropping its handle; (4) fire the once-only + /// on-drop hook (registry dereg, F38). 
+ /// + /// WHO REAPS THE DIRECT CHILD'S ZOMBIE (corrected per review): + /// the authoritative reaper is tokio's `kill_on_drop(true)` (set on the + /// Builder Child in aionui-runtime spawn.rs) — when the exit-monitor task + /// (which OWNS the `Child`) is dropped, tokio's orphan queue reaps the Child. + /// So we do NOT rely on the detached `child.wait()` running to completion + /// (a detached task is cancelled at its next await on runtime shutdown, NOT + /// driven to completion — the earlier claim here was overstated). We detach + /// rather than `abort()` purely so a turn-end drop (runtime still alive) lets + /// the in-flight `wait()` finish naturally; either way kill_on_drop guarantees + /// the reap. The point that IS load-bearing (F43/F44): Drop does NO blocking + /// `std::thread::sleep` and NO manual `waitpid`, so it never stalls the + /// (tokio worker) thread releasing the last Arc and there is no second reaper + /// racing tokio's. + /// + /// `already_terminal` now reads the watch as a real liveness check (F45): + /// `Some(_)` — whether `Exited` or `WaitErrored` — means terminally gone, so + /// a `wait()`-error no longer makes a dead process look alive and trigger a + /// redundant group-kill. + fn drop(&mut self) { + let already_terminal = self.exit_rx.borrow().is_some(); + if !already_terminal { + // HOT-path whole-subtree kill. Non-blocking. + // + // F41 (why no identity re-gate on the HOT path, unlike cold reap): + // the exit-monitor task still holds the tokio `Child`, which OWNS + // this pid — the kernel cannot recycle a pid until it is + // `wait()`-reaped, and that reap only happens inside exit_task (which + // we detach, not abort). So while this handle is live the pgid/job + // CANNOT have been recycled onto an innocent process; killing our own + // recorded group/job here is safe by Child-ownership, no start-time + // re-check needed. 
The identity gate is required only for the COLD + // reap path (no live Child, pid reconstructed from disk), where it + // exists (F42). `already_terminal` uses the F45-correct watch so a + // wait()-errored (terminally gone) process is not re-killed. + #[cfg(unix)] + let _ = crate::force_kill(self.pid, self.process_group_id); + // Windows HOT kill = terminate the whole Job (subtree), not just the + // single process. The Job's own Drop (below, KILL_ON_JOB_CLOSE) would + // also do this, but an explicit synchronous terminate here guarantees + // teardown before the dereg hook fires. + #[cfg(windows)] + if let Some(job) = &self.job { + job.terminate(); + } + } + // Stop draining stderr eagerly (its fd/buffer/task would otherwise be + // pinned by a surviving grandchild holding the write end; F48). + self.stderr_task.abort(); + // DETACH the exit task (do NOT abort): take it out and drop it. Dropping + // a tokio `JoinHandle` only detaches the task (it keeps running while the runtime is alive), so an in-flight + // `child.wait()` inside it can still finish reaping the direct child + // after the group-kill above — one reaper, no manual `waitpid`, no + // `std::thread::sleep`, no blocking on this (worker) thread (F43/F44). + if let Some(task) = self.exit_task.take() { + drop(task); // explicit detach + } + + // Fire the dereg hook exactly once (registry dereg, F38). The injector + // (RealSpawner) makes the hook non-blocking (offloads file I/O), so this + // does not stall the dropping thread. + if let Some(hook) = self.on_drop.lock().unwrap_or_else(|e| e.into_inner()).take() { + hook(); + } + } +} + +impl ManagedProcess { + /// Spawn `spec` as its own process-group leader and start the background + /// stderr-drain + exit-monitor tasks. `extra_env` is applied per-child via + /// the Builder (IC-5: never mutates global env). 
+ pub async fn spawn(spec: CommandSpec, extra_env: &[(String, String)]) -> Result { + let cwd = match spec.cwd.as_deref() { + Some(c) => Some(prepare_command_cwd(c)?), + None => None, + }; + + let mut builder = Builder::new(&spec.command); + builder + .args(&spec.args) + .envs(spec.env.iter().map(|e| (&e.name, &e.value))) + .envs(extra_env.iter().map(|(k, v)| (k, v))) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()); + if let Some(cwd) = cwd { + builder.current_dir(cwd); + } + + let mut child = builder + .spawn() + .map_err(|e| ProcessError::internal(format!("spawn '{}' failed: {e}", spec.command.display())))?; + + let pid = child + .id() + .ok_or_else(|| ProcessError::internal("spawned child has no pid"))?; + let process_group_id = tracked_process_group_id(pid); + + // Windows (batch B): the child was spawned CREATE_SUSPENDED. Assign it to + // a Job Object (subtree containment) and resume it BEFORE we touch its + // stdio — closing the assign-vs-fork race. On failure we must NOT leave a + // suspended zombie (S7): kill it and surface the error. + #[cfg(windows)] + let job = { + // raw_handle() is valid while the child is alive (it is, suspended). + match child.raw_handle() { + Some(raw) => match job_windows::JobObject::assign_and_resume(raw, pid) { + Ok(j) => Some(j), + Err(e) => { + // Tear down the suspended child (kill_on_drop SIGKILLs it + // when `child` drops at end of scope) and error out (S7). 
+ let _ = child.start_kill(); + return Err(ProcessError::internal(format!("windows job containment: {e}"))); + } + }, + None => { + let _ = child.start_kill(); + return Err(ProcessError::internal( + "windows: child has no raw handle for job assignment", + )); + } + } + }; + + let stdin = child + .stdin + .take() + .ok_or_else(|| ProcessError::internal("failed to capture stdin"))?; + let stdout = child + .stdout + .take() + .ok_or_else(|| ProcessError::internal("failed to capture stdout"))?; + let stderr = child + .stderr + .take() + .ok_or_else(|| ProcessError::internal("failed to capture stderr"))?; + + // Background: drain stderr into a bounded tail buffer (oldest bytes are dropped first). + let stderr_buffer = Arc::new(Mutex::new(Vec::::new())); + let buf_clone = Arc::clone(&stderr_buffer); + let stderr_task = tokio::spawn(async move { + let mut lines = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = lines.next_line().await { + let trimmed = line.trim(); + if !trimmed.is_empty() { + warn!(pid, stderr = trimmed, "subprocess stderr"); + } + let mut buf = buf_clone.lock().await; + buf.extend_from_slice(line.as_bytes()); + buf.push(b'\n'); + if buf.len() > STDERR_BUFFER_MAX { + // Truncate by raw byte offset. The cut can land inside a multi-byte char; that is + // harmless because this is a raw byte buffer that `peek_stderr_tail` renders lossily. + let cut = buf.len() - STDERR_BUFFER_MAX; + buf.drain(..cut); + } + } + debug!(pid, "stderr drain finished"); + }); + + // Background: monitor exit via tokio's per-Child wait (IC-2). The watch + // payload is `Option<TerminalExit>`: `None` = still running, `Some(_)` = + // terminally gone. A `wait()` error latches `Some(WaitErrored)` — NOT + // `None` — so `exit_status()`/`kill()` see a real terminal state instead + // of conflating "wait errored" with "never exited" (F45). 
+ let (exit_tx, exit_rx) = watch::channel(None); + // Hand the exit task an abort handle to the stderr drain so it can stop + // it the moment the direct child is gone — otherwise a grandchild holding + // the stderr write end keeps the drain (fd + 8KB buffer + task) pinned + // until the ManagedProcess is dropped (F48). + let stderr_abort = stderr_task.abort_handle(); + let exit_task = tokio::spawn(async move { + match child.wait().await { + Ok(status) => { + debug!(pid, ?status, "subprocess exited"); + let _ = exit_tx.send(Some(TerminalExit::Exited(status))); + } + Err(e) => { + error!(pid, error = %e, "wait() on subprocess errored"); + let _ = exit_tx.send(Some(TerminalExit::WaitErrored)); + } + } + // Direct child is gone: stop the stderr drain so a surviving + // grandchild on the same pipe cannot pin the fd/task (F48). + stderr_abort.abort(); + }); + + Ok(Self { + stdin: Mutex::new(Some(stdin)), + stdout: Mutex::new(Some(stdout)), + pid, + process_group_id, + exit_rx, + stderr_buffer, + stderr_task, + exit_task: Some(exit_task), + on_drop: std::sync::Mutex::new(None), + #[cfg(windows)] + job, + }) + } + + /// Install a once-only cleanup hook fired on Drop (registry dereg, F38). + /// Opaque to this crate; the injector (RealSpawner) must make it non-blocking. + /// Overwrites any previously-set hook (last writer wins; intended single set). + pub fn set_on_drop(&self, hook: Box) { + *self.on_drop.lock().unwrap_or_else(|e| e.into_inner()) = Some(hook); + } + + /// Hand off stdin+stdout (boxed) to a transport. Once-only: a second call + /// returns None. Takes both under one critical section so it is all-or- + /// nothing — a partial handoff never leaves one taken and one intact. 
+ pub async fn take_stdio(&self) -> Option<(BoxedStdin, BoxedStdout)> { + let mut sin = self.stdin.lock().await; + let mut sout = self.stdout.lock().await; + match (sin.is_some(), sout.is_some()) { + (true, true) => { + let i = sin.take().unwrap(); + let o = sout.take().unwrap(); + Some((Box::new(i), Box::new(o))) + } + _ => None, // already taken (or never present) — leave both as-is + } + } + + pub fn pid(&self) -> u32 { + self.pid + } + + pub fn process_group_id(&self) -> Option { + self.process_group_id + } + + /// The exit status if the process has terminally exited WITH an obtainable + /// status. `None` means EITHER still running OR `wait()` errored + /// (`WaitErrored`) — callers needing to distinguish those use + /// [`Self::terminal_exit`] / [`Self::has_exited`]. + pub fn exit_status(&self) -> Option { + match *self.exit_rx.borrow() { + Some(TerminalExit::Exited(s)) => Some(s), + Some(TerminalExit::WaitErrored) | None => None, + } + } + + /// The full terminal state: `None` = still running; `Some(Exited)` / + /// `Some(WaitErrored)` = terminally gone. This is the F45-correct liveness + /// signal (a `WaitErrored` process is GONE, not running). + pub fn terminal_exit(&self) -> Option { + *self.exit_rx.borrow() + } + + /// True iff the process is terminally gone (exited or wait-errored). + pub fn has_exited(&self) -> bool { + self.exit_rx.borrow().is_some() + } + + /// Resolve when the process is terminally gone (exited or wait() errored). + /// Returns the status if one was obtainable (`None` for `WaitErrored`). + pub async fn wait_for_exit(&self) -> Option { + let mut rx = self.exit_rx.clone(); + if rx.borrow().is_some() { + return self.exit_status(); + } + let _ = rx.changed().await; + self.exit_status() + } + + /// Peek the last `max_lines` lines of buffered stderr without draining. 
+ pub async fn peek_stderr_tail(&self, max_lines: usize) -> String { + if max_lines == 0 { + return String::new(); + } + let buf = self.stderr_buffer.lock().await; + // Lossy render — diagnostics channel, never the byte-duplex contract. + let text = String::from_utf8_lossy(&buf); + let trimmed = text.trim_end_matches('\n'); + if trimmed.is_empty() { + return String::new(); + } + let mut tail: Vec<&str> = trimmed.rsplit('\n').take(max_lines).collect(); + tail.reverse(); + tail.join("\n") + } + + /// Close stdin (signals EOF to the child) without killing. + pub async fn close_stdin(&self) { + if self.stdin.lock().await.take().is_some() { + debug!(pid = self.pid, "stdin closed"); + } + } + + /// Non-destructive interrupt: close stdin so a cooperative child sees EOF + /// and winds down on its own. Distinct from kill (which force-terminates). + pub async fn interrupt(&self) -> Result<(), ProcessError> { + self.close_stdin().await; + Ok(()) + } + + /// Graceful kill: close stdin, wait up to `grace` for self-exit, then + /// group-SIGKILL. Confirms exit before returning. + pub async fn kill(&self, grace: Duration) -> Result<(), ProcessError> { + self.close_stdin().await; + + let mut rx = self.exit_rx.clone(); + let exited = tokio::time::timeout(grace, async { + if rx.borrow().is_some() { + return; + } + let _ = rx.changed().await; + }) + .await; + if exited.is_ok() && self.exit_rx.borrow().is_some() { + debug!(pid = self.pid, "exited within grace"); + return Ok(()); + } + + warn!(pid = self.pid, "grace expired, force-killing subtree"); + // Unix: group-SIGKILL. Windows: terminate the whole Job (subtree), not + // the single process — the live handle holds the job (hot path). 
+ #[cfg(unix)] + crate::force_kill(self.pid, self.process_group_id)?; + #[cfg(windows)] + if let Some(job) = &self.job { + job.terminate(); + } else { + crate::force_kill(self.pid, self.process_group_id)?; + } + + // Wait for the exit monitor to observe termination so callers don't + // race a still-live child after the kill returns. + let mut rx = self.exit_rx.clone(); + tokio::time::timeout(Duration::from_secs(5), async { + if rx.borrow().is_some() { + return; + } + let _ = rx.changed().await; + }) + .await + .map_err(|_| ProcessError::internal(format!("process {} did not exit after SIGKILL", self.pid)))?; + Ok(()) + } +} + +/// Validate a workspace cwd: non-empty, no whitespace segment (bundled runtime +/// can't handle it), exists, is a directory. +pub(crate) fn prepare_command_cwd(cwd: &str) -> Result { + if cwd.trim().is_empty() { + return Err(ProcessError::bad_request("workspace directory is empty")); + } + let path = PathBuf::from(cwd); + if path + .components() + .any(|c| c.as_os_str().to_string_lossy().contains(char::is_whitespace)) + { + return Err(ProcessError::workspace_path_contains_whitespace_runtime_unsupported( + path.display().to_string(), + )); + } + match std::fs::metadata(&path) { + Ok(m) if m.is_dir() => Ok(path), + Ok(_) => Err(ProcessError::bad_request(format!( + "workspace path is not a directory: {}", + path.display() + ))), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(ProcessError::bad_request(format!( + "workspace directory does not exist: {}", + path.display() + ))), + Err(e) => Err(ProcessError::bad_request(format!( + "workspace directory not accessible: {}: {e}", + path.display() + ))), + } +} + +#[cfg(unix)] +pub(crate) fn tracked_process_group_id(pid: u32) -> Option { + Some(pid) // Builder sets process_group(0): the leader's pgid == its pid. +} + +#[cfg(not(unix))] +pub(crate) fn tracked_process_group_id(_pid: u32) -> Option { + None +} + +/// Windows Job Object subtree containment (feature 005 batch B). 
Transcribed +/// from watchexec/process-wrap's verified `src/windows.rs` scheme (Decision 2b: +/// borrow the SCHEME, not the crate). A `JobObject` owns a Job handle for the +/// child's whole lifetime: every descendant the child spawns is auto-trapped in +/// the job (Windows inherits job membership), so `terminate()` / +/// `KILL_ON_JOB_CLOSE` reaps the WHOLE subtree — the Windows analog of the Unix +/// process-group kill, and STRONGER (a `setsid`-style breakaway is not possible +/// without `JOB_OBJECT_LIMIT_BREAKAWAY_OK`). +#[cfg(windows)] +pub(crate) mod job_windows { + use std::os::windows::io::RawHandle; + + use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}; + use windows_sys::Win32::System::Diagnostics::ToolHelp::{ + CreateToolhelp32Snapshot, TH32CS_SNAPTHREAD, THREADENTRY32, Thread32First, Thread32Next, + }; + use windows_sys::Win32::System::JobObjects::{ + AssignProcessToJobObject, CreateJobObjectW, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, + JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JobObjectExtendedLimitInformation, SetInformationJobObject, + TerminateJobObject, + }; + use windows_sys::Win32::System::Threading::{OpenThread, ResumeThread, THREAD_SUSPEND_RESUME}; + + /// An owned Job Object handle. RAII: dropping it closes the handle, which — + /// because we set `KILL_ON_JOB_CLOSE` — terminates every process still in the + /// job (the Windows kill-on-drop guarantee, even if our supervisor crashes). + pub(crate) struct JobObject { + job: HANDLE, + } + + // SAFETY: a Windows job HANDLE is just a kernel handle value; it is safe to + // move/share across threads (we only ever close it once, on Drop). + unsafe impl Send for JobObject {} + unsafe impl Sync for JobObject {} + + impl JobObject { + /// Create a job, set `KILL_ON_JOB_CLOSE`, assign the (suspended) child to + /// it, then resume the child's threads. 
The child MUST have been spawned + /// with `CREATE_SUSPENDED` (runtime's `configure_platform_spawn`) so it + /// has not yet run any instruction — closing the assign-vs-fork race. + /// + /// `child_raw` is tokio's `Child::raw_handle()` — we use it read-only for + /// `AssignProcessToJobObject` and NEVER close it (tokio owns it; a + /// double-close would be a use-after-free). `pid` is used only to find + /// the child's threads for resume (std/tokio expose no primary-thread + /// handle, hence the toolhelp walk). + pub(crate) fn assign_and_resume(child_raw: RawHandle, pid: u32) -> Result { + // SAFETY: CreateJobObjectW with null attrs/name returns a job handle + // or null on failure. + let job = unsafe { CreateJobObjectW(core::ptr::null(), core::ptr::null()) }; + if job.is_null() { + return Err(format!("CreateJobObjectW failed: {}", std::io::Error::last_os_error())); + } + let this = JobObject { job }; + + // Set KILL_ON_JOB_CLOSE so dropping the last job handle kills the + // whole subtree (crash-safety). + let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { core::mem::zeroed() }; + info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + // SAFETY: info is a valid, fully-initialized struct of the given size. + let ok = unsafe { + SetInformationJobObject( + job, + JobObjectExtendedLimitInformation, + core::ptr::from_ref(&info).cast(), + core::mem::size_of::() as u32, + ) + }; + if ok == 0 { + return Err(format!( + "SetInformationJobObject failed: {}", + std::io::Error::last_os_error() + )); + } + + // Assign the suspended child to the job. Descendants inherit it. + // SAFETY: child_raw is a live process handle owned by tokio; we only + // read it here. + let ok = unsafe { AssignProcessToJobObject(job, child_raw as HANDLE) }; + if ok == 0 { + return Err(format!( + "AssignProcessToJobObject failed: {}", + std::io::Error::last_os_error() + )); + } + + // Resume the child (it was spawned CREATE_SUSPENDED). 
std/tokio give + // no primary-thread handle, so enumerate the system thread table and + // resume every thread owned by our pid (process-wrap's documented + // "terrible hack" — the only option without reimplementing spawn). + resume_threads(pid).map_err(|e| format!("resume failed: {e}"))?; + + Ok(this) + } + + /// Synchronously terminate every process in the job (the whole subtree). + pub(crate) fn terminate(&self) { + // SAFETY: self.job is a valid job handle for our lifetime. + unsafe { TerminateJobObject(self.job, 1) }; + } + } + + impl Drop for JobObject { + fn drop(&mut self) { + // Closing the last job handle triggers KILL_ON_JOB_CLOSE → the whole + // subtree dies. This IS the Windows kill-on-drop guarantee. + // SAFETY: we own the handle and close it exactly once. + unsafe { CloseHandle(self.job) }; + } + } + + /// Resume all threads of `pid` via a system-wide toolhelp thread snapshot. + /// Edge cases (from process-wrap source): THREADENTRY32.dwSize MUST be the + /// struct size or Thread32First silently returns nothing; ResumeThread's + /// error sentinel is `u32::MAX` (not -1); the snapshot is SYSTEM-WIDE + /// (TH32CS_SNAPTHREAD ignores the pid arg) so we filter by th32OwnerProcessID. + fn resume_threads(pid: u32) -> Result<(), std::io::Error> { + // SAFETY: returns a snapshot handle or INVALID_HANDLE_VALUE. + let snap = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0) }; + if snap == INVALID_HANDLE_VALUE { + return Err(std::io::Error::last_os_error()); + } + let result = (|| { + let mut entry: THREADENTRY32 = unsafe { core::mem::zeroed() }; + entry.dwSize = core::mem::size_of::() as u32; + // SAFETY: entry is a valid, dwSize-initialized THREADENTRY32. + if unsafe { Thread32First(snap, &mut entry) } == 0 { + return Err(std::io::Error::last_os_error()); + } + loop { + if entry.th32OwnerProcessID == pid { + // SAFETY: open the thread for suspend/resume by tid. 
+ let th = unsafe { OpenThread(THREAD_SUSPEND_RESUME, 0, entry.th32ThreadID) }; + if !th.is_null() { + // SAFETY: th is a valid thread handle; resume + close it. + let rc = unsafe { ResumeThread(th) }; + unsafe { CloseHandle(th) }; + if rc == u32::MAX { + return Err(std::io::Error::last_os_error()); + } + } + } + // SAFETY: entry stays valid; Thread32Next advances or signals end. + if unsafe { Thread32Next(snap, &mut entry) } == 0 { + break; // no more threads + } + } + Ok(()) + })(); + // SAFETY: close the snapshot handle we created. + unsafe { CloseHandle(snap) }; + result + } +} + +#[cfg(test)] +mod tests { + //! Tests for the pure / cheap helpers in this module: the workspace-cwd + //! validation guard (input validation, platform-independent) and the + //! process-group-id derivation. The real `ManagedProcess` lifecycle + //! (spawn/kill/Drop/take_stdio) needs a live child and is covered by + //! real-process integration tests (see audit PROC-L*). + + use super::*; + + /// INPUTVAL-B1: any whitespace in ANY path segment is rejected with the + /// dedicated `WorkspacePathContainsWhitespaceRuntimeUnsupported` variant + /// (the bundled runtime cannot handle it — this was the F1 spawn-fail + /// incident root cause, and was previously untested). + #[test] + fn prepare_command_cwd_rejects_whitespace_in_any_segment() { + for bad in ["/tmp/my project", "/tmp/proj/sub folder/bin", "/tmp/dir\twith\ttab"] { + let err = prepare_command_cwd(bad).expect_err("whitespace cwd must be rejected"); + assert!( + matches!(err, ProcessError::WorkspacePathContainsWhitespaceRuntimeUnsupported(_)), + "{bad:?} → expected WhitespaceRuntimeUnsupported, got {err:?}" + ); + } + } + + /// INPUTVAL-B1 boundary: the whitespace guard fires BEFORE the filesystem + /// metadata check, so a non-existent whitespace path still yields the + /// whitespace error (not a "does not exist" error). This pins the ordering + /// that makes the guard testable without a real directory. 
+ #[test] + fn prepare_command_cwd_whitespace_guard_precedes_existence_check() { + let err = prepare_command_cwd("/nonexistent path/that does not exist").expect_err("must reject"); + assert!( + matches!(err, ProcessError::WorkspacePathContainsWhitespaceRuntimeUnsupported(_)), + "whitespace must be detected before the metadata/existence check, got {err:?}" + ); + } + + /// INPUTVAL-B2: an empty or whitespace-only cwd is a `BadRequest` + /// ("workspace directory is empty"), distinct from the whitespace-segment + /// variant. + #[test] + fn prepare_command_cwd_rejects_empty_or_blank() { + for blank in ["", " ", "\t", "\n"] { + let err = prepare_command_cwd(blank).expect_err("blank cwd must be rejected"); + assert!( + matches!(err, ProcessError::BadRequest(ref m) if m.contains("empty")), + "{blank:?} → expected BadRequest(empty), got {err:?}" + ); + } + } + + /// INPUTVAL-B1 negative: a clean, existing directory with no whitespace + /// passes the guard and comes back as the same path (a `PathBuf`, not canonicalized). Uses the OS temp dir, + /// which is expected (not guaranteed) to exist; skip if its path happens to contain + /// whitespace (some CI home dirs do) so the assertion stays meaningful. + #[test] + fn prepare_command_cwd_accepts_clean_existing_dir() { + let dir = std::env::temp_dir(); + let has_ws = dir + .components() + .any(|c| c.as_os_str().to_string_lossy().contains(char::is_whitespace)); + if has_ws { + return; // temp dir itself has whitespace; guard would (correctly) reject. + } + let out = prepare_command_cwd(&dir.to_string_lossy()).expect("clean existing dir accepted"); + assert_eq!(out, dir, "returns the validated path unchanged"); + } + + /// PROC-L16: on Unix the child is spawned as its own process-group leader + /// (Builder calls `process_group(0)`), so the tracked pgid equals the pid; + /// on non-Unix there is no pgid concept and it is `None`. 
+ #[test] + fn tracked_process_group_id_is_pid_on_unix_none_elsewhere() { + #[cfg(unix)] + assert_eq!(tracked_process_group_id(12345), Some(12345), "Unix leader pgid == pid"); + #[cfg(not(unix))] + assert_eq!(tracked_process_group_id(12345), None, "no pgid concept off-Unix"); + } +} diff --git a/crates/aionui-process/src/registry_store.rs b/crates/aionui-process/src/registry_store.rs new file mode 100644 index 00000000..356461ee --- /dev/null +++ b/crates/aionui-process/src/registry_store.rs @@ -0,0 +1,710 @@ +//! Persisted registry of processes THIS crate spawned (IC-4). +//! +//! Lives in its own subdir `{data_dir}/runtime/aionui-process/registry.json`, +//! never touching the existing `agent-process-registry.json`. Written via a +//! durable atomic write whose temp file is namespaced to this crate + pid, so +//! it can never clobber another mechanism's temp. Accessed by exact path only +//! — never by directory scan/glob. + +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::ProcessError; + +/// Subdir + filenames, namespaced so they are provably disjoint from the +/// existing mechanism's artifacts and from bun's `runtime.lock` (IC-3/IC-4). +pub const SUBDIR: &str = "runtime/aionui-process"; +pub const REGISTRY_FILE: &str = "registry.json"; +pub const LOCK_FILE: &str = "instance.lock"; +/// Cross-process advisory lock for the registry read-modify-write (F49). +/// DELIBERATELY a SEPARATE file from `LOCK_FILE` (`instance.lock`): the +/// single-instance lock is held NON-BLOCKING for the whole process lifetime +/// (it gates reap), whereas this one is taken BLOCKING for the duration of a +/// single millisecond-scale RMW. Reusing the same file would self-deadlock — +/// the process already holds `instance.lock` exclusively for its whole life. +pub const REGISTRY_LOCK_FILE: &str = "registry.json.lock"; + +/// One process this crate spawned. 
Identity fields (`start_time_ticks` + +/// `instance_epoch`) back the IC-1 kill gate. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RegisteredProcess { + pub pid: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pgid: Option, + /// Kernel start-time in clock ticks (identity gate). None if unobtainable. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub start_time_ticks: Option, + /// This-run UUID; a row whose epoch != current run is a prior-run orphan. + pub instance_epoch: Uuid, + /// Host identity; a row from another machine (cloud-synced data dir) is + /// prune-only, never killed (IC-1 cross-machine guard, design I-5). + pub machine_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub containment_id: Option, + /// Opaque owner tag (e.g. a conversation id); this layer never parses it. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub opaque_owner_tag: String, + pub registered_at_ms: i64, +} + +/// Identity key for `unregister` — by (pid, start_time, epoch), not bare pid, +/// so a recycled-pid row is never accidentally removed. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProcessIdentity { + pub pid: u32, + pub start_time_ticks: Option, + pub instance_epoch: Uuid, +} + +impl RegisteredProcess { + fn identity(&self) -> ProcessIdentity { + ProcessIdentity { + pid: self.pid, + start_time_ticks: self.start_time_ticks, + instance_epoch: self.instance_epoch, + } + } +} + +/// The on-disk registry schema version this build writes and understands. +const CURRENT_REGISTRY_VERSION: u32 = 1; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RegistryFile { + /// `#[serde(default)]` so a registry missing the field (older/hand-edited) + /// is NOT a hard parse error that aborts the reap (F55 + ties into F39); + /// it defaults to the current version instead. 
+ #[serde(default = "default_registry_version")] + version: u32, + processes: Vec, +} + +fn default_registry_version() -> u32 { + CURRENT_REGISTRY_VERSION +} + +impl Default for RegistryFile { + fn default() -> Self { + Self { + version: CURRENT_REGISTRY_VERSION, + processes: Vec::new(), + } + } +} + +/// Persisted store of this crate's spawned processes. +pub trait RegistryStore: Send + Sync { + fn record(&self, entry: RegisteredProcess) -> Result<(), ProcessError>; + /// Remove by full identity (recycled-pid rows with a different identity + /// are left intact). + fn unregister(&self, id: &ProcessIdentity) -> Result<(), ProcessError>; + /// The production reader — read all rows back (used by startup reap). + fn read_all(&self) -> Result, ProcessError>; +} + +/// File-backed registry under `{data_dir}/runtime/aionui-process/registry.json`. +pub struct FileRegistryStore { + path: PathBuf, + /// Sidecar path for the cross-process RMW lock (F49). See + /// [`REGISTRY_LOCK_FILE`]; a separate file from `instance.lock`. + lock_path: PathBuf, + /// Serializes read-modify-write within this process. `atomic_write` makes + /// only the final rename atomic; without this, two concurrent `record`s + /// in one process could lose a row (last-writer-wins) — leaking a spawned- + /// but-unrecorded, un-reapable orphan. Cross-process writers are serialized + /// by the additional fs lock in [`Self::with_rmw_flock`] (F49). + rmw: std::sync::Mutex<()>, +} + +impl FileRegistryStore { + pub fn new(data_dir: &Path) -> Self { + let dir = data_dir.join(SUBDIR); + let store = Self { + path: dir.join(REGISTRY_FILE), + lock_path: dir.join(REGISTRY_LOCK_FILE), + rmw: std::sync::Mutex::new(()), + }; + // F51: best-effort sweep of stray atomic-write temp files left by a + // crash/SIGKILL between fsync and rename. 
We scan ONLY this crate's own + // subdir and match ONLY this process's own temp names (`.registry.json.{pid}.*.tmp`; + // `.corrupt.` quarantine files are left alone) — never a broad `*.tmp`/`*.json` glob (IC-4: never touch + // another mechanism's artifacts). Without this the design's + // "by-exact-path-only" rule means strays can NEVER be GC'd. + store.sweep_stray_temps(); + store + } + + /// Remove THIS PROCESS's own orphaned `.registry.json.{pid}.{counter}.tmp` + /// strays from a prior crash. PID-SCOPED (F51 review fix): the temp name is + /// `.{stem}.{pid}.{counter}.tmp`, so we only sweep temps carrying OUR pid — + /// never a SIBLING instance's in-flight atomic-write temp (deleting that + /// would make the sibling's `rename` fail NotFound and silently lose a row → + /// an un-reapable orphan). A stale temp from an OLD run that happens to share + /// our recycled pid is the only (harmless) over-match, and it is genuinely a + /// stray. Deliberately does NOT touch `.corrupt.` quarantine files (forensics, + /// F39). NOTE(review): a crashed prior run usually had a different pid, so its strays are not matched here — confirm this is intended. + fn sweep_stray_temps(&self) { + let Some(parent) = self.path.parent() else { return }; + let stem = self.path.file_name().and_then(|n| n.to_str()).unwrap_or(REGISTRY_FILE); + // Pid-scoped prefix: only our own process's temps. + let our_prefix = format!(".{stem}.{}.", std::process::id()); + let Ok(entries) = std::fs::read_dir(parent) else { return }; + for entry in entries.flatten() { + let name = entry.file_name(); + let Some(name) = name.to_str() else { continue }; + if name.starts_with(&our_prefix) && name.ends_with(".tmp") && !name.contains(".corrupt.") { + let _ = std::fs::remove_file(entry.path()); + } + } + } + + pub fn path(&self) -> &Path { + &self.path + } + + /// Read the registry, FAIL-SAFE on corruption (F39). A truncated / malformed + /// registry.json (e.g. an interrupted prior `atomic_write` that never reached + /// rename, or a stray byte) must NOT abort the whole reap — aborting would + /// leak every real orphan from the prior crash. 
Instead we QUARANTINE the bad + /// file (rename it aside for forensics, non-destructively) and degrade to an + /// empty registry: this run reaps nothing (safe — killing pids read from + /// corrupt data is strictly more dangerous than skipping a round), and the + /// next `write_file` starts clean (self-heal). A genuine I/O error (not a + /// parse error) still propagates — that is an environment fault, not data we + /// can safely ignore. + fn read_file(&self) -> Result { + let contents = match std::fs::read_to_string(&self.path) { + Ok(s) => s, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(RegistryFile::default()), + Err(e) => return Err(e.into()), + }; + match serde_json::from_str::(&contents) { + Ok(reg) if reg.version > CURRENT_REGISTRY_VERSION => { + // F55: a registry written by a FUTURE build may have renamed / + // repurposed fields. Feeding it into reconcile could group-kill on + // stale field meanings. Degrade to empty (reap nothing this round + // — safe) rather than trust forward-incompatible data. Do NOT + // quarantine: a newer sibling owns that file legitimately. + tracing::warn!( + path = %self.path.display(), + found = reg.version, + understood = CURRENT_REGISTRY_VERSION, + "registry schema version is newer than this build understands; degrading to empty (reap skipped)" + ); + Ok(RegistryFile::default()) + } + Ok(reg) if reg.version < CURRENT_REGISTRY_VERSION => { + // F55 (asymmetry made explicit, per review): a registry written + // by an OLDER build. This build understands all prior schemas by + // construction (fields are serde-default-tolerant + only additive + // changes are allowed across versions), so an older registry is + // SAFE to trust and reap from — unlike a newer one (above) whose + // field meanings we can't know. We log it for observability but + // proceed. If a future version ever makes a BREAKING change, this + // arm must change to a migration/degrade instead of blind trust. 
+ tracing::debug!( + path = %self.path.display(), + found = reg.version, + understood = CURRENT_REGISTRY_VERSION, + "registry schema version is older than current; trusting (backward-compatible additive schema)" + ); + Ok(reg) + } + Ok(reg) => Ok(reg), + Err(e) => { + tracing::warn!( + path = %self.path.display(), + error = %e, + "registry is corrupt/unparseable; quarantining and degrading to empty (reap skipped this round)" + ); + self.quarantine_corrupt(); + Ok(RegistryFile::default()) + } + } + } + + /// Best-effort rename the corrupt registry aside so it is preserved for + /// forensics and a fresh one can be written. Namespaced to this crate's + /// subdir + pid/counter so it never collides; failure here is non-fatal + /// (we still degrade to empty). + fn quarantine_corrupt(&self) { + let stem = self + .path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("registry.json"); + if let Some(parent) = self.path.parent() { + let dst = parent.join(format!(".{stem}.corrupt.{}.{}", std::process::id(), next_counter())); + if let Err(e) = std::fs::rename(&self.path, &dst) { + tracing::warn!(error = %e, "failed to quarantine corrupt registry (will be overwritten on next write)"); + } + } + } + + fn write_file(&self, reg: &RegistryFile) -> Result<(), ProcessError> { + let bytes = + serde_json::to_vec_pretty(reg).map_err(|e| ProcessError::internal(format!("serialize registry: {e}")))?; + atomic_write(&self.path, &bytes) + } + + /// Run `f` while holding the CROSS-PROCESS registry lock (F49), so a + /// concurrent sibling instance cannot interleave its own read-modify-write + /// and lose a row (last-writer-wins). `atomic_write` only makes the final + /// rename atomic — it does NOT prevent two processes each reading the same + /// N rows, each appending one, and the second `rename` clobbering the + /// first's row. A BLOCKING `flock` around the whole RMW serializes that. 
+ /// + /// Degrade-not-fail: if the lock file cannot be opened or locked (rare I/O + /// fault), we WARN and still run `f` — the cross-process guard is an + /// enhancement over the always-present in-process `rmw` Mutex; failing the + /// `record`/`unregister` outright would be a worse regression than the + /// single-instance behavior we had before F49. + fn with_rmw_flock(&self, f: impl FnOnce() -> Result) -> Result { + use fs2::FileExt; + if let Some(parent) = self.lock_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + // `File::create` sets O_CLOEXEC on unix by default, so this advisory-lock + // fd is NOT inherited by spawned children (same reasoning as + // instance_lock.rs) — an inherited lock fd could keep the lock "held" + // after we exit. + let lock_file = match std::fs::File::create(&self.lock_path) { + Ok(file) => file, + Err(e) => { + tracing::warn!( + path = %self.lock_path.display(), + error = %e, + "could not open registry lock file; proceeding without cross-process guard (F49 degraded)" + ); + return f(); + } + }; + // BLOCKING exclusive lock for the whole RMW (≠ instance.lock's + // non-blocking try_lock): we WANT to wait out a sibling's in-flight RMW, + // not bail. RMW is millisecond-scale so contention is brief. + if let Err(e) = lock_file.lock_exclusive() { + tracing::warn!( + path = %self.lock_path.display(), + error = %e, + "could not acquire registry lock; proceeding without cross-process guard (F49 degraded)" + ); + return f(); + } + let result = f(); + // Explicit unlock for deterministic release (closing the File would also + // release it, but on some platforms the close-triggered release can lag). + let _ = fs2::FileExt::unlock(&lock_file); + result + } +} + +impl RegistryStore for FileRegistryStore { + fn record(&self, entry: RegisteredProcess) -> Result<(), ProcessError> { + // Lock order (fixed, both RMW methods): in-process `rmw` Mutex (outer) + // → cross-process flock (inner). 
This crate takes both locks at exactly + // these two sites only, so there is no opposite-order acquisition and + // thus no cross-lock deadlock risk. + let _guard = self.rmw.lock().unwrap_or_else(|e| e.into_inner()); + self.with_rmw_flock(|| { + let mut reg = self.read_file()?; + reg.processes.retain(|p| p.identity() != entry.identity()); + reg.processes.push(entry); + self.write_file(&reg) + }) + } + + fn unregister(&self, id: &ProcessIdentity) -> Result<(), ProcessError> { + let _guard = self.rmw.lock().unwrap_or_else(|e| e.into_inner()); + self.with_rmw_flock(|| { + let mut reg = self.read_file()?; + let before = reg.processes.len(); + reg.processes.retain(|p| &p.identity() != id); + if reg.processes.len() == before { + return Ok(()); // nothing matched — idempotent + } + self.write_file(&reg) + }) + } + + fn read_all(&self) -> Result<Vec<RegisteredProcess>, ProcessError> { + Ok(self.read_file()?.processes) + } +} + +/// Durable atomic write into this crate's subdir. Temp file is namespaced to +/// the final path + pid + counter so it cannot collide with another +/// mechanism's temp or with concurrent writers (IC-4). Best-effort dir fsync. +/// `pub(crate)` so other durable artifacts (the machine-id, F40) reuse the same +/// temp+fsync+rename discipline instead of a torn `std::fs::write`. 
+pub(crate) fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), ProcessError> { + use std::io::Write; + let parent = path + .parent() + .ok_or_else(|| ProcessError::internal("registry path has no parent"))?; + std::fs::create_dir_all(parent)?; + + let stem = path.file_name().and_then(|n| n.to_str()).unwrap_or("registry.json"); + let tmp = parent.join(format!(".{stem}.{}.{}.tmp", std::process::id(), next_counter())); + + // Stage the payload in the temp file. The handle is dropped (closed) at the + // end of the closure, before the rename below. If staging fails part-way + // (e.g. disk full), remove the partial temp instead of leaking it: its name + // carries a fresh counter value, so no later write would ever reuse it. + let staged = std::fs::File::create(&tmp).and_then(|mut f| { + f.write_all(bytes)?; + f.flush()?; + f.sync_all() + }); + if let Err(e) = staged { + let _ = std::fs::remove_file(&tmp); + return Err(e.into()); + } + if let Err(e) = std::fs::rename(&tmp, path).or_else(|e| { + if cfg!(windows) { + let _ = std::fs::remove_file(path); + std::fs::rename(&tmp, path) + } else { + Err(e) + } + }) { + let _ = std::fs::remove_file(&tmp); + return Err(e.into()); + } + if let Ok(dir) = std::fs::File::open(parent) { + let _ = dir.sync_all(); + } + Ok(()) +} + +fn next_counter() -> u64 { + use std::sync::atomic::{AtomicU64, Ordering}; + static C: AtomicU64 = AtomicU64::new(0); + C.fetch_add(1, Ordering::Relaxed) +} + +#[cfg(test)] +mod tests { + //! Persistence-layer tests against a real `FileRegistryStore` rooted in a + //! `tempfile::TempDir` (single-process; the cross-process RMW flock race is + //! a concurrency-harness gap, see audit PERS-P10/P20). Covers identity-keyed + //! record/unregister, version compatibility, corruption quarantine, and the + //! atomic-write durability discipline. 
+ + use std::path::Path; + + use tempfile::TempDir; + use uuid::Uuid; + + use super::*; + + fn store() -> (TempDir, FileRegistryStore) { + let tmp = TempDir::new().unwrap(); + let store = FileRegistryStore::new(tmp.path()); + (tmp, store) + } + + fn proc(pid: u32, start: u64, epoch: u128) -> RegisteredProcess { + RegisteredProcess { + pid, + pgid: Some(pid), + start_time_ticks: Some(start), + instance_epoch: Uuid::from_u128(epoch), + machine_id: "m".into(), + containment_id: None, + opaque_owner_tag: String::new(), + registered_at_ms: 0, + } + } + + /// PERS-P9: `record` is idempotent on FULL identity — recording the same + /// (pid, start_time, epoch) twice leaves exactly one row (retain-then-push + /// dedups), never two. + #[test] + fn record_is_idempotent_on_identical_identity() { + let (_tmp, store) = store(); + store.record(proc(100, 1000, 0xA)).unwrap(); + store.record(proc(100, 1000, 0xA)).unwrap(); + assert_eq!( + store.read_all().unwrap().len(), + 1, + "identical identity dedups to one row" + ); + } + + /// PERS-P9 (identity gate, the load-bearing half): a SAME-pid row with a + /// DIFFERENT start_time is a DIFFERENT identity (a recycled pid = a genuinely + /// different process), so BOTH rows are kept. This is the whole point of + /// keying on start_time, not bare pid — recording must NOT collapse them. + #[test] + fn record_keeps_both_rows_for_recycled_pid_different_start() { + let (_tmp, store) = store(); + store.record(proc(100, 1000, 0xA)).unwrap(); + store.record(proc(100, 2000, 0xA)).unwrap(); + let rows = store.read_all().unwrap(); + assert_eq!( + rows.len(), + 2, + "recycled pid (diff start_time) is a distinct identity → both kept" + ); + let starts: Vec> = rows.iter().map(|r| r.start_time_ticks).collect(); + assert!(starts.contains(&Some(1000)) && starts.contains(&Some(2000))); + } + + /// PERS-P8: `unregister` matches by FULL identity. 
A mismatched start_time + /// must NOT remove the row (defends against recycled-pid mis-deletion); + /// an exact identity match removes it. `unregister` is idempotent when + /// nothing matches. + #[test] + fn unregister_requires_exact_identity_match() { + let (_tmp, store) = store(); + store.record(proc(100, 1000, 0xA)).unwrap(); + + // Wrong start_time → no-op (row survives). + store + .unregister(&ProcessIdentity { + pid: 100, + start_time_ticks: Some(999), + instance_epoch: Uuid::from_u128(0xA), + }) + .unwrap(); + assert_eq!( + store.read_all().unwrap().len(), + 1, + "start_time mismatch must NOT remove the row" + ); + + // Wrong epoch → no-op. + store + .unregister(&ProcessIdentity { + pid: 100, + start_time_ticks: Some(1000), + instance_epoch: Uuid::from_u128(0xB), + }) + .unwrap(); + assert_eq!( + store.read_all().unwrap().len(), + 1, + "epoch mismatch must NOT remove the row" + ); + + // Exact identity → removed. + store + .unregister(&ProcessIdentity { + pid: 100, + start_time_ticks: Some(1000), + instance_epoch: Uuid::from_u128(0xA), + }) + .unwrap(); + assert!( + store.read_all().unwrap().is_empty(), + "exact identity match removes the row" + ); + } + + /// PERS-P6: schema version compatibility. A FUTURE version degrades to empty + /// (never trust forward-incompatible field meanings); an OLDER version is + /// trusted (additive-only schema, backward compatible). + #[test] + fn read_degrades_on_future_version_and_trusts_older() { + let (_tmp, store) = store(); + let parent = store.path().parent().unwrap(); + std::fs::create_dir_all(parent).unwrap(); + + // Future version → empty (reap nothing). + std::fs::write( + store.path(), + serde_json::json!({ "version": CURRENT_REGISTRY_VERSION + 99, "processes": [proc(7, 70, 0xA)] }) + .to_string(), + ) + .unwrap(); + assert!( + store.read_all().unwrap().is_empty(), + "future schema version must degrade to empty, not be trusted" + ); + + // Older version → trusted. 
+ std::fs::write( + store.path(), + serde_json::json!({ "version": 0, "processes": [proc(7, 70, 0xA)] }).to_string(), + ) + .unwrap(); + assert_eq!( + store.read_all().unwrap().len(), + 1, + "older schema version is trusted (additive-only)" + ); + } + + /// PERS-P12: the `version` field defaults to `CURRENT_REGISTRY_VERSION` when + /// omitted (a hand-edited / older file with no version key parses, not errors). + #[test] + fn registry_file_version_defaults_when_omitted() { + let json = r#"{ "processes": [] }"#; + let reg: RegistryFile = serde_json::from_str(json).expect("missing version must default, not error"); + assert_eq!(reg.version, CURRENT_REGISTRY_VERSION); + } + + /// PERS-P5: a corrupt/unparseable registry is quarantined aside (forensics, + /// `.{stem}.corrupt.*`) and read degrades to empty — corruption never aborts + /// the reap, and the original bytes are preserved (not deleted). + #[test] + fn corrupt_registry_is_quarantined_and_degrades_to_empty() { + let (_tmp, store) = store(); + let parent = store.path().parent().unwrap(); + std::fs::create_dir_all(parent).unwrap(); + std::fs::write(store.path(), b"{ this is not json ]").unwrap(); + + assert!(store.read_all().unwrap().is_empty(), "corrupt file degrades to empty"); + // Original is gone (renamed) and a .corrupt.* sibling now holds the bytes. 
+ assert!( + !store.path().exists(), + "corrupt original is renamed aside, not left in place" + ); + let quarantined: Vec<_> = std::fs::read_dir(parent) + .unwrap() + .flatten() + .filter(|e| e.file_name().to_string_lossy().contains(".corrupt.")) + .collect(); + assert_eq!(quarantined.len(), 1, "exactly one quarantine file for forensics"); + let saved = std::fs::read_to_string(quarantined[0].path()).unwrap(); + assert!( + saved.contains("not json"), + "quarantine preserves the original corrupt bytes" + ); + } + + /// PERS-P3: `next_counter` yields strictly-increasing, collision-free values + /// (it is the temp-name disambiguator preventing concurrent atomic_write + /// collisions). It is a process-global static, so we assert monotonicity over + /// a captured window rather than absolute values. + #[test] + fn next_counter_is_strictly_increasing_and_unique() { + let seq: Vec = (0..100).map(|_| next_counter()).collect(); + for w in seq.windows(2) { + assert!(w[1] > w[0], "counter must strictly increase: {} !> {}", w[1], w[0]); + } + let uniq: std::collections::HashSet<_> = seq.iter().collect(); + assert_eq!(uniq.len(), seq.len(), "no duplicate counter values"); + } + + /// PERS-P1/P17: `atomic_write` writes the exact bytes and creates missing + /// parent directories (temp+fsync+rename discipline), leaving no `.tmp` stray. + #[test] + fn atomic_write_persists_bytes_and_creates_parent_dirs() { + let tmp = TempDir::new().unwrap(); + let target = tmp.path().join("a/b/c/file.json"); + atomic_write(&target, b"hello-atomic").unwrap(); + assert_eq!( + std::fs::read(&target).unwrap(), + b"hello-atomic", + "exact bytes persisted" + ); + // No leftover temp in the final dir. 
+ let strays: Vec<_> = std::fs::read_dir(target.parent().unwrap()) + .unwrap() + .flatten() + .filter(|e| e.file_name().to_string_lossy().ends_with(".tmp")) + .collect(); + assert!(strays.is_empty(), "rename consumes the temp; no .tmp stray remains"); + } + + /// PERS-P18: `atomic_write` rejects a path with no parent directory. + #[test] + fn atomic_write_rejects_path_without_parent() { + let err = atomic_write(Path::new("/"), b"x").expect_err("root path has no parent"); + assert!( + matches!(err, ProcessError::Internal(ref m) if m.contains("no parent")), + "expected internal 'no parent', got {err:?}" + ); + } + + /// PERS-P19: `RegisteredProcess` equality is field-wise (backs identity + /// comparison + dedup). Identical structs are equal; a single field change + /// makes them unequal. + #[test] + fn registered_process_equality_is_fieldwise() { + let a = proc(1, 10, 0xA); + let b = proc(1, 10, 0xA); + assert_eq!(a, b); + let mut c = a.clone(); + c.start_time_ticks = Some(11); + assert_ne!(a, c, "differing start_time makes rows unequal"); + } + + /// PERS-P4: `read_file` on a missing registry returns the empty default + /// (NotFound is the normal first-run case, never an error). + #[test] + fn read_file_missing_returns_empty_default() { + let (_tmp, store) = store(); + // FileRegistryStore::new does not create the file; read_all → empty. + assert!( + store.read_all().unwrap().is_empty(), + "absent registry reads as empty, not error" + ); + } + + /// PERS-P7: serde tolerance — a row JSON omitting the optional fields + /// (pgid / start_time_ticks / containment_id / opaque_owner_tag) deserializes + /// with the documented defaults, and re-serializing skips the empty ones + /// (skip_serializing_if). This is what lets older/hand-edited registries load. 
+ #[test] + fn registered_process_serde_defaults_and_skips() { + let json = r#"{ "pid": 42, "instance_epoch": "00000000-0000-0000-0000-00000000000a", "machine_id": "m", "registered_at_ms": 0 }"#; + let row: RegisteredProcess = serde_json::from_str(json).expect("minimal row must deserialize"); + assert_eq!(row.pid, 42); + assert_eq!(row.pgid, None); + assert_eq!(row.start_time_ticks, None); + assert_eq!(row.containment_id, None); + assert_eq!(row.opaque_owner_tag, "", "missing owner tag defaults empty"); + + // Re-serialize: the None/empty optionals must be omitted from output. + let out = serde_json::to_string(&row).unwrap(); + assert!(!out.contains("pgid"), "None pgid is skipped"); + assert!(!out.contains("start_time_ticks"), "None start_time is skipped"); + assert!(!out.contains("containment_id"), "None containment is skipped"); + assert!(!out.contains("opaque_owner_tag"), "empty owner tag is skipped"); + // Required fields always present. + assert!(out.contains("\"pid\":42") && out.contains("instance_epoch") && out.contains("machine_id")); + } + + /// PERS-P2 / PERS-P16: `FileRegistryStore::new` creates its subdir and sweeps + /// THIS process's stray atomic-write temps (`.registry.json.<pid>.*.tmp`) + /// while leaving a sibling's in-flight temp and `.corrupt.` forensics intact. + #[test] + fn new_creates_subdir_and_sweeps_only_own_pid_temps() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().join(SUBDIR); + std::fs::create_dir_all(&dir).unwrap(); + let our = std::process::id(); + let own_temp = dir.join(format!(".{REGISTRY_FILE}.{our}.7.tmp")); + let sibling_temp = dir.join(format!(".{REGISTRY_FILE}.{}.3.tmp", our.wrapping_add(1))); + let corrupt = dir.join(format!(".{REGISTRY_FILE}.corrupt.{our}.1")); + std::fs::write(&own_temp, b"stale").unwrap(); + std::fs::write(&sibling_temp, b"in-flight").unwrap(); + std::fs::write(&corrupt, b"forensics").unwrap(); + + // new() runs the sweep on construction. 
+ let _store = FileRegistryStore::new(tmp.path()); + + assert!(!own_temp.exists(), "our own stray temp is swept"); + assert!( + sibling_temp.exists(), + "a SIBLING's in-flight temp is NEVER swept (would lose its row)" + ); + assert!(corrupt.exists(), "a .corrupt. quarantine file is preserved (forensics)"); + } + + /// PERS-P16: `new` is cheap and side-effect-light — it does NOT eagerly + /// create the subdir (the sweep tolerates an absent dir), and `path()` points + /// into the namespaced subdir. The dir is created lazily on the first write + /// (`atomic_write` → `create_dir_all`), which `record` then materializes. + #[test] + fn new_is_lazy_then_record_materializes_subdir() { + let tmp = TempDir::new().unwrap(); + let store = FileRegistryStore::new(tmp.path()); + assert_eq!( + store.path(), + tmp.path().join(SUBDIR).join(REGISTRY_FILE), + "path() is under the namespaced subdir" + ); + // The first durable write creates the subdir + file. + store.record(proc(1, 10, 0xA)).unwrap(); + assert!(tmp.path().join(SUBDIR).is_dir(), "subdir materialized on first record"); + assert!(store.path().is_file(), "registry.json written on first record"); + } +} diff --git a/crates/aionui-process/src/spawner.rs b/crates/aionui-process/src/spawner.rs new file mode 100644 index 00000000..29504d4b --- /dev/null +++ b/crates/aionui-process/src/spawner.rs @@ -0,0 +1,159 @@ +//! `Spawner` — the assembly entry that wires spawn → containment → registry. +//! +//! Record-once (design Decision 2): we spawn FIRST, get the real pid + identity, +//! then `record` a single complete row — no placeholder pid, no backfill window. +//! The crash window before `record` is acceptable: a process spawned but not yet +//! recorded is simply not reaped on the next run (it leaks at most one), which is +//! strictly safer than recording a placeholder we might group-kill. 
+ +use std::path::PathBuf; +use std::sync::Arc; + +use aionui_common::CommandSpec; +use uuid::Uuid; + +use crate::registry_store::{ProcessIdentity, RegisteredProcess, RegistryStore}; +use crate::{ManagedProcess, ProcessError}; + +/// Spawns subprocesses and registers them so the supervisor can reap our own +/// orphans after a crash. +#[async_trait::async_trait] +pub trait Spawner: Send + Sync { + /// Spawn `spec`, place it under containment, and record it. `extra_env` is + /// applied per-child (IC-5). `opaque_owner_tag` is stored verbatim and + /// never parsed by this layer (it's the caller's conversation id etc). + async fn spawn( + &self, + spec: CommandSpec, + extra_env: &[(String, String)], + opaque_owner_tag: &str, + ) -> Result, ProcessError>; +} + +/// Production spawner backed by the file registry. Holds this run's identity +/// (epoch + machine_id) so every recorded row is identity-gateable on reap. +pub struct RealSpawner { + registry: Arc, + instance_epoch: Uuid, + machine_id: String, +} + +impl RealSpawner { + pub fn new(registry: Arc, instance_epoch: Uuid, machine_id: impl Into) -> Self { + Self { + registry, + instance_epoch, + machine_id: machine_id.into(), + } + } +} + +#[async_trait::async_trait] +impl Spawner for RealSpawner { + async fn spawn( + &self, + spec: CommandSpec, + extra_env: &[(String, String)], + opaque_owner_tag: &str, + ) -> Result, ProcessError> { + // Spawn first — the child is already its own process-group leader + // (Builder process_group(0)), so it is contained from birth. + let proc = ManagedProcess::spawn(spec, extra_env).await?; + let pid = proc.pid(); + let pgid = proc.process_group_id(); + + // Read start-time ONCE, while the child is freshly alive, and reuse it + // for BOTH the recorded row and the dereg identity below. 
Re-reading it + // at dereg time would race the process's exit (→ None / different value) + // and the identity would no longer match the recorded row, so dereg + // would silently fail (the bug the F38 test caught). + let start_time_ticks = crate::read_process_start_time(pid); + + // record-once with the real pid + identity (no placeholder). + let entry = RegisteredProcess { + pid, + pgid, + start_time_ticks, + instance_epoch: self.instance_epoch, + machine_id: self.machine_id.clone(), + // The pgid IS the containment id for the process-group tier. + containment_id: pgid.map(|g| g.to_string()), + opaque_owner_tag: opaque_owner_tag.to_owned(), + registered_at_ms: aionui_common::now_ms(), + }; + if let Err(e) = self.registry.record(entry) { + // If we cannot durably record it, we must not leave a live child we + // can never reap: tear it down before surfacing the error + // (no live-child-without-durable-row). Surface a kill failure rather + // than silently swallowing it (F59) — but still return the original + // record error (that is the root cause the caller must see). + if let Err(kill_err) = proc.kill(std::time::Duration::from_millis(200)).await { + tracing::warn!(pid, error = %kill_err, "record failed AND teardown of the un-recorded child failed"); + } + return Err(e); + } + + // Steady-state dereg (F38): when the handle is dropped (turn end / clean + // exit), remove this registry row so registry.json does not grow + // unbounded for the parent's whole lifetime. The hook is opaque to + // ManagedProcess and offloads the file I/O to a blocking pool so Drop + // never stalls the (tokio worker) thread that releases the last Arc. + let registry = Arc::clone(&self.registry); + let identity = ProcessIdentity { + pid, + start_time_ticks, // SAME value recorded above — must match for dereg + instance_epoch: self.instance_epoch, + }; + proc.set_on_drop(Box::new(move || { + // We may be inside a tokio runtime (consumer worker) or not (tests). 
+ // If a runtime is present, offload; otherwise do it inline (cheap in + // tests / non-async drop sites). + match tokio::runtime::Handle::try_current() { + Ok(handle) => { + handle.spawn_blocking(move || { + if let Err(e) = registry.unregister(&identity) { + tracing::warn!(error = %e, "failed to deregister process from registry on drop"); + } + }); + } + Err(_) => { + if let Err(e) = registry.unregister(&identity) { + tracing::warn!(error = %e, "failed to deregister process from registry on drop"); + } + } + } + })); + + Ok(Arc::new(proc)) + } +} + +/// A stable, NON-synced machine identifier for the cross-machine reap guard +/// (IC-5). Persisted in the OS-LOCAL cache dir (NOT `data_dir`, which may be +/// iCloud/Dropbox-synced — a synced id would defeat the guard). First call +/// mints + persists a random UUID; later calls read it back. +pub fn local_machine_id(os_local_cache_dir: &std::path::Path) -> String { + let path: PathBuf = os_local_cache_dir.join("aionui-process").join("machine-id"); + if let Ok(s) = std::fs::read_to_string(&path) { + let t = s.trim(); + if !t.is_empty() { + return t.to_owned(); + } + // The file existed but was empty/whitespace-only → a prior torn write. + // Re-minting here orphans every prior-machine registry row as foreign, + // silently disabling reaping (F40). Warn loudly so this is observable. + tracing::warn!( + path = %path.display(), + "machine-id file present but empty (likely a torn write); minting a NEW id — \ + prior registry rows from the old id will be treated as foreign and only pruned" + ); + } + let id = Uuid::new_v4().to_string(); + // Durable atomic write (F40): temp + fsync + rename, so a kill/disk-full + // mid-write can never leave the empty file that triggers the silent re-mint + // above. A bare `std::fs::write` is NOT atomic and was the root cause. 
+ if let Err(e) = crate::registry_store::atomic_write(&path, id.as_bytes()) { + tracing::warn!(error = %e, "failed to durably persist machine-id; reap identity may not be stable across runs"); + } + id +} diff --git a/crates/aionui-process/src/supervisor/core.rs b/crates/aionui-process/src/supervisor/core.rs new file mode 100644 index 00000000..223a7b07 --- /dev/null +++ b/crates/aionui-process/src/supervisor/core.rs @@ -0,0 +1,271 @@ +//! Pure reconcile kernel — the reap decision brain (Tier-A exhaustible). +//! +//! No `.await`, no clock, no syscall: it takes an already-gathered +//! [`ObservedState`] and emits [`Action`]s. Every IC-1 safety rule lives here +//! as a pure predicate, so the dangerous "what do we kill" logic is unit-test +//! exhaustible and mutation-guardable. +//! +//! The whole point: a reap is emitted ONLY for a registry row that is a +//! prior-run orphan, on THIS machine, whose live process identity MATCHES what +//! we recorded — and only when we hold the single-instance lock. Everything +//! else prunes (removes the stale row) without killing. + +use uuid::Uuid; + +use crate::Liveness; +use crate::registry_store::RegisteredProcess; + +/// Whether the single-instance lock is held this run. Reaping is gated on it: +/// if a sibling instance holds it, we must NOT kill (its processes are live). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LockState { + Acquired, + HeldBySibling, +} + +/// Everything the kernel needs, gathered impurely by the caller. +pub struct ObservedState { + /// Registry rows read back from disk (this crate's own registry). + pub rows: Vec<RegisteredProcess>, + /// Liveness/identity classification per row pid (from proc_control::probe + /// + classify_liveness), keyed by index into `rows`. + pub liveness: Vec<Liveness>, + pub lock_state: LockState, + pub current_epoch: Uuid, + pub current_machine: String, +} + +/// A decision the runner must execute. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Action { + /// Group-SIGKILL a confirmed prior-run orphan, then prune its row. + /// `recorded_start_ticks` is carried so the runner can RE-VERIFY identity + /// immediately before the kill (F42 TOCTOU close): between `gather_observed` + /// (which classified this as Match) and the actual `force_kill`, the pid + /// could exit and be recycled. The runner re-probes and only kills if the + /// live start-time still matches this recorded value. + ReapOrphanGroup { + pid: u32, + pgid: Option<u32>, + recorded_start_ticks: Option<u64>, + }, + /// Kill the containment fence of a confirmed orphan (covers grandchildren). + ReapContainment { containment_id: String }, + /// Remove a stale/unkillable/foreign row without killing anything. + PruneRegistryEntry { pid: u32 }, +} + +/// The pure reap decision. For each row, decide reap-and-prune vs prune-only. +/// +/// Safety rules enforced here (IC-1): +/// - Lock not Acquired => emit NOTHING (never touch a live sibling's procs). +/// - Foreign machine_id => prune-only (cloud-synced data dir, design I-5). +/// - Own-epoch row => never a reap target (it's one of THIS run's, I-4). +/// - Liveness != Match => prune-only (recycled/unknown/eperm/gone, I-1). +/// - Only Match + same-machine + can_kill => ReapOrphanGroup (+ReapContainment). +pub fn reconcile(observed: &ObservedState) -> Vec<Action> { + reconcile_with_capability(observed, crate::Capabilities::current().can_kill) +} + +/// Capability-gated reconcile (F58). `can_kill` is normally +/// `Capabilities::current().can_kill`; the seam exists so the "unknown platform +/// must never emit a kill it cannot perform" invariant is ENFORCED (not merely +/// emergent from `probe`→Unknown), and is unit-testable by passing `false`. +pub fn reconcile_with_capability(observed: &ObservedState, can_kill: bool) -> Vec<Action> { + // Gate 1: no lock => do not reap anything (IC-1 / I-3 lock-gate). 
+ if observed.lock_state != LockState::Acquired { + return Vec::new(); + } + + let mut actions = Vec::new(); + for (row, &live) in observed.rows.iter().zip(observed.liveness.iter()) { + // Gate 2: foreign machine => prune-only (I-5). A pgid on machine B is + // meaningless/dangerous here. + if row.machine_id != observed.current_machine { + actions.push(Action::PruneRegistryEntry { pid: row.pid }); + continue; + } + // Gate 3: identity. Only an identity-MATCHED, prior-epoch, live process + // is a reap target. classify_liveness already encodes "Match == alive + // && start-time matches && prior epoch"; DiffEpoch == our own run. + match live { + // Gate 0 (F58): only emit a kill if the platform can actually + // perform it. On a platform without a real `force_kill`, emitting + // ReapOrphanGroup would warn-and-swallow while the row is pruned and + // the live process forgotten — prune-and-forget. If we cannot kill, + // prune-only (safe), enforced here rather than relying on probe + // happening to return Unknown. + Liveness::Match if can_kill => { + // Reap the containment first (covers grandchildren), then the + // group, then prune. The runner executes in order. + if let Some(cid) = &row.containment_id { + actions.push(Action::ReapContainment { + containment_id: cid.clone(), + }); + } + actions.push(Action::ReapOrphanGroup { + pid: row.pid, + pgid: row.pgid, + recorded_start_ticks: row.start_time_ticks, + }); + actions.push(Action::PruneRegistryEntry { pid: row.pid }); + } + // Match but platform cannot kill → prune-only (F58 safe degrade). + Liveness::Match => { + actions.push(Action::PruneRegistryEntry { pid: row.pid }); + } + // Everything else: prune the stale row, never kill (I-1 negative). 
+ Liveness::RecycledPid | Liveness::DiffEpoch | Liveness::Gone | Liveness::Unknown | Liveness::EpermAlive => { + actions.push(Action::PruneRegistryEntry { pid: row.pid }); + } + } + } + actions +} + +#[cfg(test)] +mod tests { + use super::*; + + const MACHINE: &str = "machine-this"; + + // The reconcile-level reap decision keys on `liveness` (already classified + // upstream by classify_liveness, which is itself table-tested); the row's + // own epoch is not re-read here. A prior-run epoch value keeps the row + // realistic, but the gate that matters is the passed-in Liveness. + const PRIOR_EPOCH: u128 = 0xA; + const CURRENT_EPOCH: u128 = 0xB; + + fn row(pid: u32) -> RegisteredProcess { + RegisteredProcess { + pid, + pgid: Some(pid), + start_time_ticks: Some(1000), + instance_epoch: Uuid::from_u128(PRIOR_EPOCH), + machine_id: MACHINE.to_string(), + containment_id: None, + opaque_owner_tag: String::new(), + registered_at_ms: 0, + } + } + + fn observed( + rows: Vec, + liveness: Vec, + lock: LockState, + machine: &str, + ) -> ObservedState { + ObservedState { + rows, + liveness, + lock_state: lock, + current_epoch: Uuid::from_u128(CURRENT_EPOCH), + current_machine: machine.to_string(), + } + } + + fn is_kill(a: &Action) -> bool { + matches!(a, Action::ReapOrphanGroup { .. }) + } + + /// Gate 1 (IC-1/I-3): no lock acquired → emit NOTHING (never touch a live + /// sibling's processes). Even a perfect Match row must produce zero actions. + #[test] + fn gate1_no_lock_emits_nothing() { + let r = row(100); + let st = observed(vec![r], vec![Liveness::Match], LockState::HeldBySibling, MACHINE); + assert!( + reconcile_with_capability(&st, true).is_empty(), + "no lock → no actions at all" + ); + } + + /// Gate 2 (I-5): foreign machine_id → prune-only, NEVER kill (cloud-synced + /// data dir; a pgid from machine B is meaningless/dangerous here). 
+ #[test] + fn gate2_foreign_machine_prune_only() { + let mut r = row(100); + r.machine_id = "machine-OTHER".into(); + let st = observed(vec![r], vec![Liveness::Match], LockState::Acquired, MACHINE); + let acts = reconcile_with_capability(&st, true); + assert_eq!( + acts, + vec![Action::PruneRegistryEntry { pid: 100 }], + "foreign machine → prune-only" + ); + assert!(!acts.iter().any(is_kill), "must NOT kill a foreign-machine row"); + } + + /// Gate 0 (F58): platform cannot kill → Match degrades to prune-only (never + /// emit a ReapOrphanGroup the platform will warn-and-swallow = prune-and-forget). + #[test] + fn gate0_cannot_kill_match_degrades_to_prune() { + let st = observed(vec![row(100)], vec![Liveness::Match], LockState::Acquired, MACHINE); + let acts = reconcile_with_capability(&st, false); + assert_eq!( + acts, + vec![Action::PruneRegistryEntry { pid: 100 }], + "can_kill=false → prune-only even on Match" + ); + } + + /// Gate 3 (I-1 negative): every non-Match liveness → prune-only, never kill. + #[test] + fn gate3_non_match_never_kills() { + for live in [ + Liveness::RecycledPid, + Liveness::DiffEpoch, + Liveness::Gone, + Liveness::Unknown, + Liveness::EpermAlive, + ] { + let st = observed(vec![row(100)], vec![live], LockState::Acquired, MACHINE); + let acts = reconcile_with_capability(&st, true); + assert!(!acts.iter().any(is_kill), "{live:?} must never produce a kill"); + assert_eq!( + acts, + vec![Action::PruneRegistryEntry { pid: 100 }], + "{live:?} → prune-only" + ); + } + } + + /// The ONLY reapable path: lock acquired + same machine + Match + can_kill → + /// ReapOrphanGroup (carrying recorded_start_ticks for the runner's F42 + /// re-verify) THEN prune, in order. 
+ #[test] + fn only_match_same_machine_locked_reaps() { + let st = observed(vec![row(100)], vec![Liveness::Match], LockState::Acquired, MACHINE); + let acts = reconcile_with_capability(&st, true); + assert_eq!( + acts, + vec![ + Action::ReapOrphanGroup { + pid: 100, + pgid: Some(100), + recorded_start_ticks: Some(1000), + }, + Action::PruneRegistryEntry { pid: 100 }, + ], + "reap-then-prune in order, start-ticks carried for TOCTOU re-verify" + ); + } + + /// Containment fence is reaped BEFORE the group (covers grandchildren). + #[test] + fn match_with_containment_reaps_fence_first() { + let mut r = row(100); + r.containment_id = Some("job-xyz".into()); + let st = observed(vec![r], vec![Liveness::Match], LockState::Acquired, MACHINE); + let acts = reconcile_with_capability(&st, true); + assert_eq!( + acts[0], + Action::ReapContainment { + containment_id: "job-xyz".into() + }, + "containment first" + ); + assert!(matches!(acts[1], Action::ReapOrphanGroup { .. }), "then group"); + assert_eq!(acts[2], Action::PruneRegistryEntry { pid: 100 }, "then prune"); + } +} diff --git a/crates/aionui-process/src/supervisor/mod.rs b/crates/aionui-process/src/supervisor/mod.rs new file mode 100644 index 00000000..998c15ba --- /dev/null +++ b/crates/aionui-process/src/supervisor/mod.rs @@ -0,0 +1,315 @@ +//! Supervisor — the runner around the pure [`core::reconcile`] kernel. +//! +//! Gathers `ObservedState` (read registry + probe each row's live identity + +//! lock state), calls the pure kernel, and executes the returned `Action`s +//! (group-kill + container-kill + prune). This is the impure shell; all the +//! dangerous decisions live in the pure kernel so they stay exhaustible. 
+ +mod core; + +pub use core::{Action, LockState, ObservedState, reconcile, reconcile_with_capability}; + +use uuid::Uuid; + +use crate::Liveness; +use crate::proc_control::{self, ObservedLiveness, current_process_group}; +use crate::registry_store::{ProcessIdentity, RegistryStore}; + +/// Gather the observed state for a startup reap: read every registry row and +/// probe its live identity against the recorded start-time + epoch. +pub fn gather_observed( + registry: &S, + lock_state: LockState, + current_epoch: Uuid, + current_machine: &str, +) -> Result { + let rows = registry.read_all()?; + let liveness = rows + .iter() + .map(|r| { + let observed: ObservedLiveness = proc_control::probe(r.pid); + proc_control::classify_liveness(r.start_time_ticks, observed, r.instance_epoch, current_epoch) + }) + .collect::>(); + Ok(ObservedState { + rows, + liveness, + lock_state, + current_epoch, + current_machine: current_machine.to_owned(), + }) +} + +/// Execute the reconcile actions: kill confirmed orphans, prune stale rows. +/// Best-effort and warn-logged — a single failure never aborts the rest. +pub fn execute_actions(actions: &[Action], registry: &S) { + for action in actions { + match action { + Action::ReapOrphanGroup { + pid, + pgid, + recorded_start_ticks, + } => { + // F42: RE-VERIFY identity immediately before the kill. reconcile + // classified this row as Match using a start-time read back in + // gather_observed; the pid could have exited and been recycled + // onto an innocent process in the window. Re-probe NOW and only + // kill if the live start-time still matches what we recorded. + let observed = proc_control::probe(*pid); + let still_ours = matches!( + observed, + ObservedLiveness::Alive { start_ticks } if start_ticks == *recorded_start_ticks && start_ticks.is_some() + ); + // Self-group guard (critic category): never group-kill a pgid + // the REAPER itself belongs to — that would SIGKILL ourselves. 
+ let our_pgid = current_process_group(); + let targets_own_group = matches!((*pgid, our_pgid), (Some(p), Some(o)) if p == o); + if !still_ours { + tracing::warn!( + pid, + "skipping reap: identity changed between gather and kill (pid exited/recycled) — never kill on doubt (F42)" + ); + } else if targets_own_group { + tracing::error!( + pid, + ?pgid, + "refusing to reap: target pgid is the reaper's OWN process group (would self-SIGKILL)" + ); + } else if let Err(e) = proc_control::force_kill(*pid, *pgid) { + tracing::warn!(pid, ?pgid, error = %e, "reap orphan group failed"); + } else { + // Post-reap verification (critic category): SIGKILL is async; + // confirm the group is actually gone rather than asserting + // success. A still-alive group after the kill = an escaped + // (setsid) grandchild — log it honestly (DegradedBestEffort), + // do not pretend it was fully reaped. + if proc_control::process_group_alive(*pgid) { + tracing::warn!( + pid, + ?pgid, + "reap issued but group still alive (likely a setsid-escaped grandchild) — degraded, not confirmed gone" + ); + } + } + } + Action::ReapContainment { containment_id } => { + // Containment teardown for grandchildren is owned by the + // caller that created it; here we only have the id recorded. + // The group-kill above already covers same-group descendants; + // a future cgroup/JobObject tier would act on this id. + tracing::debug!( + containment_id, + "reap containment (process-group tier: covered by group kill)" + ); + } + Action::PruneRegistryEntry { pid } => { + // Prune by pid is sufficient here: the row is stale/foreign/ + // dead. We remove every identity sharing this pid in our own + // registry (there can only be our own rows). + if let Err(e) = prune_pid(registry, *pid) { + tracing::warn!(pid, error = %e, "prune registry entry failed"); + } + } + } + } +} + +fn prune_pid(registry: &S, pid: u32) -> Result<(), crate::ProcessError> { + // Find the row(s) with this pid and unregister by full identity. 
+ for r in registry.read_all()? { + if r.pid == pid { + registry.unregister(&ProcessIdentity { + pid: r.pid, + start_time_ticks: r.start_time_ticks, + instance_epoch: r.instance_epoch, + })?; + } + } + Ok(()) +} + +/// One-shot startup reap: gather → reconcile → execute. Returns the actions +/// taken (for logging/testing). +pub fn run_startup_reap( + registry: &S, + lock_state: LockState, + current_epoch: Uuid, + current_machine: &str, +) -> Result, crate::ProcessError> { + let observed = gather_observed(registry, lock_state, current_epoch, current_machine)?; + let actions = reconcile(&observed); + execute_actions(&actions, registry); + Ok(actions) +} + +#[cfg(test)] +mod tests { + //! Tests for the impure reap shell. The pure decision kernel + //! (`core::reconcile`) is exhaustively table-tested in `core.rs`; here we + //! cover the SHELL that executes its `Action`s — specifically the registry + //! mutation paths that need no real OS process and so are testable with an + //! in-memory mock store. + //! + //! The `ReapOrphanGroup` kill path (F42 TOCTOU re-verify + self-group guard) + //! calls `proc_control::probe` / `current_process_group` directly (no seam), + //! so it is real-process-only and lives elsewhere (see audit REAP-C8/C9). + + use std::sync::Mutex; + + use uuid::Uuid; + + use super::*; + use crate::ProcessError; + use crate::registry_store::{ProcessIdentity, RegisteredProcess, RegistryStore}; + + /// In-memory `RegistryStore` for shell tests: records every call and can be + /// armed to fail a chosen method, so best-effort/error-propagation contracts + /// are observable without touching disk. 
+ #[derive(Default)] + struct MockRegistry { + rows: Mutex>, + unregister_calls: Mutex>, + fail_read_all: bool, + fail_unregister: bool, + } + + impl MockRegistry { + fn with_rows(rows: Vec) -> Self { + Self { + rows: Mutex::new(rows), + ..Self::default() + } + } + } + + impl RegistryStore for MockRegistry { + fn record(&self, entry: RegisteredProcess) -> Result<(), ProcessError> { + self.rows.lock().unwrap().push(entry); + Ok(()) + } + + fn unregister(&self, id: &ProcessIdentity) -> Result<(), ProcessError> { + self.unregister_calls.lock().unwrap().push(id.clone()); + if self.fail_unregister { + return Err(ProcessError::internal("injected unregister failure")); + } + self.rows.lock().unwrap().retain(|r| { + r.pid != id.pid || r.start_time_ticks != id.start_time_ticks || r.instance_epoch != id.instance_epoch + }); + Ok(()) + } + + fn read_all(&self) -> Result, ProcessError> { + if self.fail_read_all { + return Err(ProcessError::internal("injected read_all failure")); + } + Ok(self.rows.lock().unwrap().clone()) + } + } + + fn row(pid: u32, start: u64, epoch: u128) -> RegisteredProcess { + RegisteredProcess { + pid, + pgid: Some(pid), + start_time_ticks: Some(start), + instance_epoch: Uuid::from_u128(epoch), + machine_id: "machine-this".to_string(), + containment_id: None, + opaque_owner_tag: String::new(), + registered_at_ms: 0, + } + } + + /// REAP-C13: `PruneRegistryEntry` unregisters ONLY the row(s) matching the + /// action's pid, by FULL identity (pid + start_time + epoch), leaving every + /// other row untouched. + #[test] + fn prune_action_unregisters_only_the_matching_pid_by_full_identity() { + let reg = MockRegistry::with_rows(vec![row(10, 1000, 0xA), row(20, 2000, 0xA), row(30, 3000, 0xA)]); + + execute_actions(&[Action::PruneRegistryEntry { pid: 20 }], ®); + + // Exactly one unregister, carrying pid 20's full identity. 
+ let calls = reg.unregister_calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "prune must unregister exactly the matching row"); + assert_eq!( + calls[0], + ProcessIdentity { + pid: 20, + start_time_ticks: Some(2000), + instance_epoch: Uuid::from_u128(0xA), + }, + "unregister must carry the matched row's FULL identity, not a bare pid" + ); + // Rows 10 and 30 survive; 20 is gone. + let remaining: Vec = reg.rows.lock().unwrap().iter().map(|r| r.pid).collect(); + assert_eq!(remaining, vec![10, 30], "only the pruned pid is removed"); + } + + /// REAP-C14: `execute_actions` is best-effort — a failing `unregister` is + /// warn-logged and the loop CONTINUES to the next action (no panic, no early + /// abort). Here a failing prune for pid 10 must not stop the prune for pid 20. + #[test] + fn execute_actions_continues_after_a_failing_prune() { + let reg = MockRegistry { + rows: Mutex::new(vec![row(10, 1000, 0xA), row(20, 2000, 0xA)]), + fail_unregister: true, + ..MockRegistry::default() + }; + + // Two prune actions; the first fails (injected). The call must not panic + // and must still attempt the second. + execute_actions( + &[ + Action::PruneRegistryEntry { pid: 10 }, + Action::PruneRegistryEntry { pid: 20 }, + ], + ®, + ); + + let calls = reg.unregister_calls.lock().unwrap(); + let pids: Vec = calls.iter().map(|c| c.pid).collect(); + assert!( + pids.contains(&10) && pids.contains(&20), + "both prunes attempted despite the first failing (best-effort, no abort); saw {pids:?}" + ); + } + + /// REAP-C16: `run_startup_reap` propagates a gather failure. If `read_all` + /// (inside `gather_observed`) errors, the function returns `Err` and NEVER + /// reaches `execute_actions` (no rows are mutated). 
+ #[test] + fn run_startup_reap_propagates_gather_failure_without_executing() { + let reg = MockRegistry { + rows: Mutex::new(vec![row(10, 1000, 0xA)]), + fail_read_all: true, + ..MockRegistry::default() + }; + + let result = run_startup_reap(®, LockState::Acquired, Uuid::from_u128(0xB), "machine-this"); + + assert!(result.is_err(), "a read_all failure during gather must surface as Err"); + // execute_actions never ran → no unregister attempted. + assert!( + reg.unregister_calls.lock().unwrap().is_empty(), + "gather failure must short-circuit BEFORE execute_actions (no mutation)" + ); + } + + /// Companion to REAP-C16: when the lock is NOT held, `reconcile` emits no + /// actions, so a successful gather still mutates nothing — `run_startup_reap` + /// returns an empty action list and the registry is untouched. + #[test] + fn run_startup_reap_without_lock_takes_no_actions() { + let reg = MockRegistry::with_rows(vec![row(10, 1000, 0xA)]); + + let actions = run_startup_reap(®, LockState::HeldBySibling, Uuid::from_u128(0xB), "machine-this") + .expect("gather succeeds"); + + assert!(actions.is_empty(), "no lock → no actions (gate 1)"); + assert!( + reg.unregister_calls.lock().unwrap().is_empty(), + "no actions → no registry mutation" + ); + } +}