From 1ec28e1ffc6b98b16efc714e1ae6db3ce53e70d8 Mon Sep 17 00:00:00 2001 From: ben Date: Mon, 11 May 2026 22:13:37 +0200 Subject: [PATCH] feat(env): add env record with host/NUMA/affinity for reproducibility Adds a one-shot env record at the head of JSONL output capturing host, kernel, CPU topology, NUMA distances/sizes, inherited CPU affinity, and reproducibility-critical knobs (governor, frequency, THP, SMT, cgroup). Enables comparing benchmark runs across machines or affinity settings without external bookkeeping. Schema: every record now carries a "kind" discriminator (env, metadata, sample, tree) via an internally-tagged enum. Legacy untagged files remain readable via a fallback path in the file reader, so existing stats/summary workflows are unaffected. - new EnvRecord collector reads /proc/cpuinfo, /sys/devices/system/cpu/*, /sys/devices/system/node/*, sched_getaffinity, and cpufreq/THP/SMT/cgroup - new Record enum and tagged_json() helper in src/monitor/record.rs - CLI: --write-env flag in src/bin/denet.rs - Python: write_env kwarg and monitor.get_env() in src/python.rs - ProcessMonitor::get_env() exposed for direct use - Reader updated to try tagged Record first, fall back to untagged - 13 new tests covering parsers, range compression, Record roundtrip, back-compat with untagged lines, and a Linux smoke test - Documented in docs/data-format.md and docs/python-api.md --- Cargo.lock | 2 +- docs/data-format.md | 58 ++++- docs/python-api.md | 12 + src/bin/denet.rs | 31 ++- src/config.rs | 19 ++ src/core/process_monitor.rs | 62 +++-- src/monitor/env.rs | 500 ++++++++++++++++++++++++++++++++++++ src/monitor/mod.rs | 4 + src/monitor/record.rs | 194 ++++++++++++++ src/python.rs | 62 ++++- tests/env_tests.rs | 171 ++++++++++++ 11 files changed, 1066 insertions(+), 49 deletions(-) create mode 100644 src/monitor/env.rs create mode 100644 src/monitor/record.rs create mode 100644 tests/env_tests.rs diff --git a/Cargo.lock b/Cargo.lock index baa2ed0..083d414 100644 --- 
a/Cargo.lock +++ b/Cargo.lock @@ -376,7 +376,7 @@ dependencies = [ [[package]] name = "denet" -version = "0.6.1" +version = "0.6.2" dependencies = [ "aya", "aya-log", diff --git a/docs/data-format.md b/docs/data-format.md index 7142f62..debc9b2 100644 --- a/docs/data-format.md +++ b/docs/data-format.md @@ -4,23 +4,59 @@ Denet outputs JSON in a streaming format optimized for efficiency and time-serie ## Format Structure -**First line**: Process metadata (emitted once) +Every line carries a `"kind"` discriminator so downstream tooling can dispatch by type. The possible values are `env`, `metadata`, `sample`, and `tree`. + +**Optional first line** (when `--write-env` is set): host/NUMA/affinity snapshot, emitted once. +```json +{"kind":"env","ts_ms":1748542000000,"host":"omnibenchmark","kernel":"6.18.7-...","lscpu":{...},"numa":{...},"affinity_inherited":"0-127", ...} +``` + +**Header line**: Process metadata, emitted once. ```json -{"pid": 1234, "cmd": ["sleep", "5"], "exe": "/usr/bin/sleep", "t0_ms": 1748542000000} +{"kind":"metadata","pid":1234,"cmd":["sleep","5"],"executable":"/usr/bin/sleep","t0_ms":1748542000000} ``` -**Subsequent lines**: Process tree metrics (streamed continuously) +**Subsequent lines**: Process tree metrics, streamed continuously. ```json -{"ts_ms": 1748542001000, "parent": {...}, "children": [...], "aggregated": {...}} +{"kind":"tree","ts_ms":1748542001000,"parent":{...},"children":[...],"aggregated":{...}} ``` +Single-process mode (`--exclude-children`) emits `{"kind":"sample",...}` records instead. + +> **Back-compat:** files written before the `kind` field existed are still readable by the `stats` / `summary` subcommands and by the Python reader — parsers fall back to the legacy untagged shapes when no `kind` is present. + +## Env Record (reproducibility snapshot) + +Enabled with `--write-env` on the CLI or `write_env=True` in the Python binding. 
Captured once at the start of monitoring; useful for benchmark reproducibility (NUMA placement, CPU governor, hyperthreading, cgroup limits). + +| Field | Type | Description | +|-------|------|-------------| +| `ts_ms` | number | Capture timestamp (Unix milliseconds) | +| `host` | string | Hostname (`/proc/sys/kernel/hostname`) | +| `kernel` | string | Kernel release (`/proc/sys/kernel/osrelease`) | +| `lscpu.sockets` | number | Physical sockets | +| `lscpu.cores_per_socket` | number | Cores per socket | +| `lscpu.threads_per_core` | number | Threads per core (SMT siblings) | +| `lscpu.model` | string | First `model name` from `/proc/cpuinfo` | +| `numa.nodes` | number | NUMA node count | +| `numa.distances` | number[][] | Square distance matrix from `/sys/.../node*/distance` | +| `numa.node_sizes_mb` | number[] | `MemTotal` per node, in MB | +| `affinity_inherited` | string | CPU affinity of the monitor process as a range list (e.g. `"0-3,7-9"`) | +| `cpu_governor` | string[]? | `scaling_governor` per CPU (omitted if cpufreq is unavailable) | +| `cpu_freq_khz` | number[]? | `scaling_cur_freq` per CPU | +| `thp_enabled` | string? | `/sys/kernel/mm/transparent_hugepage/enabled` raw value | +| `smt_active` | bool? | `/sys/devices/system/cpu/smt/active` | +| `cgroup` | string? | `/proc/<pid>/cgroup` of the monitored process | + +Optional fields (marked `?`) are omitted from the record on kernels, distros, or containers where the source file is missing. On non-Linux platforms only `ts_ms` carries a real value; `host`, `kernel`, and the other required fields are emitted as empty strings, zeros, or empty arrays. + ## Metadata Fields | Field | Type | Description | |-------|------|-------------| | `pid` | number | Process ID | | `cmd` | string[] | Command line arguments | -| `exe` | string | Executable path | +| `executable` | string | Executable path | | `t0_ms` | number | Process start time (Unix milliseconds) | | `capabilities` | object? | Manifest of optional metric sources detected at startup. See below. 
| @@ -94,22 +130,24 @@ Includes all fields from Individual Process Metrics plus: - **`--nodump`**: Disable automatic JSON dump to `out.json` - **`--out FILE`**: Write JSON output to specified file - **`--stats FILE`**: Write summary statistics to specified file +- **`--write-env`**: Prepend a one-shot `env` record (host/NUMA/affinity/governor/THP/SMT/cgroup) for reproducibility ## Example Complete Record The output is [JSON Lines](https://jsonlines.org/) — one JSON object per line, newline-delimited. Each line is a self-contained record and can be parsed independently (e.g. with `jq`). -The **first line is a header** containing process metadata (pid, command, `t0_ms`). All subsequent lines are metric samples. +Lines are tagged with `"kind"`. When `--write-env` is set, an `env` header precedes the `metadata` header; otherwise the file starts at `metadata`. All subsequent lines are `sample` (single-process) or `tree` (process tree, default). Timestamps use Unix milliseconds (ms since 1970-01-01 00:00:00 UTC): -- `t0_ms`: process start time, in the header line only -- `ts_ms`: sample timestamp in every metrics line +- `t0_ms`: process start time, in the `metadata` line only +- `ts_ms`: capture/sample timestamp in every other line To get the elapsed time of a sample relative to process start: `elapsed_ms = ts_ms - t0_ms`. 
```json -{"pid":1234,"cmd":["python","script.py"],"exe":"/usr/bin/python3","t0_ms":1748542000000} -{"ts_ms":1748542001000,"parent":{"ts_ms":1748542001050,"cpu_usage":15.2,"mem_rss_kb":8192,"mem_vms_kb":32768,"disk_read_bytes":1024,"disk_write_bytes":2048,"net_rx_bytes":512,"net_tx_bytes":256,"thread_count":3,"uptime_secs":1},"children":[{"pid":1235,"command":"worker","metrics":{"ts_ms":1748542001060,"cpu_usage":5.1,"mem_rss_kb":4096,"mem_vms_kb":16384,"disk_read_bytes":512,"disk_write_bytes":0,"net_rx_bytes":0,"net_tx_bytes":0,"thread_count":1,"uptime_secs":1}}],"aggregated":{"ts_ms":1748542001000,"cpu_usage":20.3,"mem_rss_kb":12288,"mem_vms_kb":49152,"disk_read_bytes":1536,"disk_write_bytes":2048,"net_rx_bytes":512,"net_tx_bytes":256,"thread_count":4,"process_count":2,"uptime_secs":1}} +{"kind":"env","ts_ms":1748542000000,"host":"omnibenchmark","kernel":"6.18.7-76061807-generic","lscpu":{"sockets":1,"cores_per_socket":64,"threads_per_core":2,"model":"AMD EPYC 7742 64-Core Processor"},"numa":{"nodes":4,"distances":[[10,12,12,12],[12,10,12,12],[12,12,10,12],[12,12,12,10]],"node_sizes_mb":[64272,64500,64500,64481]},"affinity_inherited":"0-127","cpu_governor":["performance"],"thp_enabled":"always [madvise] never","smt_active":true,"cgroup":"0::/user.slice"} +{"kind":"metadata","pid":1234,"cmd":["python","script.py"],"executable":"/usr/bin/python3","t0_ms":1748542000000} 
+{"kind":"tree","ts_ms":1748542001000,"parent":{"ts_ms":1748542001050,"cpu_usage":15.2,"mem_rss_kb":8192,"mem_vms_kb":32768,"disk_read_bytes":1024,"disk_write_bytes":2048,"sys_net_rx_bytes":512,"sys_net_tx_bytes":256,"thread_count":3,"uptime_secs":1},"children":[{"pid":1235,"command":"worker","metrics":{"ts_ms":1748542001060,"cpu_usage":5.1,"mem_rss_kb":4096,"mem_vms_kb":16384,"disk_read_bytes":512,"disk_write_bytes":0,"sys_net_rx_bytes":0,"sys_net_tx_bytes":0,"thread_count":1,"uptime_secs":1}}],"aggregated":{"ts_ms":1748542001000,"cpu_usage":20.3,"mem_rss_kb":12288,"mem_vms_kb":49152,"disk_read_bytes":1536,"disk_write_bytes":2048,"sys_net_rx_bytes":512,"sys_net_tx_bytes":256,"thread_count":4,"process_count":2,"uptime_secs":1}} ``` ## Statistics Output diff --git a/docs/python-api.md b/docs/python-api.md index 62d2710..e84c676 100644 --- a/docs/python-api.md +++ b/docs/python-api.md @@ -61,6 +61,7 @@ exit_code, monitor = denet.execute_with_monitoring( store_in_memory=True, # Store samples in memory output_file=None, # Optional file output write_metadata=False, # Write metadata as first line to output file (default False) + write_env=False, # Prepend a host/NUMA/affinity `env` record (default False) include_children=True # Monitor child processes (default True) ) @@ -111,6 +112,17 @@ exit_code, monitor = denet.execute_with_monitoring( write_metadata=True # Includes metadata as first line: {"pid": 1234, "cmd": ["python", "script.py"], "executable": "/usr/bin/python", "t0_ms": 1625184000000} ) +# Capture host/NUMA/affinity reproducibility info as the very first line. +# Useful when comparing benchmark runs across machines or affinity settings. 
+exit_code, monitor = denet.execute_with_monitoring( + cmd=["python", "bench.py"], + output_file="metrics.jsonl", + write_env=True, + write_metadata=True, +) +# Or grab it as a string on demand: +env_line = monitor.get_env() # tagged JSON: {"kind":"env","host":...,"numa":{...},"affinity_inherited":"0-127",...} + # execute_with_monitoring also accepts subprocess.run arguments: exit_code, monitor = denet.execute_with_monitoring( cmd=["python", "script.py"], diff --git a/src/bin/denet.rs b/src/bin/denet.rs index 87fb873..6deb7b4 100644 --- a/src/bin/denet.rs +++ b/src/bin/denet.rs @@ -3,7 +3,7 @@ use colored::Colorize; #[cfg(feature = "ebpf")] use denet::ebpf::debug; use denet::error::Result; -use denet::monitor::{AggregatedMetrics, Metrics, Summary, SummaryGenerator}; +use denet::monitor::{tagged_json, AggregatedMetrics, Metrics, Summary, SummaryGenerator}; use denet::ProcessMonitor; use std::fs::File; use std::io::{self, Write}; @@ -71,6 +71,10 @@ struct Args { #[clap(long)] no_polling: bool, + /// Write a host/NUMA/affinity `env` record before metadata (for reproducibility) + #[clap(long)] + write_env: bool, + #[command(subcommand)] command: Commands, } @@ -306,12 +310,23 @@ fn execute_monitoring_with_output( None }; + // Emit env record first (if requested) — captures host/NUMA/affinity for reproducibility. 
+ if args.write_env { + let env_json = tagged_json("env", &monitor.get_env()).unwrap(); + if let Some(file) = &mut file_handles.out_file { + writeln!(file, "{env_json}")?; + } + if args.json && !args.quiet { + println!("{env_json}"); + } + } + // Get metadata let metadata = monitor.get_metadata(); - // Emit metadata first (always for files, only output to console if JSON mode) + // Emit metadata (always for files, only output to console if JSON mode) if let Some(metadata_ref) = &metadata { - let metadata_json = serde_json::to_string(&metadata_ref).unwrap(); + let metadata_json = tagged_json("metadata", metadata_ref).unwrap(); if let Some(file) = &mut file_handles.out_file { writeln!(file, "{metadata_json}")?; } @@ -346,7 +361,7 @@ fn execute_monitoring_with_output( let final_tree_metrics = monitor.sample_tree_metrics(); if args.json { - let json = serde_json::to_string(&final_tree_metrics).unwrap(); + let json = tagged_json("tree", &final_tree_metrics).unwrap(); println!("{json}"); } else if let Some(agg) = final_tree_metrics.aggregated { results.push(convert_aggregated_to_metrics(&agg)); @@ -375,7 +390,7 @@ fn execute_monitoring_with_output( // Format and display metrics if args.json { - let json = serde_json::to_string(&metrics).unwrap(); + let json = tagged_json("sample", &metrics).unwrap(); if let Some(file) = &mut file_handles.out_file { writeln!(file, "{json}")?; } @@ -401,7 +416,7 @@ fn execute_monitoring_with_output( } else { let formatted = format_metrics(&metrics); if let Some(file) = &mut file_handles.out_file { - writeln!(file, "{}", serde_json::to_string(&metrics).unwrap())?; + writeln!(file, "{}", tagged_json("sample", &metrics).unwrap())?; } if !args.quiet { if update_in_place { @@ -441,7 +456,7 @@ fn execute_monitoring_with_output( // Format and display tree metrics if args.json { - let json = serde_json::to_string(&tree_metrics).unwrap(); + let json = tagged_json("tree", &tree_metrics).unwrap(); if let Some(file) = &mut file_handles.out_file { 
writeln!(file, "{json}")?; } @@ -469,7 +484,7 @@ fn execute_monitoring_with_output( // Format and display tree metrics with parent and children let formatted = format_aggregated_metrics(agg_metrics); if let Some(file) = &mut file_handles.out_file { - writeln!(file, "{}", serde_json::to_string(&tree_metrics).unwrap())?; + writeln!(file, "{}", tagged_json("tree", &tree_metrics).unwrap())?; } if !args.quiet { if update_in_place { diff --git a/src/config.rs b/src/config.rs index 75999fd..8fb4b50 100644 --- a/src/config.rs +++ b/src/config.rs @@ -106,6 +106,9 @@ pub struct OutputConfig { pub update_in_place: bool, /// Whether to write metadata as first line when writing to file pub write_metadata: bool, + /// Whether to write an `env` (host/NUMA/affinity) record before metadata + /// when writing to file. Captured once at the start of monitoring. + pub write_env: bool, } impl Default for OutputConfig { @@ -117,6 +120,7 @@ impl Default for OutputConfig { quiet: false, update_in_place: true, write_metadata: false, + write_env: false, } } } @@ -210,6 +214,7 @@ pub struct OutputConfigBuilder { quiet: Option, update_in_place: Option, write_metadata: Option, + write_env: Option, } impl OutputConfigBuilder { @@ -248,6 +253,11 @@ impl OutputConfigBuilder { self } + pub fn write_env(mut self, write: bool) -> Self { + self.write_env = Some(write); + self + } + pub fn build(self) -> OutputConfig { OutputConfig { output_file: self.output_file, @@ -256,6 +266,7 @@ impl OutputConfigBuilder { quiet: self.quiet.unwrap_or(false), update_in_place: self.update_in_place.unwrap_or(true), write_metadata: self.write_metadata.unwrap_or(false), + write_env: self.write_env.unwrap_or(false), } } } @@ -481,6 +492,7 @@ mod tests { .quiet(true) .update_in_place(false) .write_metadata(true) + .write_env(true) .build(); assert_eq!(config.output_file, Some(PathBuf::from("output.json"))); @@ -489,6 +501,13 @@ mod tests { assert!(config.quiet); assert!(!config.update_in_place); 
assert!(config.write_metadata); + assert!(config.write_env); + } + + #[test] + fn test_output_config_write_env_default_false() { + let config = OutputConfigBuilder::default().build(); + assert!(!config.write_env); } #[test] diff --git a/src/core/process_monitor.rs b/src/core/process_monitor.rs index a23565a..a57f48f 100644 --- a/src/core/process_monitor.rs +++ b/src/core/process_monitor.rs @@ -169,32 +169,35 @@ pub fn summary_from_json_file>(path: P) -> io::Result { continue; } - // Try to parse as different types of metrics - if let Ok(agg_metric) = serde_json::from_str::(&line) { - // Got aggregated metrics - if first_timestamp.is_none() { - first_timestamp = Some(agg_metric.ts_ms); - } - last_timestamp = Some(agg_metric.ts_ms); - metrics_vec.push(agg_metric); - } else if let Ok(tree_metrics) = serde_json::from_str::(&line) { - // Got tree metrics, extract aggregated metrics if available - if let Some(agg) = tree_metrics.aggregated { + // Try the tagged Record schema first; fall back to legacy untagged + // shapes for files written before the `kind` discriminator existed. 
+ match crate::monitor::record::parse_record(&line) { + Some(crate::monitor::record::Record::Aggregated(agg)) => { if first_timestamp.is_none() { first_timestamp = Some(agg.ts_ms); } last_timestamp = Some(agg.ts_ms); - metrics_vec.push(agg); + metrics_vec.push(*agg); } - } else if let Ok(metric) = serde_json::from_str::(&line) { - // Got regular metrics - if first_timestamp.is_none() { - first_timestamp = Some(metric.ts_ms); + Some(crate::monitor::record::Record::Tree(tree)) => { + if let Some(agg) = tree.aggregated { + if first_timestamp.is_none() { + first_timestamp = Some(agg.ts_ms); + } + last_timestamp = Some(agg.ts_ms); + metrics_vec.push(agg); + } } - last_timestamp = Some(metric.ts_ms); - regular_metrics.push(metric); + Some(crate::monitor::record::Record::Sample(metric)) => { + if first_timestamp.is_none() { + first_timestamp = Some(metric.ts_ms); + } + last_timestamp = Some(metric.ts_ms); + regular_metrics.push(metric); + } + // Env / Metadata / unknown: header records, no metric content. + _ => {} } - // Ignore metadata and other lines we can't parse } // Calculate total time @@ -858,6 +861,12 @@ impl ProcessMonitor { self.include_children } + /// Snapshot host/NUMA/affinity/governor state for the monitored PID. + /// One-shot, suitable for writing as the first JSONL line. + pub fn get_env(&self) -> crate::monitor::EnvRecord { + crate::monitor::EnvRecord::collect(self.pid as u32) + } + /// Returns metadata about the monitored process // Get process metadata (static information) pub fn get_metadata(&mut self) -> Option { @@ -2226,6 +2235,21 @@ mod tests { } } + #[test] + fn test_get_env_returns_record_for_running_process() { + // get_env is a one-shot wrapper; just verify it returns a record + // whose ts_ms is populated and host/kernel are non-empty on Linux. 
+ let cmd = vec!["sleep".to_string(), "1".to_string()]; + let monitor = create_test_monitor(cmd).unwrap(); + let env = monitor.get_env(); + assert!(env.ts_ms > 0); + #[cfg(target_os = "linux")] + { + assert!(!env.host.is_empty()); + assert!(!env.kernel.is_empty()); + } + } + #[test] fn test_process_metadata() { use std::thread; diff --git a/src/monitor/env.rs b/src/monitor/env.rs new file mode 100644 index 0000000..ae221e0 --- /dev/null +++ b/src/monitor/env.rs @@ -0,0 +1,500 @@ +//! Host/NUMA/affinity environment snapshot for reproducibility. +//! +//! A one-shot `env` record captured at the start of monitoring. Fields are +//! best-effort: anything not readable (containers, non-x86, non-Linux) +//! degrades to `None` or empty. + +use serde::{Deserialize, Serialize}; +use std::fs; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EnvRecord { + pub ts_ms: u64, + pub host: String, + pub kernel: String, + pub lscpu: LsCpu, + pub numa: Numa, + /// CPU affinity inherited by the monitoring process, as a range list + /// (e.g. "0-3,7-9"). Empty string if unknown. + pub affinity_inherited: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_governor: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_freq_khz: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub thp_enabled: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub smt_active: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cgroup: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct LsCpu { + pub sockets: u32, + pub cores_per_socket: u32, + pub threads_per_core: u32, + pub model: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct Numa { + pub nodes: u32, + pub distances: Vec>, + pub node_sizes_mb: Vec, +} + +impl EnvRecord { + /// Collect the environment snapshot. 
`pid` is used to look up cgroup + /// membership; pass the monitored PID (not the monitor's own PID). + pub fn collect(pid: u32) -> Self { + let ts_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + + Self { + ts_ms, + host: hostname(), + kernel: kernel_release(), + lscpu: lscpu(), + numa: numa(), + affinity_inherited: affinity_range_list(), + cpu_governor: read_cpu_attr("scaling_governor", |s| s.trim().to_string()), + cpu_freq_khz: read_cpu_attr("scaling_cur_freq", |s| s.trim().parse().ok()) + .map(|v: Vec>| v.into_iter().flatten().collect()), + thp_enabled: fs::read_to_string("/sys/kernel/mm/transparent_hugepage/enabled") + .ok() + .map(|s| s.trim().to_string()), + smt_active: fs::read_to_string("/sys/devices/system/cpu/smt/active") + .ok() + .and_then(|s| s.trim().parse::().ok().map(|n| n != 0)), + cgroup: fs::read_to_string(format!("/proc/{pid}/cgroup")) + .ok() + .map(|s| s.trim().to_string()), + } + } +} + +// ---------- low-level collectors ---------- + +fn hostname() -> String { + #[cfg(target_os = "linux")] + { + if let Ok(s) = fs::read_to_string("/proc/sys/kernel/hostname") { + return s.trim().to_string(); + } + } + String::new() +} + +fn kernel_release() -> String { + #[cfg(target_os = "linux")] + { + if let Ok(s) = fs::read_to_string("/proc/sys/kernel/osrelease") { + return s.trim().to_string(); + } + } + String::new() +} + +fn lscpu() -> LsCpu { + let model = fs::read_to_string("/proc/cpuinfo") + .ok() + .and_then(|s| parse_cpu_model(&s)) + .unwrap_or_default(); + + let (sockets, cores_per_socket, threads_per_core) = cpu_topology().unwrap_or((0, 0, 0)); + + LsCpu { + sockets, + cores_per_socket, + threads_per_core, + model, + } +} + +fn numa() -> Numa { + let mut node_ids: Vec = match fs::read_dir("/sys/devices/system/node") { + Ok(rd) => rd + .filter_map(|e| e.ok()) + .filter_map(|e| { + let n = e.file_name(); + let s = n.to_str()?; + s.strip_prefix("node")?.parse::().ok() + }) + .collect(), + 
Err(_) => Vec::new(), + }; + node_ids.sort_unstable(); + + let nodes = node_ids.len() as u32; + let distances: Vec> = node_ids + .iter() + .map(|id| { + fs::read_to_string(format!("/sys/devices/system/node/node{id}/distance")) + .ok() + .map(|s| parse_distance_row(&s)) + .unwrap_or_default() + }) + .collect(); + let node_sizes_mb: Vec = node_ids + .iter() + .map(|id| { + fs::read_to_string(format!("/sys/devices/system/node/node{id}/meminfo")) + .ok() + .and_then(|s| parse_node_memtotal_mb(&s)) + .unwrap_or(0) + }) + .collect(); + + Numa { + nodes, + distances, + node_sizes_mb, + } +} + +fn cpu_topology() -> Option<(u32, u32, u32)> { + use std::collections::BTreeSet; + let cpus = fs::read_dir("/sys/devices/system/cpu").ok()?; + let mut sockets: BTreeSet = BTreeSet::new(); + let mut cores_per_socket: std::collections::HashMap> = Default::default(); + let mut siblings_count: Option = None; + let mut total_cpus: u32 = 0; + + for entry in cpus.flatten() { + let name = entry.file_name(); + let name = match name.to_str() { + Some(s) => s.to_string(), + None => continue, + }; + if !name.starts_with("cpu") || !name[3..].chars().all(|c| c.is_ascii_digit()) { + continue; + } + let topo = entry.path().join("topology"); + let pkg = fs::read_to_string(topo.join("physical_package_id")) + .ok() + .and_then(|s| s.trim().parse::().ok()); + let core = fs::read_to_string(topo.join("core_id")) + .ok() + .and_then(|s| s.trim().parse::().ok()); + let sibs = fs::read_to_string(topo.join("thread_siblings_list")).ok(); + + if let (Some(p), Some(c)) = (pkg, core) { + sockets.insert(p); + cores_per_socket.entry(p).or_default().insert(c); + total_cpus += 1; + if siblings_count.is_none() { + if let Some(list) = sibs { + siblings_count = Some(count_range_list(list.trim()) as u32); + } + } + } + } + + if total_cpus == 0 { + return None; + } + let n_sockets = sockets.len() as u32; + let cores = cores_per_socket + .values() + .map(|s| s.len() as u32) + .max() + .unwrap_or(0); + let threads = 
siblings_count.unwrap_or(1).max(1); + Some((n_sockets, cores, threads)) +} + +/// Read a per-cpu sysfs attribute (e.g. cpufreq/scaling_governor) for every +/// online CPU. Returns None if the attribute is unreadable for cpu0. +fn read_cpu_attr(attr: &str, mut parse: F) -> Option> +where + F: FnMut(&str) -> T, +{ + let mut results = Vec::new(); + for cpu in 0u32.. { + let path = format!("/sys/devices/system/cpu/cpu{cpu}/cpufreq/{attr}"); + match fs::read_to_string(&path) { + Ok(s) => results.push(parse(&s)), + Err(_) => break, + } + } + if results.is_empty() { + None + } else { + Some(results) + } +} + +#[cfg(target_os = "linux")] +fn affinity_range_list() -> String { + let mut set = unsafe { std::mem::zeroed::() }; + let rc = + unsafe { libc::sched_getaffinity(0, std::mem::size_of::(), &mut set) }; + if rc != 0 { + return String::new(); + } + let max = libc::CPU_SETSIZE as usize; + let cpus: Vec = (0..max) + .filter(|&i| unsafe { libc::CPU_ISSET(i, &set) }) + .map(|i| i as u32) + .collect(); + format_range_list(&cpus) +} + +#[cfg(not(target_os = "linux"))] +fn affinity_range_list() -> String { + String::new() +} + +// ---------- pure parsers (unit-testable) ---------- + +/// Parse the first `model name` line out of /proc/cpuinfo content. +pub fn parse_cpu_model(cpuinfo: &str) -> Option { + for line in cpuinfo.lines() { + if let Some(rest) = line.strip_prefix("model name") { + let v = rest.trim_start_matches(|c: char| c == ':' || c.is_whitespace()); + return Some(v.trim().to_string()); + } + } + None +} + +/// Parse a `/sys/.../node{i}/distance` row: whitespace-separated u32s. +pub fn parse_distance_row(s: &str) -> Vec { + s.split_whitespace() + .filter_map(|t| t.parse::().ok()) + .collect() +} + +/// Parse `MemTotal:` (in kB) out of a `/sys/.../node{i}/meminfo` and return MB. 
+pub fn parse_node_memtotal_mb(meminfo: &str) -> Option { + for line in meminfo.lines() { + // Format: "Node 0 MemTotal: 65814528 kB" + let lower = line.to_ascii_lowercase(); + if let Some(idx) = lower.find("memtotal:") { + let rest = &line[idx + "memtotal:".len()..]; + let mut it = rest.split_whitespace(); + if let Some(kb) = it.next().and_then(|t| t.parse::().ok()) { + return Some(kb / 1024); + } + } + } + None +} + +/// Format a sorted-or-unsorted list of CPU ids as a range list: +/// `[0,1,2,3,7,8,9] -> "0-3,7-9"`. Empty input returns "". +pub fn format_range_list(cpus: &[u32]) -> String { + if cpus.is_empty() { + return String::new(); + } + let mut v = cpus.to_vec(); + v.sort_unstable(); + v.dedup(); + + let mut out = String::new(); + let mut start = v[0]; + let mut prev = v[0]; + for &n in &v[1..] { + if n == prev + 1 { + prev = n; + continue; + } + push_range(&mut out, start, prev); + start = n; + prev = n; + } + push_range(&mut out, start, prev); + out +} + +fn push_range(out: &mut String, start: u32, end: u32) { + if !out.is_empty() { + out.push(','); + } + if start == end { + out.push_str(&start.to_string()); + } else { + out.push_str(&format!("{start}-{end}")); + } +} + +/// Count the CPUs in a range-list string like "0,2-4,7" -> 5. 
+pub fn count_range_list(s: &str) -> usize { + let mut total = 0; + for part in s.split(',') { + let part = part.trim(); + if part.is_empty() { + continue; + } + if let Some((a, b)) = part.split_once('-') { + let (a, b) = (a.parse::().ok(), b.parse::().ok()); + if let (Some(a), Some(b)) = (a, b) { + if b >= a { + total += (b - a + 1) as usize; + } + } + } else if part.parse::().is_ok() { + total += 1; + } + } + total +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn affinity_range_compression_basic() { + assert_eq!(format_range_list(&[0, 1, 2, 3]), "0-3"); + assert_eq!(format_range_list(&[0, 1, 2, 3, 7, 8, 9]), "0-3,7-9"); + assert_eq!(format_range_list(&[5]), "5"); + assert_eq!(format_range_list(&[]), ""); + } + + #[test] + fn affinity_range_compression_unsorted_and_duplicates() { + assert_eq!(format_range_list(&[3, 0, 2, 1, 1]), "0-3"); + assert_eq!(format_range_list(&[7, 9, 8]), "7-9"); + } + + #[test] + fn affinity_range_compression_singletons_mixed() { + assert_eq!(format_range_list(&[0, 2, 4]), "0,2,4"); + assert_eq!(format_range_list(&[0, 1, 4, 5]), "0-1,4-5"); + } + + #[test] + fn count_range_list_roundtrip() { + assert_eq!(count_range_list("0-3,7-9"), 7); + assert_eq!(count_range_list("0,2,4"), 3); + assert_eq!(count_range_list("5"), 1); + assert_eq!(count_range_list(""), 0); + } + + #[test] + fn count_range_list_handles_malformed_parts() { + assert_eq!(count_range_list("0, ,2"), 2); + assert_eq!(count_range_list("5-3"), 0); // inverted range + assert_eq!(count_range_list("a-b,2"), 1); + } + + #[test] + fn numa_distance_parser_square_row() { + assert_eq!(parse_distance_row("10 12 12 12"), vec![10, 12, 12, 12]); + assert_eq!(parse_distance_row("10\n12\n12"), vec![10, 12, 12]); + assert_eq!(parse_distance_row(""), Vec::::new()); + } + + #[test] + fn meminfo_parses_memtotal_in_mb() { + let s = "Node 0 MemTotal: 65814528 kB\nNode 0 Other: 0 kB\n"; + assert_eq!(parse_node_memtotal_mb(s), Some(64272)); + } + + #[test] + fn 
meminfo_missing_returns_none() { + assert_eq!(parse_node_memtotal_mb("Node 0 Free: 1 kB"), None); + } + + #[test] + fn cpu_model_parsed_from_proc_cpuinfo() { + let s = "processor\t: 0\nvendor_id\t: AuthenticAMD\nmodel name\t: AMD EPYC 7742 64-Core Processor\ncache size\t: 512 KB\n"; + assert_eq!( + parse_cpu_model(s), + Some("AMD EPYC 7742 64-Core Processor".to_string()) + ); + } + + #[test] + fn cpu_model_missing_returns_none() { + assert_eq!(parse_cpu_model("processor: 0\n"), None); + } + + #[test] + fn env_record_serializes_with_optional_fields() { + let env = EnvRecord { + ts_ms: 1_700_000_000_000, + host: "omnibenchmark".into(), + kernel: "6.18.7-test".into(), + lscpu: LsCpu { + sockets: 1, + cores_per_socket: 64, + threads_per_core: 2, + model: "AMD EPYC 7742".into(), + }, + numa: Numa { + nodes: 4, + distances: vec![ + vec![10, 12, 12, 12], + vec![12, 10, 12, 12], + vec![12, 12, 10, 12], + vec![12, 12, 12, 10], + ], + node_sizes_mb: vec![64272, 64500, 64500, 64481], + }, + affinity_inherited: "0-127".into(), + cpu_governor: Some(vec!["performance".into()]), + cpu_freq_khz: Some(vec![2_400_000]), + thp_enabled: Some("always [madvise] never".into()), + smt_active: Some(true), + cgroup: Some("0::/user.slice".into()), + }; + let s = serde_json::to_string(&env).unwrap(); + assert!(s.contains("\"host\":\"omnibenchmark\"")); + assert!(s.contains("\"affinity_inherited\":\"0-127\"")); + assert!(s.contains("\"smt_active\":true")); + + // Optional Nones get elided. + let mut env2 = env.clone(); + env2.cpu_governor = None; + env2.cpu_freq_khz = None; + env2.thp_enabled = None; + env2.smt_active = None; + env2.cgroup = None; + let s2 = serde_json::to_string(&env2).unwrap(); + assert!(!s2.contains("cpu_governor")); + assert!(!s2.contains("thp_enabled")); + + // Roundtrip preserves NUMA matrix. 
+ let back: EnvRecord = serde_json::from_str(&s).unwrap(); + assert_eq!(back.numa.distances[1][0], 12); + assert_eq!(back.lscpu.cores_per_socket, 64); + } + + #[cfg(target_os = "linux")] + #[test] + fn collect_env_smoke_linux() { + let env = EnvRecord::collect(std::process::id()); + assert!(!env.host.is_empty(), "host should be non-empty"); + assert!(!env.kernel.is_empty(), "kernel should be non-empty"); + assert!( + !env.lscpu.model.is_empty(), + "CPU model should be readable from /proc/cpuinfo" + ); + assert!(env.ts_ms > 0); + assert!( + !env.affinity_inherited.is_empty(), + "sched_getaffinity should populate affinity_inherited" + ); + } + + #[cfg(target_os = "linux")] + #[test] + fn affinity_range_list_returns_nonempty_on_linux() { + let s = affinity_range_list(); + assert!(!s.is_empty()); + // Whatever subset is returned, count must match a positive number. + assert!(count_range_list(&s) > 0); + } + + #[test] + fn cpu_topology_returns_some_on_linux_or_none_off() { + // Non-fatal smoke test: the function shouldn't panic regardless. + let _ = cpu_topology(); + } +} diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 7f6e4fe..da81823 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -3,9 +3,13 @@ //! This module provides the core process monitoring functionality, //! split into focused submodules for better organization. +pub mod env; pub mod metrics; +pub mod record; pub mod summary; // Re-export the main types for convenience +pub use env::EnvRecord; pub use metrics::*; +pub use record::{tagged_json, Record}; pub use summary::SummaryGenerator; diff --git a/src/monitor/record.rs b/src/monitor/record.rs new file mode 100644 index 0000000..4fca798 --- /dev/null +++ b/src/monitor/record.rs @@ -0,0 +1,194 @@ +//! Tagged JSONL record schema. +//! +//! Every line in a JSONL stream is one of these variants. The internal +//! `"kind"` tag lets downstream tooling dispatch by type without +//! shape-guessing. 
Untagged legacy files are still readable via the +//! `parse_record` fallback. + +use serde::{Deserialize, Serialize}; + +use super::env::EnvRecord; +use super::metrics::{AggregatedMetrics, Metrics, ProcessMetadata, ProcessTreeMetrics}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "kind", rename_all = "lowercase")] +pub enum Record { + Env(Box), + Metadata(ProcessMetadata), + Sample(Metrics), + Tree(Box), + Aggregated(Box), +} + +/// Wrapper used to inject `"kind"` into a struct without owning it. +/// Equivalent to `Record::Sample(metrics)` but avoids the clone. +#[derive(Serialize)] +struct Tagged<'a, T: Serialize> { + kind: &'static str, + #[serde(flatten)] + inner: &'a T, +} + +/// Serialize `value` as a single JSON line tagged with `kind`. +/// +/// ```ignore +/// let line = tagged_json("sample", &metrics); +/// // {"kind":"sample","ts_ms":...,"cpu_usage":...} +/// ``` +pub fn tagged_json(kind: &'static str, value: &T) -> serde_json::Result { + serde_json::to_string(&Tagged { kind, inner: value }) +} + +/// Parse one JSONL line, trying the tagged schema first and falling back +/// to the legacy untagged shapes used by pre-tag-era files. +pub fn parse_record(line: &str) -> Option { + if let Ok(r) = serde_json::from_str::(line) { + return Some(r); + } + // Fallback: try each known untagged shape in order from most-specific + // (Tree has nested Aggregated) to least. 
+ if let Ok(t) = serde_json::from_str::(line) { + return Some(Record::Tree(Box::new(t))); + } + if let Ok(a) = serde_json::from_str::(line) { + return Some(Record::Aggregated(Box::new(a))); + } + if let Ok(m) = serde_json::from_str::(line) { + return Some(Record::Sample(m)); + } + if let Ok(md) = serde_json::from_str::(line) { + return Some(Record::Metadata(md)); + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tagged_emits_kind_field() { + let m = ProcessMetadata::new(123, vec!["sleep".into()], "/usr/bin/sleep".into()); + let s = tagged_json("metadata", &m).unwrap(); + assert!(s.contains("\"kind\":\"metadata\"")); + assert!(s.contains("\"pid\":123")); + } + + #[test] + fn record_enum_roundtrip_metadata() { + let m = ProcessMetadata::new(7, vec!["a".into(), "b".into()], "/bin/a".into()); + let json = serde_json::to_string(&Record::Metadata(m.clone())).unwrap(); + assert!(json.contains("\"kind\":\"metadata\"")); + let back: Record = serde_json::from_str(&json).unwrap(); + match back { + Record::Metadata(md) => { + assert_eq!(md.pid, 7); + assert_eq!(md.cmd, vec!["a".to_string(), "b".to_string()]); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn record_enum_roundtrip_sample() { + let mut m = Metrics::new(); + m.cpu_usage = 42.5; + m.mem_rss_kb = 1234; + let json = serde_json::to_string(&Record::Sample(m)).unwrap(); + assert!(json.contains("\"kind\":\"sample\"")); + let back: Record = serde_json::from_str(&json).unwrap(); + match back { + Record::Sample(s) => assert_eq!(s.cpu_usage, 42.5), + _ => panic!("wrong variant"), + } + } + + #[test] + fn parse_record_handles_untagged_legacy() { + // Pre-tag-era ProcessMetadata line, no "kind" field. 
+ let legacy = r#"{"pid":1,"cmd":["x"],"executable":"/x","t0_ms":0}"#; + match parse_record(legacy).unwrap() { + Record::Metadata(_) => {} + _ => panic!("legacy metadata should parse via fallback"), + } + } + + #[test] + fn parse_record_handles_untagged_sample() { + let m = Metrics::new(); + let untagged = serde_json::to_string(&m).unwrap(); + assert!(!untagged.contains("\"kind\"")); + match parse_record(&untagged).unwrap() { + Record::Sample(_) => {} + _ => panic!("expected Sample"), + } + } + + #[test] + fn record_env_roundtrip_carries_kind_tag() { + let env = EnvRecord { + ts_ms: 1_700_000_000_000, + host: "omnibenchmark".into(), + kernel: "6.18.7-test".into(), + lscpu: crate::monitor::env::LsCpu { + sockets: 1, + cores_per_socket: 64, + threads_per_core: 2, + model: "AMD EPYC 7742".into(), + }, + numa: crate::monitor::env::Numa { + nodes: 2, + distances: vec![vec![10, 12], vec![12, 10]], + node_sizes_mb: vec![64272, 64500], + }, + affinity_inherited: "0-127".into(), + cpu_governor: None, + cpu_freq_khz: None, + thp_enabled: None, + smt_active: None, + cgroup: None, + }; + let json = tagged_json("env", &env).unwrap(); + assert!(json.contains("\"kind\":\"env\"")); + assert!(json.contains("\"host\":\"omnibenchmark\"")); + + let r: Record = serde_json::from_str(&json).unwrap(); + match r { + Record::Env(e) => { + assert_eq!(e.host, "omnibenchmark"); + assert_eq!(e.numa.nodes, 2); + assert_eq!(e.affinity_inherited, "0-127"); + } + _ => panic!("expected Env variant"), + } + } + + #[test] + fn parse_record_handles_tagged_aggregated_and_tree() { + let agg = AggregatedMetrics::default(); + let agg_line = tagged_json("aggregated", &agg).unwrap(); + match parse_record(&agg_line).unwrap() { + Record::Aggregated(_) => {} + _ => panic!("expected Aggregated"), + } + + let tree = ProcessTreeMetrics { + ts_ms: 0, + parent: None, + children: Vec::new(), + aggregated: None, + }; + let tree_line = tagged_json("tree", &tree).unwrap(); + match parse_record(&tree_line).unwrap() { + 
Record::Tree(_) => {} + _ => panic!("expected Tree"), + } + } + + #[test] + fn parse_record_returns_none_for_garbage() { + assert!(parse_record("not json").is_none()); + // valid JSON but no shape match + assert!(parse_record("{\"unknown\":42}").is_none()); + } +} diff --git a/src/python.rs b/src/python.rs index a35d590..e541627 100644 --- a/src/python.rs +++ b/src/python.rs @@ -6,7 +6,7 @@ use crate::config::{OutputConfig, OutputFormat}; use crate::core::process_monitor::ProcessMonitor; use crate::error::DenetError; -use crate::monitor::{Summary, SummaryGenerator}; +use crate::monitor::{tagged_json, Summary, SummaryGenerator}; use pyo3::prelude::*; use pyo3::wrap_pyfunction; @@ -24,21 +24,25 @@ struct PyProcessMonitor { samples: Vec, output_config: OutputConfig, metadata_written: bool, + env_written: bool, } /// Build OutputConfig with consistent settings +#[allow(clippy::too_many_arguments)] fn build_output_config( output_file: Option, output_format: &str, store_in_memory: bool, quiet: bool, write_metadata: bool, + write_env: bool, ) -> PyResult { let mut builder = OutputConfig::builder() .format_str(output_format)? 
.store_in_memory(store_in_memory) .quiet(quiet) - .write_metadata(write_metadata); + .write_metadata(write_metadata) + .write_env(write_env); if let Some(path) = output_file { builder = builder.output_file(path); @@ -50,7 +54,7 @@ fn build_output_config( #[pymethods] impl PyProcessMonitor { #[new] - #[pyo3(signature = (cmd, base_interval_ms, max_interval_ms, since_process_start=false, output_file=None, output_format="jsonl", store_in_memory=true, quiet=false, include_children=true, write_metadata=false))] + #[pyo3(signature = (cmd, base_interval_ms, max_interval_ms, since_process_start=false, output_file=None, output_format="jsonl", store_in_memory=true, quiet=false, include_children=true, write_metadata=false, write_env=false))] #[allow(clippy::too_many_arguments)] fn new( cmd: Vec, @@ -63,6 +67,7 @@ impl PyProcessMonitor { quiet: bool, include_children: bool, write_metadata: bool, + write_env: bool, ) -> PyResult { let output_config = build_output_config( output_file, @@ -70,6 +75,7 @@ impl PyProcessMonitor { store_in_memory, quiet, write_metadata, + write_env, )?; let mut inner = ProcessMonitor::new_with_options( @@ -88,12 +94,13 @@ impl PyProcessMonitor { samples: Vec::new(), output_config, metadata_written: false, + env_written: false, }) } #[staticmethod] #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (pid, base_interval_ms, max_interval_ms, since_process_start=false, output_file=None, output_format="jsonl", store_in_memory=true, quiet=false, include_children=true, write_metadata=false))] + #[pyo3(signature = (pid, base_interval_ms, max_interval_ms, since_process_start=false, output_file=None, output_format="jsonl", store_in_memory=true, quiet=false, include_children=true, write_metadata=false, write_env=false))] fn from_pid( pid: usize, base_interval_ms: u64, @@ -105,6 +112,7 @@ impl PyProcessMonitor { quiet: bool, include_children: bool, write_metadata: Option, + write_env: Option, ) -> PyResult { let output_config = build_output_config( 
output_file, @@ -112,6 +120,7 @@ impl PyProcessMonitor { store_in_memory, quiet, write_metadata.unwrap_or(false), + write_env.unwrap_or(false), )?; let mut inner = ProcessMonitor::from_pid_with_options( pid, @@ -129,6 +138,7 @@ impl PyProcessMonitor { samples: Vec::new(), output_config, metadata_written: false, + env_written: false, }) } @@ -238,6 +248,7 @@ impl PyProcessMonitor { samples: Vec::new(), output_config, metadata_written: false, + env_written: false, }; // Resume the process if it was paused @@ -289,11 +300,11 @@ impl PyProcessMonitor { while self.inner.is_running() { let json = if self.inner.get_include_children() { let tree_metrics = self.inner.sample_tree_metrics(); - serde_json::to_string(&tree_metrics) + tagged_json("tree", &tree_metrics) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))? } else { match self.inner.sample_metrics() { - Some(metrics) => serde_json::to_string(&metrics) + Some(metrics) => tagged_json("sample", &metrics) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?, None => { sleep(self.inner.adaptive_interval()); @@ -330,12 +341,12 @@ impl PyProcessMonitor { let metrics_json = if self.inner.get_include_children() { // Sample the metrics including child processes let tree_metrics = self.inner.sample_tree_metrics(); - serde_json::to_string(&tree_metrics) + tagged_json("tree", &tree_metrics) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))? } else { // Sample only the parent process match self.inner.sample_metrics() { - Some(metrics) => serde_json::to_string(&metrics) + Some(metrics) => tagged_json("sample", &metrics) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?, None => return Ok(None), } @@ -348,16 +359,38 @@ impl PyProcessMonitor { // Write to file if output_file is specified if let Some(path) = &self.output_config.output_file { - // Write metadata as first line if enabled and not yet written + // Write env record as first line if enabled. 
Env is captured once + // and must precede metadata. We open with truncate on the first + // write (env OR metadata, whichever fires first). + let mut needs_truncate = (self.output_config.write_env && !self.env_written) + || (self.output_config.write_metadata && !self.metadata_written); + + if self.output_config.write_env && !self.env_written { + let env_json = tagged_json("env", &self.inner.get_env()) + .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(needs_truncate) + .append(!needs_truncate) + .open(path) + .map_err(map_io_error)?; + writeln!(file, "{env_json}").map_err(map_io_error)?; + self.env_written = true; + needs_truncate = false; + } + + // Write metadata as next line if enabled and not yet written if self.output_config.write_metadata && !self.metadata_written { if let Some(metadata) = self.inner.get_metadata() { - let metadata_json = serde_json::to_string(&metadata) + let metadata_json = tagged_json("metadata", &metadata) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; let mut file = OpenOptions::new() .create(true) .write(true) - .truncate(true) + .truncate(needs_truncate) + .append(!needs_truncate) .open(path) .map_err(map_io_error)?; @@ -394,6 +427,13 @@ impl PyProcessMonitor { .and_then(|metadata| serde_json::to_string(&metadata).ok())) } + /// Return the env record (host/NUMA/affinity/governor/THP/SMT/cgroup) + /// as a tagged JSON string, suitable for prepending to a JSONL stream. + fn get_env(&self) -> PyResult { + tagged_json("env", &self.inner.get_env()) + .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string())) + } + fn get_samples(&self) -> Vec { // Samples are already stored as strings, just clone them self.samples.clone() diff --git a/tests/env_tests.rs b/tests/env_tests.rs new file mode 100644 index 0000000..72e2846 --- /dev/null +++ b/tests/env_tests.rs @@ -0,0 +1,171 @@ +//! 
Tests for the env record (host/NUMA/affinity reproducibility snapshot) +//! and the tagged JSONL Record schema. + +use denet::monitor::env::{ + count_range_list, format_range_list, parse_cpu_model, parse_distance_row, + parse_node_memtotal_mb, +}; +use denet::monitor::record::{parse_record, Record}; +use denet::monitor::{tagged_json, EnvRecord, Metrics, ProcessMetadata}; + +// ---------- pure parsers ---------- + +#[test] +fn affinity_range_compression_basic() { + assert_eq!(format_range_list(&[0, 1, 2, 3]), "0-3"); + assert_eq!(format_range_list(&[0, 1, 2, 3, 7, 8, 9]), "0-3,7-9"); + assert_eq!(format_range_list(&[5]), "5"); + assert_eq!(format_range_list(&[]), ""); +} + +#[test] +fn affinity_range_compression_unsorted_and_duplicates() { + assert_eq!(format_range_list(&[3, 0, 2, 1, 1]), "0-3"); + assert_eq!(format_range_list(&[7, 9, 8]), "7-9"); +} + +#[test] +fn affinity_range_compression_singletons_mixed() { + assert_eq!(format_range_list(&[0, 2, 4]), "0,2,4"); + assert_eq!(format_range_list(&[0, 1, 4, 5]), "0-1,4-5"); +} + +#[test] +fn count_range_list_roundtrip() { + assert_eq!(count_range_list("0-3,7-9"), 7); + assert_eq!(count_range_list("0,2,4"), 3); + assert_eq!(count_range_list("5"), 1); + assert_eq!(count_range_list(""), 0); +} + +#[test] +fn numa_distance_parser_square_row() { + assert_eq!(parse_distance_row("10 12 12 12"), vec![10, 12, 12, 12]); + assert_eq!(parse_distance_row("10\n12\n12"), vec![10, 12, 12]); + assert_eq!(parse_distance_row(""), Vec::::new()); +} + +#[test] +fn meminfo_parses_memtotal_in_mb() { + let s = "Node 0 MemTotal: 65814528 kB\nNode 0 Other: 0 kB\n"; + assert_eq!(parse_node_memtotal_mb(s), Some(64272)); +} + +#[test] +fn meminfo_missing_returns_none() { + assert_eq!(parse_node_memtotal_mb("Node 0 Free: 1 kB"), None); +} + +#[test] +fn cpu_model_parsed_from_proc_cpuinfo() { + let s = "processor\t: 0\nvendor_id\t: AuthenticAMD\nmodel name\t: AMD EPYC 7742 64-Core Processor\ncache size\t: 512 KB\n"; + assert_eq!( + 
parse_cpu_model(s), + Some("AMD EPYC 7742 64-Core Processor".to_string()) + ); +} + +#[test] +fn cpu_model_missing_returns_none() { + assert_eq!(parse_cpu_model("processor: 0\n"), None); +} + +// ---------- Record / tagged JSON ---------- + +#[test] +fn record_env_roundtrip_carries_kind_tag() { + let env = EnvRecord { + ts_ms: 1_700_000_000_000, + host: "omnibenchmark".into(), + kernel: "6.18.7-test".into(), + lscpu: denet::monitor::env::LsCpu { + sockets: 1, + cores_per_socket: 64, + threads_per_core: 2, + model: "AMD EPYC 7742".into(), + }, + numa: denet::monitor::env::Numa { + nodes: 4, + distances: vec![ + vec![10, 12, 12, 12], + vec![12, 10, 12, 12], + vec![12, 12, 10, 12], + vec![12, 12, 12, 10], + ], + node_sizes_mb: vec![64272, 64500, 64500, 64481], + }, + affinity_inherited: "0-127".into(), + cpu_governor: Some(vec!["performance".into()]), + cpu_freq_khz: Some(vec![2_400_000]), + thp_enabled: Some("always [madvise] never".into()), + smt_active: Some(true), + cgroup: Some("0::/user.slice".into()), + }; + let json = tagged_json("env", &env).unwrap(); + assert!(json.contains("\"kind\":\"env\"")); + assert!(json.contains("\"host\":\"omnibenchmark\"")); + assert!(json.contains("\"affinity_inherited\":\"0-127\"")); + + // Roundtrip through the Record enum. + let r: Record = serde_json::from_str(&json).unwrap(); + match r { + Record::Env(e) => { + assert_eq!(e.host, "omnibenchmark"); + assert_eq!(e.numa.nodes, 4); + assert_eq!(e.numa.distances[1][0], 12); + assert_eq!(e.affinity_inherited, "0-127"); + } + _ => panic!("expected Env variant"), + } +} + +/// Regression: a pre-tag-era JSONL file (untagged ProcessMetadata + Metrics) +/// must still be readable via `parse_record`. 
+#[test]
+fn parse_record_back_compat_with_untagged_lines() {
+    let md = ProcessMetadata::new(123, vec!["sleep".into()], "/usr/bin/sleep".into());
+    let md_line = serde_json::to_string(&md).unwrap();
+    assert!(!md_line.contains("\"kind\""));
+    assert!(matches!(parse_record(&md_line), Some(Record::Metadata(_))));
+    // Same check for an untagged sample. `matches!` alone is a discarded bool, so assert it.
+    let m = Metrics::new();
+    let m_line = serde_json::to_string(&m).unwrap();
+    assert!(!m_line.contains("\"kind\""));
+    assert!(matches!(parse_record(&m_line), Some(Record::Sample(_))));
+}
+
+#[test]
+fn tagged_record_lines_parse_to_correct_variants() {
+    let env_line = tagged_json("env", &EnvRecord::collect(std::process::id())).unwrap();
+    assert!(env_line.contains("\"kind\":\"env\""));
+    assert!(matches!(parse_record(&env_line), Some(Record::Env(_))));
+    // A "metadata" tag must come back as Record::Metadata.
+    let md = ProcessMetadata::new(1, vec!["x".into()], "/x".into());
+    let md_line = tagged_json("metadata", &md).unwrap();
+    assert!(matches!(parse_record(&md_line), Some(Record::Metadata(_))));
+    // A "sample" tag must come back as Record::Sample.
+    let m_line = tagged_json("sample", &Metrics::new()).unwrap();
+    assert!(matches!(parse_record(&m_line), Some(Record::Sample(_))));
+}
+
+// ---------- Linux-only smoke test ----------
+
+#[cfg(target_os = "linux")]
+#[test]
+fn collect_env_smoke_linux() {
+    let env = EnvRecord::collect(std::process::id());
+    // /proc and /sys/devices/system/cpu are present on essentially every
+    // Linux runner; assert the must-haves and let optional fields be None.
+    assert!(!env.host.is_empty(), "host should be non-empty");
+    assert!(!env.kernel.is_empty(), "kernel should be non-empty");
+    assert!(
+        !env.lscpu.model.is_empty(),
+        "CPU model should be readable from /proc/cpuinfo"
+    );
+    assert!(env.ts_ms > 0);
+    // affinity should be readable for the calling process
+    assert!(
+        !env.affinity_inherited.is_empty(),
+        "sched_getaffinity should populate affinity_inherited"
+    );
+}