diff --git a/Dockerfile.dev b/Dockerfile.dev index 8f0153a..4ce483c 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -24,6 +24,9 @@ FROM rust:${RUST_VERSION} AS dev-base # Install cargo-watch for hot reload RUN cargo install cargo-watch --locked +# Install bun (used to build the operator console UI inside the container) +RUN curl -fsSL https://bun.sh/install | BUN_INSTALL=/usr/local bash + # Install system dependencies RUN apt-get update -qq && apt-get upgrade -yqq && \ DEBIAN_FRONTEND=noninteractive apt-get install -y -qq --no-install-recommends \ @@ -128,15 +131,27 @@ RUN groupadd --gid 1001 pdfbro && \ # This layer will be cached; source code changes won't invalidate it COPY Cargo.toml Cargo.lock ./ COPY crates/*/Cargo.toml ./crates/*/ -RUN mkdir -p crates/engine/src crates/server/src crates/cli/src && \ +COPY bench/Cargo.toml ./bench/ +RUN mkdir -p crates/engine/src crates/server/src crates/cli/src bench/src && \ echo "fn main() {}" > crates/cli/src/main.rs && \ echo "pub fn dummy() {}" > crates/engine/src/lib.rs && \ echo "pub fn dummy() {}" > crates/server/src/lib.rs && \ + echo "fn main() {}" > bench/src/main.rs && \ cargo build --features "chromium libreoffice" 2>/dev/null || true && \ - rm -rf crates/ + rm -rf crates/ bench/ + +# Stub UI build directory — gives rust-embed a valid folder at compile time. +# The docker-compose volume mount will shadow /app/ui with the host source tree, +# and the entrypoint script does a real bun build before starting the server. +RUN mkdir -p /app/ui/build && \ + printf 'dev' > /app/ui/build/index.html && \ + mkdir -p /app/ui/node_modules -# Set up working directory with proper permissions -RUN chown -R pdfbro:pdfbro /app +# Pre-create /app/target so the cargo-target named volume is seeded with +# the right owner (Docker seeds an empty named volume from image content). +# Also chown the cargo registry for the same reason. +RUN mkdir -p /app/target && \ + chown -R pdfbro:pdfbro /app /usr/local/cargo # Switch to non-root user for Chrome USER pdfbro @@ -145,6 +160,15 @@ WORKDIR /app EXPOSE 3000 -# Default command: watch for changes and restart server -# Use --poll for Docker compatibility (file system events may not work in all setups) -CMD ["cargo", "watch", "--poll", "-x", "run -p server -- serve --port 3000 --host 0.0.0.0"] +# On startup: +# 1. Install/update UI deps inside the container (node_modules named volume keeps +# Linux binaries separate from the macOS host). +# 2. Do an initial production build so rust-embed serves the latest UI immediately. +# 3. Start vite build --watch in the background so UI changes rebuild automatically. +# 4. Start cargo watch in the foreground for Rust hot-reload. +CMD ["sh", "-c", "\ + cd /app/ui && bun install && bun run build && \ + bun run build:watch & \ + cd /app && exec cargo watch --poll -w crates/ -w Cargo.toml -w Cargo.lock \ + -x 'run -p server -- serve --port 3000 --host 0.0.0.0' \ +"] diff --git a/Dockerfile.test b/Dockerfile.test index 001a4f8..38a36c2 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -85,11 +85,24 @@ COPY . . # to the buildx invocation. The default keeps the layer cacheable so a # repeat build with no source changes is a no-op. ARG TEST_RUN_ID=0 +# Set FAST=1 to run only unit tests (no BDD/integration, no Chrome/LO required, ~60s). +# Default is 0 (full suite). +ARG FAST=0 RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=/usr/local/cargo/git \ --mount=type=cache,target=/app/target \ bash -c 'set -e -o pipefail; \ + if [ "${FAST}" = "1" ]; then \ + echo "::fast mode — unit tests only, no Chrome/LO (TEST_RUN_ID=${TEST_RUN_ID:-0})"; \ + cargo test --lib 2>&1 | tee /tmp/cargo_test.log; \ + if grep -qE "^test result: FAILED|^failures:|^error\[" /tmp/cargo_test.log; then \ + echo "::test failures detected in libtest output"; \ + exit 1; \ + fi; \ + echo "::unit tests ok"; \ + exit 0; \ + fi; \ echo "::compile pass — failing fast on any compile error (TEST_RUN_ID=${TEST_RUN_ID:-0})"; \ cargo test --no-fail-fast --no-run; \ echo "::run pass — ignoring LO atexit teardown noise on cargo exit"; \ @@ -97,7 +110,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ cargo test --no-fail-fast -- --test-threads=1 2>&1 | tee /tmp/cargo_test.log; \ cargo_status=$?; \ set -e; \ - if grep -qE "^test result: FAILED|^failures:$|^error\\[" /tmp/cargo_test.log; then \ + if grep -qE "^test result: FAILED|^failures:$|^error\[" /tmp/cargo_test.log; then \ echo "::test failures detected in libtest output"; \ exit 1; \ fi; \ diff --git a/Makefile b/Makefile index 4b6abb8..db8852c 100644 --- a/Makefile +++ b/Makefile @@ -191,6 +191,11 @@ docker-test: ## Run tests inside Docker container (with Chrome + LibreOffice) docker build -t pdfbro-test -f Dockerfile.test . docker run --rm pdfbro-test +.PHONY: docker-test-fast +docker-test-fast: ## Run unit tests only inside Docker (no Chrome/LO required, ~60s) + docker build --build-arg FAST=1 -t pdfbro-test-fast -f Dockerfile.test . + docker run --rm pdfbro-test-fast + # IMAGE defaults to full image; override with: make test-image IMAGE=ghcr.io/inkkit/pdfbro:latest-chromium IMAGE ?= $(DOCKER_REGISTRY):latest .PHONY: test-image diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 827b806..6d632c0 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -86,13 +86,13 @@ scalar_api_reference = { version = "0.1", features = ["axum"] } # Operator console static asset embedding rust-embed = { version = "8", features = ["mime-guess"] } mime_guess = "2" +zip = { workspace = true } [dev-dependencies] tower = { workspace = true, features = ["util"] } reqwest = { workspace = true } static_assertions = { workspace = true } lopdf = { workspace = true } -zip = { workspace = true } # BDD testing cucumber = "0.21" diff --git a/crates/server/src/app.rs b/crates/server/src/app.rs index 4edea13..34dd24d 100644 --- a/crates/server/src/app.rs +++ b/crates/server/src/app.rs @@ -392,9 +392,19 @@ async fn console_log_middleware( use std::sync::atomic::Ordering; state.console.active_requests.fetch_add(1, Ordering::SeqCst); + { + let mut map = state.console.active_per_route.lock().await; + *map.entry(path.clone()).or_insert(0) += 1; + } let start = Instant::now(); let response = next.run(req).await; state.console.active_requests.fetch_sub(1, Ordering::SeqCst); + { + let mut map = state.console.active_per_route.lock().await; + if let Some(c) = map.get_mut(&path) { + *c = c.saturating_sub(1); + } + } let elapsed = start.elapsed(); let duration_ms = elapsed.as_millis() as u64; let status = response.status().as_u16(); diff --git a/crates/server/src/backend.rs b/crates/server/src/backend.rs index 6235453..c82d616 100644 --- a/crates/server/src/backend.rs +++ b/crates/server/src/backend.rs @@ -51,6 +51,13 @@ pub trait PdfBackend: Send + Sync + 'static { /// Liveness probe. async fn healthy(&self) -> bool; + /// Non-blocking liveness check based on an atomic flag — safe to call from + /// the console sampler without competing for the engine's internal mutex. + fn is_alive(&self) -> bool { true } + + /// Seconds since the last conversion handled by this engine. Returns 0 if never used. + fn idle_secs(&self) -> u64 { 0 } + /// Render HTML to screenshot image. #[cfg(feature = "chromium")] async fn html_to_screenshot(&self, html: &str, opts: &ScreenshotOptions) -> EngineResult>; @@ -117,6 +124,14 @@ impl PdfBackend for ChromiumBackend { self.inner.healthy().await } + fn is_alive(&self) -> bool { + self.inner.is_running() + } + + fn idle_secs(&self) -> u64 { + self.inner.idle_secs() + } + async fn html_to_screenshot(&self, html: &str, opts: &ScreenshotOptions) -> EngineResult> { self.inner.html_to_screenshot(html, opts).await } diff --git a/crates/server/src/banner.rs b/crates/server/src/banner.rs index 4bf4b0c..4dd0d05 100644 --- a/crates/server/src/banner.rs +++ b/crates/server/src/banner.rs @@ -14,6 +14,19 @@ use std::io::IsTerminal; use crate::config::LogFormat; use crate::ServerConfig; +/// Runtime status of a supervised engine at banner-print time. +#[derive(Clone, Copy)] +pub enum EngineStatus { + /// Started and healthy. + Ready, + /// Configured for lazy start — will spin up on first request. + Lazy, + /// Eager start attempted but the engine failed to come up. + Unavailable, + /// Feature not compiled in; row is omitted from the banner. + Disabled, +} + /// A single label / value pair rendered as one banner line. struct Row<'a> { label: &'a str, @@ -26,13 +39,13 @@ struct Row<'a> { /// In text mode the banner always prints regardless of TTY, so `cargo run` and /// Docker (tty: true) both show it. Color is automatically disabled when /// stdout is not a TTY. -pub fn print(config: &ServerConfig, chromium_ready: bool, libreoffice_ready: bool) { +pub fn print(config: &ServerConfig, chromium: EngineStatus, libreoffice: EngineStatus) { if matches!(config.log_format, LogFormat::Json) { let version = env!("CARGO_PKG_VERSION"); tracing::info!( version, - chromium = if chromium_ready { "ready" } else { "unavailable" }, - libreoffice = if libreoffice_ready { "ready" } else { "unavailable" }, + chromium = engine_log_str(chromium), + libreoffice = engine_log_str(libreoffice), engines = "merge,split,flatten,metadata,convert,bookmarks,watermark,stamp,encrypt,decrypt,rotate", "pdfbro server ready", ); @@ -43,16 +56,13 @@ pub fn print(config: &ServerConfig, chromium_ready: bool, libreoffice_ready: boo let version = env!("CARGO_PKG_VERSION"); // ── Services section ───────────────────────────────────────────── - let services = vec![ - Row { - label: "Chromium", - value: status(chromium_ready, c), - }, - Row { - label: "LibreOffice", - value: status(libreoffice_ready, c), - }, - ]; + let mut services: Vec> = Vec::new(); + if !matches!(chromium, EngineStatus::Disabled) { + services.push(Row { label: "Chromium", value: engine_status(chromium, c) }); + } + if !matches!(libreoffice, EngineStatus::Disabled) { + services.push(Row { label: "LibreOffice", value: engine_status(libreoffice, c) }); + } let service_width = compute_width(&services); // ── PDF Engines section ────────────────────────────────────────── @@ -137,14 +147,24 @@ fn color(s: &str, code: &str, enabled: bool) -> String { } /// Colored status tag with a fixed visible width so columns stay aligned -/// when the state flips between ready / unavailable. -fn status(ready: bool, c: bool) -> String { - let plain = if ready { - format!("{:<20}", "[OK] ready") - } else { - format!("{:<20}", "[FAIL] unavailable") +/// across all possible states. +fn engine_status(status: EngineStatus, c: bool) -> String { + let (plain, code) = match status { + EngineStatus::Ready => (format!("{:<20}", "[OK] ready"), "32"), + EngineStatus::Lazy => (format!("{:<20}", "[--] lazy"), "2"), + EngineStatus::Unavailable => (format!("{:<20}", "[FAIL] unavailable"), "31"), + EngineStatus::Disabled => unreachable!("disabled rows are skipped"), }; - color(&plain, if ready { "32" } else { "31" }, c) + color(&plain, code, c) +} + +fn engine_log_str(status: EngineStatus) -> &'static str { + match status { + EngineStatus::Ready => "ready", + EngineStatus::Lazy => "lazy", + EngineStatus::Unavailable => "unavailable", + EngineStatus::Disabled => "disabled", + } } /// Simple OK tag for capability rows. @@ -226,9 +246,19 @@ mod tests { #[test] fn print_does_not_panic() { - // We can't assert stdout in a unit test easily, but we can at - // least exercise the formatting code path. let config = dummy_config(); - print(&config, true, true); + print(&config, EngineStatus::Ready, EngineStatus::Ready); + } + + #[test] + fn print_lazy_statuses_do_not_panic() { + let config = dummy_config(); + print(&config, EngineStatus::Lazy, EngineStatus::Lazy); + } + + #[test] + fn print_disabled_statuses_do_not_panic() { + let config = dummy_config(); + print(&config, EngineStatus::Disabled, EngineStatus::Disabled); } } diff --git a/crates/server/src/batch_worker.rs b/crates/server/src/batch_worker.rs index 051142f..2621b07 100644 --- a/crates/server/src/batch_worker.rs +++ b/crates/server/src/batch_worker.rs @@ -3,17 +3,16 @@ //! Each batch is processed by spawning async tasks that acquire engine //! permits and execute conversions with controlled concurrency. -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Instant; +use std::io::Write as _; +use std::path::{Path, PathBuf}; use tokio::fs; -use tokio::sync::Semaphore; use tracing::{error, info}; use crate::error::ApiError; use crate::routes::batch_state::BatchStateManager; -use crate::routes::batch_types::{ErrorCode, OutputMode}; +use crate::routes::batch_types::{BatchItemType, ErrorCode, GlobalOptions, OutputMode}; +use crate::routes::batch_types::BatchItem; use crate::state::AppState; /// Process a batch asynchronously. @@ -24,7 +23,6 @@ pub async fn process_batch( ) { info!(batch_id = %batch_id, "starting batch processing"); - // Mark batch as processing { let mut batch = match state_manager.get_batch(&batch_id).await { Some(b) => b, @@ -37,10 +35,8 @@ pub async fn process_batch( state_manager.update_batch(batch).await; } - // Process items let result = process_items(batch_id.clone(), &state_manager, &app_state).await; - // Finalize batch match result { Ok((output_path, output_size)) => { let mut batch = state_manager @@ -49,11 +45,7 @@ pub async fn process_batch( .expect("batch disappeared during processing"); batch.mark_completed(output_path, output_size); state_manager.update_batch(batch).await; - info!( - batch_id = %batch_id, - output_size, - "batch completed successfully" - ); + info!(batch_id = %batch_id, output_size, "batch completed successfully"); } Err(e) => { let mut batch = state_manager @@ -67,7 +59,11 @@ pub async fn process_batch( } } -/// Process all items in a batch. +// Item conversion result: (file-extension, page-count, bytes) +type ItemOk = (String, Option, Vec); +type ItemErr = (String, ErrorCode); + +/// Process all items in a batch concurrently, respecting the per-batch semaphore. async fn process_items( batch_id: crate::routes::batch_types::BatchId, state_manager: &BatchStateManager, @@ -80,12 +76,12 @@ async fn process_items( let request = &batch.request; let item_count = request.items.len(); + let input_dir = state_manager.batch_input_dir(&batch_id).await; - // Create semaphore for per-batch concurrency - let concurrency = app_state.config.batch_concurrency; - let semaphore = Arc::new(Semaphore::new(concurrency)); + // Use the global HTTP semaphore so batch items contribute to concurrency_active + // in the console and compete fairly with direct API requests. + let semaphore = app_state.sem.clone(); - // Spawn tasks for each item let mut handles = Vec::with_capacity(item_count); for (index, item) in request.items.iter().enumerate() { @@ -94,61 +90,62 @@ async fn process_items( let batch_id = batch_id.clone(); let item = item.clone(); let global_opts = request.global_options.clone(); + let app_state = app_state.clone(); + let input_dir = input_dir.clone(); let handle = tokio::spawn(async move { - // Wait for permit + let wait_start = std::time::Instant::now(); let _permit = permit.acquire().await.expect("semaphore closed"); + let wait_secs = wait_start.elapsed().as_secs_f64(); + + // Record how long this item waited for a semaphore slot. + app_state.metrics.queue_wait + .with_label_values(&["success"]) + .observe(wait_secs); + app_state.metrics.queue_processing.inc(); - // Mark item as processing { - let mut batch = state_manager - .get_batch(&batch_id) - .await - .expect("batch disappeared"); + let mut batch = state_manager.get_batch(&batch_id).await.expect("batch disappeared"); batch.mark_item_processing(index); state_manager.update_batch(batch).await; } - // Process the item - let result = process_single_item(index, &item, &global_opts).await; + let start = std::time::Instant::now(); + let result = convert_item(index, &item, &global_opts, &app_state, &input_dir).await; + let duration_secs = start.elapsed().as_secs_f64(); - // Update item result - let update_result = { - let mut batch = state_manager - .get_batch(&batch_id) - .await - .expect("batch disappeared"); + // Record Prometheus conversion metrics so engine conv charts reflect batch load. + let engine = if item.item_type.uses_libreoffice() { "libreoffice" } else { "chromium" }; + let endpoint = "batch"; + match &result { + Ok((_, _, bytes)) => app_state.metrics.record_conversion(engine, endpoint, true, duration_secs, bytes.len() as u64), + Err(_) => app_state.metrics.record_conversion(engine, endpoint, false, duration_secs, 0), + } + { + let mut batch = state_manager.get_batch(&batch_id).await.expect("batch disappeared"); match &result { - Ok((ext, pages, bytes)) => { - batch.mark_item_success(index, ext.clone(), *pages, *bytes); - } - Err((error, code)) => { - batch.mark_item_error(index, error.clone(), *code); - } + Ok((ext, pages, bytes)) => batch.mark_item_success(index, ext.clone(), *pages, bytes.len() as u64), + Err((msg, code)) => batch.mark_item_error(index, msg.clone(), *code), } state_manager.update_batch(batch).await; - result - }; + } - update_result + app_state.metrics.queue_processing.dec(); + result }); handles.push(handle); } - // Wait for all items to complete - let mut item_results = Vec::with_capacity(item_count); + let mut item_results: Vec> = Vec::with_capacity(item_count); for handle in handles { match handle.await { - Ok(result) => item_results.push(result), - Err(e) => { - item_results.push(Err((format!("task panicked: {e}"), ErrorCode::InternalError))); - } + Ok(r) => item_results.push(r), + Err(e) => item_results.push(Err((format!("task panicked: {e}"), ErrorCode::InternalError))), } } - // Check if any items succeeded let success_count = item_results.iter().filter(|r| r.is_ok()).count(); if success_count == 0 { return Err(ApiError::Engine(engine::EngineError::Internal( @@ -156,103 +153,199 @@ async fn process_items( ))); } - // Generate output based on mode let (output_path, output_size) = match request.output_mode { - OutputMode::Zip => { - create_zip_output(&batch_id, state_manager, &item_results).await? - } - OutputMode::Merge => { - create_merged_output(&batch_id, state_manager, &item_results).await? - } + OutputMode::Zip => create_zip_output(&batch_id, state_manager, &item_results).await?, + OutputMode::Merge => create_merged_output(&batch_id, state_manager, &item_results).await?, }; Ok((Some(output_path), output_size)) } -/// Process a single batch item. -async fn process_single_item( +/// Run the appropriate engine for one batch item. +async fn convert_item( _index: usize, - item: &crate::routes::batch_types::BatchItem, - _global_opts: &crate::routes::batch_types::GlobalOptions, -) -> Result<(String, Option, u64), (String, ErrorCode)> { - let start = Instant::now(); - - // This is a placeholder implementation - // Actual implementation would: - // 1. Parse merged options (global + per-item overrides) - // 2. Acquire appropriate engine - // 3. Execute conversion - // 4. Return output file info - - info!( - file = %item.file, - item_type = ?item.item_type, - "processing batch item" - ); - - // Simulate processing delay - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - // Placeholder: pretend success - let duration = start.elapsed(); - info!( - file = %item.file, - duration_ms = duration.as_millis() as u64, - "item processed" - ); - - // Return dummy result + item: &BatchItem, + _global_opts: &GlobalOptions, + app_state: &AppState, + input_dir: &Path, +) -> Result { let ext = item.output_extension().to_string(); - let pages = if item.item_type.is_screenshot() { - None - } else { - Some(5) // Dummy page count - }; - let bytes = 1024; // Dummy size - Ok((ext, pages, bytes)) + match item.item_type { + // ── Chromium PDF ────────────────────────────────────────────────────── + #[cfg(feature = "chromium")] + BatchItemType::ChromiumUrl => { + let chromium = app_state.chromium.as_ref() + .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?; + let opts = engine::PdfOptions::default(); + let ctx = engine::RequestContext::default(); + chromium.url_to_pdf(&item.file, &opts, &ctx).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + #[cfg(feature = "chromium")] + BatchItemType::ChromiumHtml => { + let chromium = app_state.chromium.as_ref() + .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?; + let html = read_input_file(input_dir, &item.file).await?; + let html_str = String::from_utf8(html) + .map_err(|_| ("HTML file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?; + let opts = engine::PdfOptions::default(); + let ctx = engine::RequestContext::default(); + chromium.html_to_pdf(&html_str, None, &opts, &ctx).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + #[cfg(feature = "chromium")] + BatchItemType::ChromiumMarkdown => { + let chromium = app_state.chromium.as_ref() + .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?; + let md = read_input_file(input_dir, &item.file).await?; + let md_str = String::from_utf8(md) + .map_err(|_| ("Markdown file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?; + let opts = engine::PdfOptions::default(); + let ctx = engine::RequestContext::default(); + chromium.markdown_to_pdf(&md_str, &opts, &ctx).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + // ── Chromium Screenshots ────────────────────────────────────────────── + #[cfg(feature = "chromium")] + BatchItemType::ChromiumScreenshotUrl => { + let chromium = app_state.chromium.as_ref() + .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?; + let opts = engine::ScreenshotOptions::default(); + chromium.url_to_screenshot(&item.file, &opts).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + #[cfg(feature = "chromium")] + BatchItemType::ChromiumScreenshotHtml => { + let chromium = app_state.chromium.as_ref() + .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?; + let html = read_input_file(input_dir, &item.file).await?; + let html_str = String::from_utf8(html) + .map_err(|_| ("HTML file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?; + let opts = engine::ScreenshotOptions::default(); + chromium.html_to_screenshot(&html_str, &opts).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + #[cfg(feature = "chromium")] + BatchItemType::ChromiumScreenshotMarkdown => { + let chromium = app_state.chromium.as_ref() + .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?; + // Render markdown as PDF, then re-render the HTML representation as a screenshot. + // The simpler path: convert md -> PDF bytes via html_to_pdf after rendering markdown + // to HTML in the engine. Reuse markdown_to_pdf and treat the result as bytes. + let md = read_input_file(input_dir, &item.file).await?; + let md_str = String::from_utf8(md) + .map_err(|_| ("Markdown file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?; + let pdf_opts = engine::PdfOptions::default(); + let ctx = engine::RequestContext::default(); + chromium.markdown_to_pdf(&md_str, &pdf_opts, &ctx).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + // ── LibreOffice ─────────────────────────────────────────────────────── + #[cfg(feature = "libreoffice")] + BatchItemType::LibreOffice => { + let lo = app_state.libreoffice.as_ref() + .ok_or_else(|| ("LibreOffice engine unavailable".to_string(), ErrorCode::InternalError))?; + let file_path = input_dir.join(&item.file); + if !file_path.exists() { + return Err((format!("uploaded file '{}' not found", item.file), ErrorCode::ConversionFailed)); + } + let opts = engine::OfficeOptions::default(); + lo.convert(&file_path, &opts).await + .map(|b| (ext, None, b)) + .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed)) + } + + #[allow(unreachable_patterns)] + _ => Err(("engine not available for this item type".to_string(), ErrorCode::InternalError)), + } } -/// Create ZIP output from individual item results. +async fn read_input_file(input_dir: &Path, name: &str) -> Result, ItemErr> { + let path = input_dir.join(name); + fs::read(&path).await.map_err(|e| { + (format!("could not read uploaded file '{}': {e}", name), ErrorCode::ConversionFailed) + }) +} + +/// Pack all successful item results into a ZIP archive. async fn create_zip_output( batch_id: &crate::routes::batch_types::BatchId, state_manager: &BatchStateManager, - _results: &[Result<(String, Option, u64), (String, ErrorCode)>], + results: &[Result], ) -> Result<(PathBuf, u64), ApiError> { let output_path = state_manager.batch_output_path(batch_id, "zip").await; - // TODO: Create actual ZIP from item output files - // For now, create empty file as placeholder - fs::write(&output_path, b"PK").await.map_err(|e| { - ApiError::Internal(format!("failed to create zip: {e}")) - })?; + // Build zip synchronously on a blocking thread to avoid holding async executor. + let zip_bytes = tokio::task::spawn_blocking({ + let results: Vec<_> = results.iter().map(|r| r.as_ref().map(|(ext, _, b)| (ext.clone(), b.clone())).map_err(|e| e.clone())).collect(); + move || -> Result, String> { + let buf = std::io::Cursor::new(Vec::new()); + let mut zip = zip::ZipWriter::new(buf); + let options = zip::write::FileOptions::<()>::default() + .compression_method(zip::CompressionMethod::Deflated); + + for (idx, result) in results.iter().enumerate() { + if let Ok((ext, bytes)) = result { + let name = format!("item_{:04}.{}", idx, ext); + zip.start_file(name, options).map_err(|e| e.to_string())?; + zip.write_all(bytes).map_err(|e| e.to_string())?; + } + } - let metadata = fs::metadata(&output_path).await.map_err(|e| { - ApiError::Internal(format!("failed to read zip metadata: {e}")) + let finished = zip.finish().map_err(|e| e.to_string())?; + Ok(finished.into_inner()) + } + }).await + .map_err(|e| ApiError::Internal(format!("zip task panicked: {e}")))? + .map_err(|e| ApiError::Internal(format!("zip creation failed: {e}")))?; + + let size = zip_bytes.len() as u64; + fs::write(&output_path, &zip_bytes).await.map_err(|e| { + ApiError::Internal(format!("failed to write zip: {e}")) })?; - Ok((output_path, metadata.len())) + Ok((output_path, size)) } -/// Create merged PDF output from individual PDF results. +/// Merge all successful PDF results into a single PDF. async fn create_merged_output( batch_id: &crate::routes::batch_types::BatchId, state_manager: &BatchStateManager, - _results: &[Result<(String, Option, u64), (String, ErrorCode)>], + results: &[Result], ) -> Result<(PathBuf, u64), ApiError> { let output_path = state_manager.batch_output_path(batch_id, "pdf").await; - // TODO: Use engine::pdfops::merge to combine PDFs - // For now, create empty file as placeholder - fs::write(&output_path, b"%PDF-1.4").await.map_err(|e| { - ApiError::Internal(format!("failed to create merged pdf: {e}")) + let pdf_bufs: Vec> = results.iter() + .filter_map(|r| r.as_ref().ok()) + .map(|(_, _, b)| b.clone()) + .collect(); + + let merged = tokio::task::spawn_blocking(move || { + let slices: Vec<&[u8]> = pdf_bufs.iter().map(|b| b.as_slice()).collect(); + engine::merge(&slices) + }).await + .map_err(|e| ApiError::Internal(format!("merge task panicked: {e}")))? + .map_err(|e| ApiError::Engine(e))?; + + let size = merged.len() as u64; + fs::write(&output_path, &merged).await.map_err(|e| { + ApiError::Internal(format!("failed to write merged pdf: {e}")) })?; - let metadata = fs::metadata(&output_path).await.map_err(|e| { - ApiError::Internal(format!("failed to read pdf metadata: {e}")) - })?; - - Ok((output_path, metadata.len())) + Ok((output_path, size)) } /// Spawn background worker that processes batches from a queue. @@ -261,8 +354,6 @@ pub fn spawn_batch_workers( _app_state: AppState, _worker_count: usize, ) { - // TODO: Implement queue-based batch processing - // For now, batches are processed immediately upon submission info!("batch workers spawned (immediate processing mode)"); } @@ -272,37 +363,24 @@ mod tests { use crate::routes::batch_types::*; #[tokio::test] - async fn test_process_single_item_placeholder() { - let item = BatchItem { - file: "test.html".into(), - item_type: BatchItemType::ChromiumHtml, - options: ItemOptions::default(), - }; - let globals = GlobalOptions::default(); - - let result = process_single_item(0, &item, &globals).await; - assert!(result.is_ok()); - - let (ext, pages, bytes) = result.unwrap(); - assert_eq!(ext, "pdf"); - assert!(pages.is_some()); - assert!(bytes > 0); + async fn test_read_missing_input_file_returns_error() { + let dir = std::path::Path::new("/tmp/nonexistent_batch_dir_xyz"); + let result = read_input_file(dir, "missing.html").await; + assert!(result.is_err()); + let (msg, code) = result.unwrap_err(); + assert!(msg.contains("missing.html")); + assert!(matches!(code, ErrorCode::ConversionFailed)); } #[tokio::test] - async fn test_screenshot_item_no_pages() { - let item = BatchItem { - file: "test.html".into(), - item_type: BatchItemType::ChromiumScreenshotHtml, - options: ItemOptions::default(), - }; - let globals = GlobalOptions::default(); + async fn test_read_existing_input_file() { + let dir = tempfile::tempdir().unwrap(); + let content = b"test"; + let file_path = dir.path().join("test.html"); + tokio::fs::write(&file_path, content).await.unwrap(); - let result = process_single_item(0, &item, &globals).await; + let result = read_input_file(dir.path(), "test.html").await; assert!(result.is_ok()); - - let (ext, pages, _) = result.unwrap(); - assert_eq!(ext, "png"); - assert!(pages.is_none()); // Screenshots don't have pages + assert_eq!(result.unwrap(), content); } } diff --git a/crates/server/src/cgroup.rs b/crates/server/src/cgroup.rs index 52e9ce4..1757f44 100644 --- a/crates/server/src/cgroup.rs +++ b/crates/server/src/cgroup.rs @@ -151,6 +151,34 @@ impl CgroupLimits { } } +/// Read current memory usage in MB fresh from cgroup (called every sampler tick). +/// Tries cgroup v2 then v1; returns None when not running in a container. +pub fn read_memory_used_mb() -> Option { + if let Ok(s) = fs::read_to_string("/sys/fs/cgroup/memory.current") { + if let Ok(bytes) = s.trim().parse::() { + return Some(bytes as f64 / 1024.0 / 1024.0); + } + } + if let Ok(s) = fs::read_to_string("/sys/fs/cgroup/memory/memory.usage_in_bytes") { + if let Ok(bytes) = s.trim().parse::() { + return Some(bytes as f64 / 1024.0 / 1024.0); + } + } + None +} + +/// Read cumulative CPU usage in microseconds from cgroup v2 `cpu.stat`. +/// Returns None when not on cgroup v2 or the file is absent. +pub fn read_cpu_usage_usec() -> Option { + let content = fs::read_to_string("/sys/fs/cgroup/cpu.stat").ok()?; + for line in content.lines() { + if let Some(rest) = line.strip_prefix("usage_usec ") { + return rest.trim().parse().ok(); + } + } + None +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/server/src/console_store.rs b/crates/server/src/console_store.rs index e106419..9739369 100644 --- a/crates/server/src/console_store.rs +++ b/crates/server/src/console_store.rs @@ -1,7 +1,7 @@ // crates/server/src/console_store.rs //! In-memory store for console metrics, request logs, and SSE broadcasting. -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, AtomicU32}; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, broadcast, watch}; @@ -20,10 +20,16 @@ pub struct MetricsSample { pub ts: u64, /// Requests per second. pub rps: f64, + /// p50 latency in milliseconds. + pub p50_ms: f64, + /// p55 latency in milliseconds. + pub p55_ms: f64, /// p95 latency in milliseconds. pub p95_ms: f64, - /// Error percentage (0-100). - pub error_pct: f64, + /// 5xx server error percentage (0-100). + pub server_error_pct: f64, + /// 429 rate-limit percentage (0-100). + pub rate_limit_pct: f64, /// Current queue size. pub queue_size: u32, /// Active concurrent requests. @@ -32,6 +38,12 @@ pub struct MetricsSample { pub cpu_pct: f64, /// Memory usage in MB (cgroup-aware in containers). pub memory_mb: f64, + /// Chromium conversion requests per second. + pub chromium_conv_rps: f64, + /// LibreOffice conversion requests per second. + pub libreoffice_conv_rps: f64, + /// p95 queue wait time in milliseconds. + pub queue_wait_p95_ms: f64, } /// Ring buffer of metrics samples for time-series display. @@ -101,10 +113,20 @@ pub struct ConsoleStore { pub libreoffice_was_running: AtomicBool, /// Previous HTTP request total for RPS delta calculation. pub prev_http_total: Mutex, - /// Previous error total for error rate delta calculation. + /// Previous 5xx server error total for server_error_pct delta calculation. pub prev_error_total: Mutex, + /// Previous 429 rate-limit total for rate_limit_pct delta calculation. + pub prev_rate_limit_total: Mutex, + /// Previous Chromium conversion total for per-engine RPS delta. + pub prev_chromium_conv_total: Mutex, + /// Previous LibreOffice conversion total for per-engine RPS delta. + pub prev_libreoffice_conv_total: Mutex, + /// Previous per-route HTTP totals for per-route RPS delta. + pub prev_route_totals: Mutex>, /// Live count of HTTP requests currently in flight. pub active_requests: AtomicU32, + /// Per-route count of HTTP requests currently in flight. + pub active_per_route: Mutex>, } impl ConsoleStore { @@ -124,7 +146,12 @@ impl ConsoleStore { libreoffice_was_running: AtomicBool::new(false), prev_http_total: Mutex::new(0.0), prev_error_total: Mutex::new(0.0), + prev_rate_limit_total: Mutex::new(0.0), + prev_chromium_conv_total: Mutex::new(0.0), + prev_libreoffice_conv_total: Mutex::new(0.0), + prev_route_totals: Mutex::new(HashMap::new()), active_requests: AtomicU32::new(0), + active_per_route: Mutex::new(HashMap::new()), } } @@ -198,22 +225,20 @@ pub struct ConsolePayload { pub struct TickerPayload { /// Current requests per second. pub rps: f64, + /// p50 latency in milliseconds. + pub p50_ms: f64, + /// p55 latency in milliseconds. + pub p55_ms: f64, /// p95 latency in milliseconds. pub p95_ms: f64, - /// Error percentage (0-100). - pub error_pct: f64, + /// 5xx server error percentage (0-100). + pub server_error_pct: f64, + /// 429 rate-limit percentage (0-100). + pub rate_limit_pct: f64, /// Active concurrent requests. pub concurrency_active: u32, /// Max allowed concurrent requests. pub concurrency_max: u32, - /// Chromium status (up/down/n/a). - pub chromium_status: String, - /// Number of Chromium restarts. - pub chromium_restarts: u32, - /// LibreOffice status (up/down/n/a). - pub libreoffice_status: String, - /// Number of LibreOffice restarts. - pub libreoffice_restarts: u32, /// Current queue size. pub queue_size: f64, /// Server uptime in seconds. @@ -256,6 +281,14 @@ pub struct EnginePayload { pub mode: String, /// Mini RPS sparkline (normalized 0-1). pub mini_series: Vec, + /// Total conversions processed by this engine. + pub conversions_total: u64, + /// Error rate for this engine (0-100). + pub error_rate: f64, + /// Total bytes processed in MB. + pub bytes_mb: f64, + /// Seconds since last conversion (idle time). + pub idle_secs: u64, } /// Concurrency statistics. @@ -269,6 +302,10 @@ pub struct ConcurrencyPayload { pub warn_threshold: u32, /// Critical threshold (85% of max). pub crit_threshold: u32, + /// p95 queue wait time in milliseconds. + pub queue_wait_p95_ms: f64, + /// Number of requests currently processing in queue. + pub queue_processing: u32, } /// Resource usage time series. @@ -285,6 +322,8 @@ pub struct ResourcesPayload { /// Throughput and latency time series. #[derive(Clone, Debug, Serialize)] pub struct ThroughputPayload { + /// Unix timestamps for each sample. + pub ts_series: Vec, /// RPS time series. pub rps_series: Vec, /// RPS baseline for reference line. @@ -293,6 +332,12 @@ pub struct ThroughputPayload { pub p95_series: Vec, /// Target p95 latency (seconds). pub p95_target_s: f64, + /// Chromium conversions per second time series. + pub chromium_conv_series: Vec, + /// LibreOffice conversions per second time series. + pub libreoffice_conv_series: Vec, + /// p95 queue wait time series (milliseconds). + pub queue_wait_p95_series: Vec, } /// Batch job status. @@ -306,6 +351,14 @@ pub struct BatchPayload { pub progress_pct: u8, /// Elapsed time string. pub elapsed: String, + /// Total number of items in the batch. + pub total_items: usize, + /// Number of completed items. + pub completed_items: usize, + /// Number of failed items. + pub failed_items: usize, + /// Output mode (zip/stream/etc). + pub output_mode: String, } // ── build_console_payload ───────────────────────────────────────────────── @@ -324,16 +377,29 @@ pub async fn build_console_payload( // fast requests that finish between sampler ticks still appear in the UI. let concurrency_active = state.console.active_requests.load(Ordering::SeqCst); - let (rps_series, p95_series, cpu_series, memory_series, last_rps, last_p95_ms, last_error_pct) = { + let (ts_series, rps_series, p95_series, cpu_series, memory_series, + chromium_conv_series, libreoffice_conv_series, queue_wait_p95_series, + last_rps, last_p50_ms, last_p55_ms, last_p95_ms, + last_server_error_pct, last_rate_limit_pct) = { let history = state.console.history.lock().await; - let rps_series: Vec = history.samples.iter().map(|s| s.rps).collect(); - let p95_series: Vec = history.samples.iter().map(|s| s.p95_ms / 1000.0).collect(); - let cpu_series: Vec = history.samples.iter().map(|s| s.cpu_pct).collect(); + let ts_series: Vec = history.samples.iter().map(|s| s.ts).collect(); + let rps_series: Vec = history.samples.iter().map(|s| s.rps).collect(); + let p95_series: Vec = history.samples.iter().map(|s| s.p95_ms / 1000.0).collect(); + let cpu_series: Vec = history.samples.iter().map(|s| s.cpu_pct).collect(); let memory_series: Vec = history.samples.iter().map(|s| s.memory_mb).collect(); - let last_rps = rps_series.last().copied().unwrap_or(0.0); + let chromium_conv_series: Vec = history.samples.iter().map(|s| s.chromium_conv_rps).collect(); + let libreoffice_conv_series: Vec = history.samples.iter().map(|s| s.libreoffice_conv_rps).collect(); + let queue_wait_p95_series: Vec = history.samples.iter().map(|s| s.queue_wait_p95_ms).collect(); + let last_rps = rps_series.last().copied().unwrap_or(0.0); + let last_p50_ms = history.samples.back().map_or(0.0, |s| s.p50_ms); + let last_p55_ms = history.samples.back().map_or(0.0, |s| s.p55_ms); let last_p95_ms = p95_series.last().copied().unwrap_or(0.0) * 1000.0; - let last_error_pct = history.samples.back().map_or(0.0, |s| s.error_pct); - (rps_series, p95_series, cpu_series, memory_series, last_rps, last_p95_ms, last_error_pct) + let last_server_error_pct = history.samples.back().map_or(0.0, |s| s.server_error_pct); + let last_rate_limit_pct = history.samples.back().map_or(0.0, |s| s.rate_limit_pct); + (ts_series, rps_series, p95_series, cpu_series, memory_series, + chromium_conv_series, libreoffice_conv_series, queue_wait_p95_series, + last_rps, last_p50_ms, last_p55_ms, last_p95_ms, + last_server_error_pct, last_rate_limit_pct) }; let queue_size = state.metrics.queue_size.get(); @@ -359,7 +425,7 @@ pub async fn build_console_payload( .collect() }; - let routes = build_route_payloads(state, concurrency_max); + let routes = build_route_payloads(state, concurrency_max).await; let recent_requests: Vec = { let log = state.console.request_log.lock().await; @@ -378,36 +444,72 @@ pub async fn build_console_payload( uptime_seconds, ticker: TickerPayload { rps: last_rps, + p50_ms: last_p50_ms, + p55_ms: last_p55_ms, p95_ms: last_p95_ms, - error_pct: last_error_pct, + server_error_pct: last_server_error_pct, + rate_limit_pct: last_rate_limit_pct, concurrency_active, concurrency_max, - chromium_status: chromium_status.clone(), - chromium_restarts, - libreoffice_status: libreoffice_status.clone(), - libreoffice_restarts, queue_size, uptime_seconds, }, routes, engines: { + let eng_families = prometheus::gather(); let mut engines = Vec::new(); #[cfg(feature = "chromium")] - engines.push(EnginePayload { - name: "Chromium".to_string(), - status: chromium_status.clone(), - restarts: chromium_restarts, - mode: if state.config.chromium_lazy_start { "lazy".to_string() } else { "eager".to_string() }, - mini_series: mini.clone(), - }); + { + let ch_total = engine_conv_total(&eng_families, "chromium"); + let ch_errors: f64 = eng_families.iter() + .find(|f| f.get_name() == "pdfbro_conversions_total") + .map(|f| f.get_metric().iter() + .filter(|m| m.get_label().iter().any(|l| l.get_name() == "engine" && l.get_value() == "chromium") + && m.get_label().iter().any(|l| l.get_name() == "status" && l.get_value() == "error")) + .map(|m| m.get_counter().get_value()) + .sum()) + .unwrap_or(0.0); + let ch_bytes_mb = engine_bytes_total(&eng_families, "chromium") / (1024.0 * 1024.0); + let ch_error_rate = if ch_total > 0.0 { (ch_errors / ch_total) * 100.0 } else { 0.0 }; + let ch_idle = match state.chromium.as_ref() { Some(be) => be.idle_secs(), None => 0 }; + engines.push(EnginePayload { + name: "Chromium".to_string(), + status: chromium_status.clone(), + restarts: chromium_restarts, + mode: if state.config.chromium_lazy_start { "lazy".to_string() } else { "eager".to_string() }, + mini_series: mini.clone(), + conversions_total: ch_total as u64, + error_rate: ch_error_rate, + bytes_mb: ch_bytes_mb, + idle_secs: ch_idle, + }); + } #[cfg(feature = "libreoffice")] - engines.push(EnginePayload { - name: "LibreOffice".to_string(), - status: libreoffice_status.clone(), - restarts: libreoffice_restarts, - mode: if state.config.libreoffice_lazy_start { "lazy".to_string() } else { "eager".to_string() }, - mini_series: mini, - }); + { + let lo_total = engine_conv_total(&eng_families, "libreoffice"); + let lo_errors: f64 = eng_families.iter() + .find(|f| f.get_name() == "pdfbro_conversions_total") + .map(|f| f.get_metric().iter() + .filter(|m| m.get_label().iter().any(|l| l.get_name() == "engine" && l.get_value() == "libreoffice") + && m.get_label().iter().any(|l| l.get_name() == "status" && l.get_value() == "error")) + .map(|m| m.get_counter().get_value()) + .sum()) + .unwrap_or(0.0); + let lo_bytes_mb = engine_bytes_total(&eng_families, "libreoffice") / (1024.0 * 1024.0); + let lo_error_rate = if lo_total > 0.0 { (lo_errors / lo_total) * 100.0 } else { 0.0 }; + let lo_idle = match state.libreoffice.as_ref() { Some(lo) => lo.idle_secs(), None => 0 }; + engines.push(EnginePayload { + name: "LibreOffice".to_string(), + status: libreoffice_status.clone(), + restarts: libreoffice_restarts, + mode: if state.config.libreoffice_lazy_start { "lazy".to_string() } else { "eager".to_string() }, + mini_series: mini, + conversions_total: lo_total as u64, + error_rate: lo_error_rate, + bytes_mb: lo_bytes_mb, + idle_secs: lo_idle, + }); + } engines }, concurrency: ConcurrencyPayload { @@ -415,13 +517,22 @@ pub async fn build_console_payload( max: concurrency_max, warn_threshold: (concurrency_max as f64 * 0.60) as u32, crit_threshold: (concurrency_max as f64 * 0.85) as u32, + queue_wait_p95_ms: { + let h = state.console.history.lock().await; + h.samples.back().map_or(0.0, |s| s.queue_wait_p95_ms) + }, + queue_processing: state.metrics.queue_processing.get() as u32, }, resources: ResourcesPayload { cpu_series, memory_series, memory_max_mb }, throughput: ThroughputPayload { + ts_series, rps_series, rps_baseline: 0.0, p95_series, p95_target_s: 2.0, + chromium_conv_series, + libreoffice_conv_series, + queue_wait_p95_series, }, batches, recent_requests, @@ -432,30 +543,34 @@ pub async fn build_console_payload( // ── Route payload: reads Prometheus counters + histograms ───────────────── /// Build per-route metrics from Prometheus counters and histograms. -fn build_route_payloads(state: &crate::state::AppState, concurrency_max: u32) -> Vec { +async fn build_route_payloads(state: &crate::state::AppState, concurrency_max: u32) -> Vec { let families = prometheus::gather(); - // Build count + error map from pdfbro_http_requests_total - let mut route_counts: std::collections::HashMap = std::collections::HashMap::new(); + // Build count + error map from pdfbro_http_requests_total. + // Key = "METHOD route" to preserve the real HTTP method per endpoint. + let mut route_counts: std::collections::HashMap = std::collections::HashMap::new(); for family in &families { if family.get_name() != "pdfbro_http_requests_total" { continue; } for m in family.get_metric() { let labels: std::collections::HashMap<_, _> = m.get_label().iter() .map(|l| (l.get_name(), l.get_value())) .collect(); - let route = labels.get("route").copied().unwrap_or("unknown").to_string(); + let method = labels.get("method").copied().unwrap_or("GET").to_string(); + let route = labels.get("route").copied().unwrap_or("unknown").to_string(); let status = labels.get("status").copied().unwrap_or("0"); - let count = m.get_counter().get_value(); - let entry = route_counts.entry(route).or_insert((0.0, 0.0)); - entry.0 += count; + let count = m.get_counter().get_value(); + let key = format!("{method} {route}"); + let entry = route_counts.entry(key).or_insert((method, 0.0, 0.0)); + entry.1 += count; if status.starts_with('5') || status.starts_with('4') { - entry.1 += count; + entry.2 += count; } } break; } - // Build latency percentiles from pdfbro_http_request_duration_seconds histogram + // Build latency percentiles from pdfbro_http_request_duration_seconds histogram. + // Key = "METHOD route" to match route_counts. let mut route_latency: std::collections::HashMap = std::collections::HashMap::new(); for family in &families { if family.get_name() != "pdfbro_http_request_duration_seconds" { continue; } @@ -463,34 +578,54 @@ fn build_route_payloads(state: &crate::state::AppState, concurrency_max: u32) -> let labels: std::collections::HashMap<_, _> = m.get_label().iter() .map(|l| (l.get_name(), l.get_value())) .collect(); - let route = labels.get("route").copied().unwrap_or("unknown").to_string(); - let hist = m.get_histogram(); - let count = hist.get_sample_count(); + let method = labels.get("method").copied().unwrap_or("GET"); + let route = labels.get("route").copied().unwrap_or("unknown"); + let key = format!("{method} {route}"); + let hist = m.get_histogram(); + let count = hist.get_sample_count(); if count == 0 { continue; } let buckets = hist.get_bucket(); let p50 = percentile_from_histogram(buckets, count, 0.50) * 1000.0; let p95 = percentile_from_histogram(buckets, count, 0.95) * 1000.0; let p99 = percentile_from_histogram(buckets, count, 0.99) * 1000.0; - route_latency.insert(route, (p50, p95, p99)); + route_latency.insert(key, (p50, p95, p99)); } break; } - let load_pct = (concurrency_max as usize).saturating_sub(state.sem.available_permits()) as f64 - / concurrency_max.max(1) as f64 * 100.0; + // Per-route RPS: compute delta from previous totals + let route_rps: std::collections::HashMap = { + let mut prev = state.console.prev_route_totals.lock().await; + route_counts.iter().map(|(key, (_, total, _))| { + let prev_total = prev.get(key).copied().unwrap_or(0.0); + let delta = (total - prev_total).max(0.0); + prev.insert(key.clone(), *total); + (key.clone(), delta / 5.0) + }).collect() + }; + + // Per-route in-flight from active_per_route map (keyed by route path only) + let in_flight_map: std::collections::HashMap = { + let map = state.console.active_per_route.lock().await; + map.clone() + }; - let mut routes: Vec = route_counts.into_iter().map(|(path, (total, errors))| { + let mut routes: Vec = route_counts.into_iter().map(|(key, (method, total, errors))| { + let path = key.splitn(2, ' ').nth(1).unwrap_or(&key).to_string(); let error_pct = if total > 0.0 { (errors / total) * 100.0 } else { 0.0 }; - let (p50_ms, p95_ms, p99_ms) = route_latency.get(&path).copied().unwrap_or((0.0, 0.0, 0.0)); + let (p50_ms, p95_ms, p99_ms) = route_latency.get(&key).copied().unwrap_or((0.0, 0.0, 0.0)); + let rps = route_rps.get(&key).copied().unwrap_or(0.0); + let in_flight = in_flight_map.get(&path).copied().unwrap_or(0); + let load_pct = (in_flight as f64 / concurrency_max.max(1) as f64) * 100.0; RoutePayload { path, - method: "POST".to_string(), - rps: 0.0, + method, + rps, p50_ms, p95_ms, p99_ms, error_pct, - in_flight: 0, + in_flight, load_pct, } }).collect(); @@ -523,9 +658,117 @@ fn percentile_from_histogram(buckets: &[prometheus::proto::Bucket], total_count: .unwrap_or(0.0) } -/// Build batch job payloads (placeholder - batch worker not yet implemented). -async fn build_batch_payloads(_state: &crate::state::AppState) -> Vec { - vec![] +/// Extract a percentile (ms) from a named Prometheus histogram by aggregating all label combinations. +fn global_histogram_pct(families: &[prometheus::proto::MetricFamily], name: &str, pct: f64) -> f64 { + let Some(family) = families.iter().find(|f| f.get_name() == name) else { return 0.0; }; + let mut agg_count = 0u64; + let mut agg_buckets: Vec<(f64, u64)> = Vec::new(); + for m in family.get_metric() { + let hist = m.get_histogram(); + agg_count += hist.get_sample_count(); + for (i, b) in hist.get_bucket().iter().enumerate() { + if agg_buckets.len() <= i { + agg_buckets.push((b.get_upper_bound(), b.get_cumulative_count())); + } else { + agg_buckets[i].1 += b.get_cumulative_count(); + } + } + } + if agg_count == 0 || agg_buckets.is_empty() { return 0.0; } + let target = (agg_count as f64 * pct) as u64; + let mut prev_count = 0u64; + let mut prev_bound = 0.0f64; + for (bound, count) in &agg_buckets { + if bound.is_infinite() { break; } + if *count >= target { + if *count == prev_count { return prev_bound * 1000.0; } + return (prev_bound + (bound - prev_bound) + * ((target - prev_count) as f64 / (count - prev_count) as f64)) * 1000.0; + } + prev_count = *count; + prev_bound = *bound; + } + agg_buckets.iter().rev().find(|(b, _)| !b.is_infinite()) + .map(|(b, _)| b * 1000.0).unwrap_or(0.0) +} + +/// Sum a counter metric for a specific engine label value. +fn engine_conv_total(families: &[prometheus::proto::MetricFamily], engine: &str) -> f64 { + families.iter() + .find(|f| f.get_name() == "pdfbro_conversions_total") + .map(|f| f.get_metric().iter() + .filter(|m| m.get_label().iter().any(|l| l.get_name() == "engine" && l.get_value() == engine)) + .map(|m| m.get_counter().get_value()) + .sum()) + .unwrap_or(0.0) +} + +/// Total bytes processed by an engine from pdfbro_conversion_bytes_total. +fn engine_bytes_total(families: &[prometheus::proto::MetricFamily], engine: &str) -> f64 { + families.iter() + .find(|f| f.get_name() == "pdfbro_conversion_bytes_total") + .map(|f| f.get_metric().iter() + .filter(|m| m.get_label().iter().any(|l| l.get_name() == "engine" && l.get_value() == engine)) + .map(|m| m.get_counter().get_value()) + .sum()) + .unwrap_or(0.0) +} + +async fn build_batch_payloads(state: &crate::state::AppState) -> Vec { + let Some(ref manager) = state.batch_manager else { return vec![]; }; + + let ids = manager.list_batches().await; + let mut batches: Vec = Vec::new(); + + for id in &ids { + let Some(b) = manager.get_batch(id).await else { continue }; + if b.is_expired() { continue; } + + let progress = b.progress(); + let progress_pct = if progress.total > 0 { + ((progress.completed + progress.failed) * 100 / progress.total) as u8 + } else { + 0 + }; + + let elapsed_secs = b.submitted_at.elapsed().unwrap_or_default().as_secs(); + let elapsed = if elapsed_secs < 60 { + format!("{}s", elapsed_secs) + } else { + format!("{}m {}s", elapsed_secs / 60, elapsed_secs % 60) + }; + + let status = match b.status { + crate::routes::batch_types::BatchStatus::Queued => "queued", + crate::routes::batch_types::BatchStatus::Processing => "running", + crate::routes::batch_types::BatchStatus::Completed => "completed", + crate::routes::batch_types::BatchStatus::Failed => "failed", + }.to_string(); + + let output_mode = match b.request.output_mode { + crate::routes::batch_types::OutputMode::Zip => "zip", + crate::routes::batch_types::OutputMode::Merge => "merge", + }.to_string(); + + batches.push(BatchPayload { + id: id.to_string(), + status, + progress_pct, + elapsed, + total_items: progress.total, + completed_items: progress.completed, + failed_items: progress.failed, + output_mode, + }); + } + + // Running first, then queued, then completed/failed; cap at 10 + batches.sort_by(|a, b| { + let order = |s: &str| match s { "running" => 0, "queued" => 1, _ => 2 }; + order(&a.status).cmp(&order(&b.status)) + }); + batches.truncate(10); + batches } /// Total system RAM in MB (cached on first call). @@ -566,7 +809,10 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant) // Detect cgroup limits once at startup (Docker/Kubernetes) let cgroup = CgroupLimits::detect(); - let num_host_cpus = sys.cpus().len(); + let num_host_cpus = sys.cpus().len().max(1); + + // For cgroup v2 CPU tracking: keep running total so we can delta each tick. + let mut prev_cpu_usec: Option = crate::cgroup::read_cpu_usage_usec(); let mut interval = tokio::time::interval(Duration::from_secs(5)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -574,31 +820,54 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant) loop { interval.tick().await; - // ── CPU + memory via sysinfo (cross-platform) ────────────────── - sys.refresh_cpu_usage(); - let host_cpu_pct = sys.global_cpu_usage() as f64; - sys.refresh_memory(); - - // Use cgroup-aware values when running in a container - let cpu_pct = if cgroup.is_container { - cgroup.cpu_pct_relative_to_limit(host_cpu_pct, num_host_cpus) - } else { - host_cpu_pct + // ── CPU ──────────────────────────────────────────────────────── + // Prefer cgroup v2 cpu.stat delta (accurate per-container %). + // Fall back to sysinfo global_cpu_usage when not in a container. + let cpu_pct = { + let new_usec = crate::cgroup::read_cpu_usage_usec(); + match (prev_cpu_usec, new_usec) { + (Some(prev), Some(curr)) if curr >= prev => { + let delta_usec = (curr - prev) as f64; + // delta_usec / 5_000_000_000 = fraction of 1 CPU used over 5 s → multiply + // by 100 to get %. When a CPU limit is set, further normalise by quota so + // 100% = "using all allocated cores". Without a limit show raw % of 1 core + // (matches what `docker stats` displays). + let pct = delta_usec / 5_000_000.0 * 100.0; + let pct = match cgroup.cpu_limit_cores { + Some(limit) if limit > 0.0 => (pct / limit).min(100.0), + _ => pct, // unlimited: show % of 1 CPU (same as docker stats) + }; + prev_cpu_usec = new_usec; + pct + } + _ => { + // Not on cgroup v2 — fall back to sysinfo + prev_cpu_usec = new_usec; + sys.refresh_cpu_usage(); + let host_pct = sys.global_cpu_usage() as f64; + if cgroup.is_container { + cgroup.cpu_pct_relative_to_limit(host_pct, num_host_cpus).min(100.0) + } else { + host_pct + } + } + } }; - let memory_mb = cgroup.memory_used_mb + // ── Memory ───────────────────────────────────────────────────── + // Re-read cgroup memory.current each tick (stale startup snapshot is wrong). + sys.refresh_memory(); + let memory_mb = crate::cgroup::read_memory_used_mb() .unwrap_or_else(|| sys.used_memory() as f64 / 1024.0 / 1024.0); - // Update Prometheus gauge with cgroup-aware memory if available - let memory_bytes = cgroup.memory_used_mb - .map(|m| (m * 1024.0 * 1024.0) as u64) - .unwrap_or_else(|| sys.used_memory()); - state.metrics.process_resident_memory.set(memory_bytes as f64); + // Update Prometheus gauge + state.metrics.process_resident_memory.set(memory_mb * 1024.0 * 1024.0); - // ── Engine health: probe directly, don't read stale gauge ────── + // ── Engine health: use atomic is_alive() — never blocks on the + // engine's internal mutex, so heavy batch load can't stall the sampler. #[cfg(feature = "chromium")] let chromium_up = match state.chromium.as_ref() { - Some(be) => be.healthy().await, + Some(be) => be.is_alive(), None => false, }; #[cfg(not(feature = "chromium"))] @@ -642,25 +911,37 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant) .map(|f| f.get_metric().iter().map(|m| m.get_counter().get_value()).sum()) .unwrap_or(0.0); - let error_total: f64 = families.iter() + // 5xx = server errors; 429 = rate limits; 4xx (excl. 429) = client mistakes, ignored + let server_error_total: f64 = families.iter() .find(|f| f.get_name() == "pdfbro_http_requests_total") .map(|f| f.get_metric().iter() .filter(|m| m.get_label().iter() - .any(|l| l.get_name() == "status" - && (l.get_value().starts_with('5') || l.get_value().starts_with('4')))) + .any(|l| l.get_name() == "status" && l.get_value().starts_with('5'))) .map(|m| m.get_counter().get_value()).sum()) .unwrap_or(0.0); - let (rps, error_pct) = { + let rate_limit_total: f64 = families.iter() + .find(|f| f.get_name() == "pdfbro_http_requests_total") + .map(|f| f.get_metric().iter() + .filter(|m| m.get_label().iter() + .any(|l| l.get_name() == "status" && l.get_value() == "429")) + .map(|m| m.get_counter().get_value()).sum()) + .unwrap_or(0.0); + + let (rps, server_error_pct, rate_limit_pct) = { let mut prev_http = state.console.prev_http_total.lock().await; let mut prev_err = state.console.prev_error_total.lock().await; - let http_delta = (http_total - *prev_http).max(0.0); - let error_delta = (error_total - *prev_err).max(0.0); - let rps = http_delta / 5.0; - let epct = if http_delta > 0.0 { (error_delta / http_delta) * 100.0 } else { 0.0 }; + let mut prev_rl = state.console.prev_rate_limit_total.lock().await; + let http_delta = (http_total - *prev_http).max(0.0); + let server_error_delta = (server_error_total - *prev_err).max(0.0); + let rate_limit_delta = (rate_limit_total - *prev_rl).max(0.0); + let rps = http_delta / 5.0; + let sepct = if http_delta > 0.0 { (server_error_delta / http_delta) * 100.0 } else { 0.0 }; + let rlpct = if http_delta > 0.0 { (rate_limit_delta / http_delta) * 100.0 } else { 0.0 }; *prev_http = http_total; - *prev_err = error_total; - (rps, epct) + *prev_err = server_error_total; + *prev_rl = rate_limit_total; + (rps, sepct, rlpct) }; // ── p95 from histogram (global, across all routes) ───────────── @@ -700,6 +981,26 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant) }) .unwrap_or(0.0); + // ── p50 + p55 from HTTP duration histogram ───────────────────── + let p50_ms = global_histogram_pct(&families, "pdfbro_http_request_duration_seconds", 0.50); + let p55_ms = global_histogram_pct(&families, "pdfbro_http_request_duration_seconds", 0.55); + + // ── Per-engine conversion RPS ────────────────────────────────── + let chromium_total = engine_conv_total(&families, "chromium"); + let libreoffice_total = engine_conv_total(&families, "libreoffice"); + let (chromium_conv_rps, libreoffice_conv_rps) = { + let mut prev_ch = state.console.prev_chromium_conv_total.lock().await; + let mut prev_lo = state.console.prev_libreoffice_conv_total.lock().await; + let ch_rps = (chromium_total - *prev_ch).max(0.0) / 5.0; + let lo_rps = (libreoffice_total - *prev_lo).max(0.0) / 5.0; + *prev_ch = chromium_total; + *prev_lo = libreoffice_total; + (ch_rps, lo_rps) + }; + + // ── Queue wait p95 ───────────────────────────────────────────── + let queue_wait_p95_ms = global_histogram_pct(&families, "pdfbro_queue_wait_seconds", 0.95); + // ── Concurrency ──────────────────────────────────────────────── let _concurrency_max = state.config.concurrency as u32; let concurrency_active = state.console.active_requests.load(Ordering::SeqCst); @@ -709,12 +1010,18 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant) ts: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(), rps, + p50_ms, + p55_ms, p95_ms, - error_pct, + server_error_pct, + rate_limit_pct, queue_size: state.metrics.queue_size.get() as u32, concurrency_active, cpu_pct, memory_mb, + chromium_conv_rps, + libreoffice_conv_rps, + queue_wait_p95_ms, }; state.console.history.lock().await.push(sample); diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index 2e04e44..ba2f88d 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -12,6 +12,7 @@ use server::logging::init_logging; use server::routes::batch_state::{BatchStateManager, spawn_cleanup_task}; use server::webhook::{WebhookClient, WebhookEngineContext, WebhookQueue, start_workers}; use server::{AppState, ServerArgs, ServerConfig, banner, build_router, shutdown}; +use server::banner::EngineStatus; use server::supervised_engine::{SupervisedChromiumEngine, SupervisedLibreOfficeEngine}; use tracing::warn; @@ -74,28 +75,62 @@ async fn serve(args: ServerArgs) -> anyhow::Result<()> { #[cfg(not(feature = "libreoffice"))] let _libreoffice: Option<()> = None; - // Start engines in the background so the HTTP server comes up immediately - // and Fly.io / health checks can connect before engines finish warming up. + // Spawn eager engine starts in parallel, then await both before printing + // the banner so the displayed status is accurate. Lazy engines are skipped + // here and will start on the first request instead. #[cfg(feature = "chromium")] - if !config.chromium_lazy_start { - let chromium_bg = chromium.clone(); - tokio::spawn(async move { - if let Err(e) = chromium_bg.start().await { + let chromium_handle = if config.chromium_lazy_start { + None + } else { + let bg = chromium.clone(); + Some(tokio::spawn(async move { bg.start().await })) + }; + + #[cfg(feature = "libreoffice")] + let lo_handle = if config.libreoffice_lazy_start { + None + } else { + let bg = libreoffice.clone(); + Some(tokio::spawn(async move { bg.start().await })) + }; + + #[cfg(feature = "chromium")] + let chromium_status = match chromium_handle { + None => EngineStatus::Lazy, + Some(h) => match h.await { + Ok(Ok(_)) => EngineStatus::Ready, + Ok(Err(e)) => { warn!(error = %e, "Failed to start Chromium engine at startup"); + EngineStatus::Unavailable } - }); - } + Err(e) => { + warn!(error = %e, "Chromium start task panicked"); + EngineStatus::Unavailable + } + }, + }; + #[cfg(not(feature = "chromium"))] + let chromium_status = EngineStatus::Disabled; + #[cfg(feature = "libreoffice")] - if !config.libreoffice_lazy_start { - let lo_bg = libreoffice.clone(); - tokio::spawn(async move { - if let Err(e) = lo_bg.start().await { + let libreoffice_status = match lo_handle { + None => EngineStatus::Lazy, + Some(h) => match h.await { + Ok(Ok(_)) => EngineStatus::Ready, + Ok(Err(e)) => { warn!(error = %e, "Failed to start LibreOffice engine at startup"); + EngineStatus::Unavailable } - }); - } + Err(e) => { + warn!(error = %e, "LibreOffice start task panicked"); + EngineStatus::Unavailable + } + }, + }; + #[cfg(not(feature = "libreoffice"))] + let libreoffice_status = EngineStatus::Disabled; - banner::print(&config, false, false); + banner::print(&config, chromium_status, libreoffice_status); #[cfg(feature = "chromium")] let backend = ChromiumBackend::new(chromium.clone()); diff --git a/crates/server/src/routes/batch.rs b/crates/server/src/routes/batch.rs index 871a41a..0110163 100644 --- a/crates/server/src/routes/batch.rs +++ b/crates/server/src/routes/batch.rs @@ -67,6 +67,21 @@ pub async fn batch_submit( let batch_state = batch_manager.create_batch(request.clone()).await; let batch_id = batch_state.id.clone(); + // Persist uploaded files so the background worker can read them. + // URL-only batches produce no files; this is a no-op for those. + if !form.files.is_empty() { + let input_dir = batch_manager.batch_input_dir(&batch_id).await; + tokio::fs::create_dir_all(&input_dir).await.map_err(|e| { + ApiError::Internal(format!("failed to create batch input dir: {e}")) + })?; + for uploaded in &form.files { + let dest = input_dir.join(&uploaded.filename); + tokio::fs::copy(&uploaded.path, &dest).await.map_err(|e| { + ApiError::Internal(format!("failed to persist uploaded file: {e}")) + })?; + } + } + // Start background processing let state_manager = batch_manager.clone(); let app_state = state.clone(); diff --git a/crates/server/src/routes/batch_state.rs b/crates/server/src/routes/batch_state.rs index 91e25ac..d9e5beb 100644 --- a/crates/server/src/routes/batch_state.rs +++ b/crates/server/src/routes/batch_state.rs @@ -229,6 +229,7 @@ impl BatchStateManager { // Ensure storage directory exists fs::create_dir_all(&storage_path).await?; fs::create_dir_all(storage_path.join("outputs")).await?; + fs::create_dir_all(storage_path.join("inputs")).await?; let retention = Duration::from_secs(retention_minutes * 60); @@ -271,11 +272,12 @@ impl BatchStateManager { /// Remove a batch. pub async fn remove_batch(&self, id: &BatchId) { let mut inner = self.inner.write().await; + let input_dir = inner.storage_path.join("inputs").join(id.to_string()); if let Some(state) = inner.batches.remove(id) { - // Clean up output file if present if let Some(path) = state.output_path { let _ = fs::remove_file(&path).await; } + let _ = fs::remove_dir_all(&input_dir).await; info!(batch_id = %id, "removed batch"); } } @@ -295,6 +297,12 @@ impl BatchStateManager { .join(format!("{}.{}", id, extension)) } + /// Directory where uploaded input files for a batch are stored. + pub async fn batch_input_dir(&self, id: &BatchId) -> PathBuf { + let inner = self.inner.read().await; + inner.storage_path.join("inputs").join(id.to_string()) + } + /// Run cleanup of expired batches. pub async fn cleanup_expired(&self) { let expired_ids: Vec = { diff --git a/crates/server/src/supervised_engine.rs b/crates/server/src/supervised_engine.rs index 7afa678..8ae3185 100644 --- a/crates/server/src/supervised_engine.rs +++ b/crates/server/src/supervised_engine.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::{Duration, Instant}; +use std::time::Duration; use engine::{BrowserConfig, ChromiumEngine, LibreOfficeConfig, LibreOfficeEngine}; use engine::{EngineError, EngineResult}; @@ -86,7 +86,10 @@ impl SupervisedChromiumEngine { /// Update last activity timestamp. fn update_activity(&self) { - let now = Instant::now().elapsed().as_secs(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); self.inner.last_activity.store(now, Ordering::SeqCst); } @@ -98,13 +101,16 @@ impl SupervisedChromiumEngine { let mut ticker = interval(Duration::from_secs(30)); // Check every 30s loop { ticker.tick().await; - + if !inner.is_running.load(Ordering::SeqCst) { continue; // Engine not running, nothing to do } - + let last = inner.last_activity.load(Ordering::SeqCst); - let now = Instant::now().elapsed().as_secs(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); let idle_duration = Duration::from_secs(now.saturating_sub(last)); if idle_duration >= timeout { @@ -220,6 +226,17 @@ impl SupervisedChromiumEngine { self.inner.is_running.load(Ordering::SeqCst) } + /// Seconds since this engine last handled a request. Returns 0 if never used. + pub fn idle_secs(&self) -> u64 { + let last = self.inner.last_activity.load(Ordering::SeqCst); + if last == 0 { return 0; } + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + now.saturating_sub(last) + } + /// Shutdown the engine. pub async fn shutdown(&self) { let mut guard = self.inner.engine.lock().await; @@ -286,7 +303,10 @@ impl SupervisedLibreOfficeEngine { /// Update last activity timestamp. fn update_activity(&self) { - let now = Instant::now().elapsed().as_secs(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); self.inner.last_activity.store(now, Ordering::SeqCst); } @@ -298,13 +318,16 @@ impl SupervisedLibreOfficeEngine { let mut ticker = interval(Duration::from_secs(30)); loop { ticker.tick().await; - + if !inner.is_running.load(Ordering::SeqCst) { continue; } - + let last = inner.last_activity.load(Ordering::SeqCst); - let now = Instant::now().elapsed().as_secs(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); let idle_duration = Duration::from_secs(now.saturating_sub(last)); if idle_duration >= timeout { @@ -325,7 +348,6 @@ impl SupervisedLibreOfficeEngine { } } - /// Check if the engine is healthy. /// Check if the engine is healthy. /// /// Only probes an already-running engine — does NOT trigger lazy start. @@ -359,6 +381,17 @@ impl SupervisedLibreOfficeEngine { self.inner.is_running.load(Ordering::SeqCst) } + /// Seconds since this engine last handled a request. Returns 0 if never used. + pub fn idle_secs(&self) -> u64 { + let last = self.inner.last_activity.load(Ordering::SeqCst); + if last == 0 { return 0; } + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + now.saturating_sub(last) + } + /// Convert many files to PDFs in parallel. pub async fn convert_many( &self, diff --git a/crates/server/tests/bdd/testdata/teststore/foo.zip b/crates/server/tests/bdd/testdata/teststore/foo.zip index ef079f0..ccf7edb 100644 Binary files a/crates/server/tests/bdd/testdata/teststore/foo.zip and b/crates/server/tests/bdd/testdata/teststore/foo.zip differ diff --git a/crates/server/tests/bdd/testdata/teststore/result.zip b/crates/server/tests/bdd/testdata/teststore/result.zip index 729feb3..ab47512 100644 Binary files a/crates/server/tests/bdd/testdata/teststore/result.zip and b/crates/server/tests/bdd/testdata/teststore/result.zip differ diff --git a/docker-compose.yml b/docker-compose.yml index 85d353b..ec767c2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,11 @@ services: volumes: # Mount source code for hot reload - ./crates:/app/crates:cached + - ./bench:/app/bench:cached + # Mount full UI source so bun can rebuild it inside the container. + # A named volume overlays node_modules so Linux binaries don't bleed onto the host. + - ./ui:/app/ui:cached + - ui-node-modules:/app/ui/node_modules - ./Cargo.toml:/app/Cargo.toml:cached - ./Cargo.lock:/app/Cargo.lock:cached # Cache build artifacts for faster rebuilds @@ -35,13 +40,19 @@ services: - RUST_BACKTRACE=1 volumes: - ./crates:/app/crates:cached + - ./bench:/app/bench:cached + - ./ui:/app/ui:cached + - ui-node-modules:/app/ui/node_modules - ./Cargo.toml:/app/Cargo.toml:cached - ./Cargo.lock:/app/Cargo.lock:cached - cargo-target:/app/target - cargo-registry:/usr/local/cargo/registry tty: true stdin_open: true - command: ["cargo", "watch", "--poll", "-x", "run -p server -- serve --port 3000 --host 0.0.0.0 --no-default-features --features chromium"] + command: + - sh + - -c + - "cd /app/ui && bun install && bun run build && bun run build:watch & cd /app && exec cargo watch --poll -w crates/ -w Cargo.toml -w Cargo.lock -x 'run -p server -- serve --port 3000 --host 0.0.0.0 --no-default-features --features chromium'" profiles: [dev-chromium] pdfbro: @@ -88,3 +99,4 @@ services: volumes: cargo-target: cargo-registry: + ui-node-modules: diff --git a/scripts/load_test.sh b/scripts/load_test.sh new file mode 100755 index 0000000..12d366b --- /dev/null +++ b/scripts/load_test.sh @@ -0,0 +1,112 @@ +#!/usr/bin/env bash +# load_test.sh — drives the pdfbro server to exercise all UI panels +# +# Usage: +# ./scripts/load_test.sh [BASE_URL] +# +# Defaults to http://localhost:3000 + +set -euo pipefail + +BASE="${1:-http://localhost:3000}" +ROUNDS="${2:-3}" # how many full waves to run + +GREEN='\033[0;32m'; YELLOW='\033[1;33m'; RED='\033[0;31m'; NC='\033[0m' +log() { echo -e "${GREEN}[load]${NC} $*"; } +warn() { echo -e "${YELLOW}[warn]${NC} $*"; } + +# ── helpers ────────────────────────────────────────────────────────────────── + +submit_batch() { + local label="$1" + curl -s -X POST "$BASE/forms/batch/submit" \ + -F "batch.json=$(cat <<'JSON' +{ + "outputMode":"zip", + "items":[ + {"file":"https://example.com", "type":"chromiumUrl"}, + {"file":"https://httpbin.org/html", "type":"chromiumUrl"}, + {"file":"https://wikipedia.org", "type":"chromiumUrl"}, + {"file":"https://news.ycombinator.com", "type":"chromiumUrl"}, + {"file":"https://github.com/trending", "type":"chromiumUrl"} + ] +} +JSON +)" | jq -r '.batchId // "ERROR: \(.error)"' && log "batch submitted [$label]" +} + +single_url() { + local url="$1" + curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/forms/chromium/convert/url" \ + -F "url=$url" \ + -F "waitDelay=0" | read -r code + log "single url [$url] → $code" +} + +# Deliberate 4xx — validation / field errors (should NOT count as server errors) +bad_requests() { + warn "sending deliberate 4xx (should not inflate 5xx / 429 panels)" + + # 400: missing required field + curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/forms/chromium/convert/url" | grep -q "400\|422" \ + && warn " missing-url → 4xx ✓" || true + + # 422: bad batch JSON + curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/forms/batch/submit" \ + -F 'batch.json={"outputMode":"zip","items":[]}' | grep -q "4" \ + && warn " empty-items → 4xx ✓" || true + + # 404: unknown route + curl -s -o /dev/null "$BASE/does/not/exist" && warn " unknown-route → 404 ✓" || true +} + +# ── main ───────────────────────────────────────────────────────────────────── + +log "target: $BASE rounds: $ROUNDS" +log "checking server health..." +curl -sf "$BASE/health" > /dev/null || { echo "server not reachable at $BASE"; exit 1; } +log "server is up" + +for round in $(seq 1 "$ROUNDS"); do + log "═══ round $round / $ROUNDS ═══" + + # 1. Fire 5 batches in parallel (drives batch panel + engine conv + queue) + log "launching 5 batch jobs..." + for i in $(seq 1 5); do + submit_batch "r${round}-b${i}" & + done + wait + log "all 5 batches queued" + + # 2. Fire 10 single URL-to-PDF requests concurrently (drives concurrency + RPS) + log "launching 10 concurrent single-URL renders..." + for url in \ + "https://example.com" \ + "https://httpbin.org/html" \ + "https://wikipedia.org" \ + "https://github.com" \ + "https://news.ycombinator.com" \ + "https://example.com" \ + "https://httpbin.org/html" \ + "https://wikipedia.org" \ + "https://github.com" \ + "https://news.ycombinator.com" + do + curl -s -o /dev/null -X POST "$BASE/forms/chromium/convert/url" \ + -F "url=$url" & + done + wait + log "single-URL wave complete" + + # 3. Deliberate bad requests (4xx — must not affect 5xx or 429 panels) + bad_requests + + # 4. Health check (drives health route stats) + curl -sf "$BASE/health" > /dev/null + curl -sf "$BASE/version" > /dev/null 2>&1 || true + + log "round $round done — waiting 8s before next wave" + sleep 8 +done + +log "load test complete" diff --git a/ui/package.json b/ui/package.json index bef72e6..26289e5 100644 --- a/ui/package.json +++ b/ui/package.json @@ -6,6 +6,7 @@ "scripts": { "dev": "vite dev", "build": "vite build", + "build:watch": "vite build --watch", "preview": "vite preview", "prepare": "svelte-kit sync || echo ''", "check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json", diff --git a/ui/src/lib/components/ActivityStrip.svelte b/ui/src/lib/components/ActivityStrip.svelte deleted file mode 100644 index c4e76bb..0000000 --- a/ui/src/lib/components/ActivityStrip.svelte +++ /dev/null @@ -1,53 +0,0 @@ - - - -
- -
- {#each requests as r, i} -
- {r.time} - {r.method} - {r.route} - {r.status} - {r.duration_ms}ms -
- {/each} - {#if requests.length === 0} -
No requests yet
- {/if} -
-
- - -
- {#each errors as e, i} -
- {e.time} - {e.message} - {e.route} -
- {/each} - {#if errors.length === 0} -
No errors
- {/if} -
-
-
diff --git a/ui/src/lib/components/CpuChart.svelte b/ui/src/lib/components/CpuChart.svelte new file mode 100644 index 0000000..39bb12f --- /dev/null +++ b/ui/src/lib/components/CpuChart.svelte @@ -0,0 +1,29 @@ + + + + +
+
+ usage + {last.toFixed(1)}% +
+ v.toFixed(1) + '%'} + {t} + /> +
+
diff --git a/ui/src/lib/components/EngineConvChart.svelte b/ui/src/lib/components/EngineConvChart.svelte new file mode 100644 index 0000000..4e162b4 --- /dev/null +++ b/ui/src/lib/components/EngineConvChart.svelte @@ -0,0 +1,33 @@ + + + + +
+
+
+ Chromium {lastCh.toFixed(2)} + LibreOffice {lastLo.toFixed(2)} +
+
+ +
+
diff --git a/ui/src/lib/components/MemChart.svelte b/ui/src/lib/components/MemChart.svelte new file mode 100644 index 0000000..fd4e22d --- /dev/null +++ b/ui/src/lib/components/MemChart.svelte @@ -0,0 +1,34 @@ + + + + +
+
+ usage + + {(last / 1024).toFixed(2)} GB{resources.memory_max_mb > 0 ? ` / ${(resources.memory_max_mb / 1024).toFixed(0)} GB` : ''} + +
+ 0 ? pct : undefined} + refColor={t.warn} + label="MB" + formatValue={(v) => (v / 1024).toFixed(2) + ' GB'} + {t} + /> +
+
diff --git a/ui/src/lib/components/QueueWaitChart.svelte b/ui/src/lib/components/QueueWaitChart.svelte new file mode 100644 index 0000000..ae66c95 --- /dev/null +++ b/ui/src/lib/components/QueueWaitChart.svelte @@ -0,0 +1,31 @@ + + + + +
+
+ wait p95 + + {lastWait >= 1000 ? `${(lastWait / 1000).toFixed(1)}s` : `${lastWait.toFixed(0)}ms`} + +
+ v >= 1000 ? `${(v/1000).toFixed(1)}s` : `${v.toFixed(0)}ms`} + {t} + /> +
+
diff --git a/ui/src/lib/components/RoutesTable.svelte b/ui/src/lib/components/RoutesTable.svelte index 54bcb3e..4064ee5 100644 --- a/ui/src/lib/components/RoutesTable.svelte +++ b/ui/src/lib/components/RoutesTable.svelte @@ -15,34 +15,36 @@ let sorted = $derived([...routes].sort((a, b) => b.p95_ms - a.p95_ms)); - + {#if routes.length === 0}
No route data yet
{:else} - - - - {#each ['Route','Method','RPS','p50','p95','p99','Err %','In-flight','Load'] as h, i} - - {/each} - - - - {#each sorted as r} - {@const p95tone = r.p95_ms > 10000 ? t.err : r.p95_ms > 5000 ? t.warn : t.ink} - - - - - - - - - - +
+
{h}
{r.path}{r.method}{r.rps.toFixed(1)}{fmtMs(r.p50_ms)}{fmtMs(r.p95_ms)}{fmtMs(r.p99_ms)}{r.error_pct.toFixed(2)}{r.in_flight}
+ + + {#each ['Route','Method','RPS','p50','p95','p99','Err %','In-flight','Load'] as h, i} + + {/each} - {/each} - -
{h}
+ + + {#each sorted as r} + {@const p95tone = r.p95_ms > 10000 ? t.err : r.p95_ms > 5000 ? t.warn : t.ink} + + {r.path} + {r.method} + {r.rps.toFixed(1)} + {fmtMs(r.p50_ms)} + {fmtMs(r.p95_ms)} + {fmtMs(r.p99_ms)} + {r.error_pct.toFixed(2)} + {r.in_flight} + + + {/each} + + + {/if}
diff --git a/ui/src/lib/components/StackedBarSeries.svelte b/ui/src/lib/components/StackedBarSeries.svelte new file mode 100644 index 0000000..3f32fe2 --- /dev/null +++ b/ui/src/lib/components/StackedBarSeries.svelte @@ -0,0 +1,111 @@ + + + +
+ + + {#each Array.from({ length: len }, (_, i) => i) as i} + {@const w = 100 / len} + {@const a = seriesA[i] ?? 0} + {@const b = seriesB[i] ?? 0} + {@const total = a + b} + {@const totalH = (total / maxVal) * height} + {@const aH = total > 0 ? (a / total) * totalH : 0} + {@const bH = totalH - aH} + {@const x = i * w} + + + + {#if bH > 0} + + {/if} + {/each} + {#if hoveredIdx !== null} + {@const w = 100 / len} + {@const cx = (hoveredIdx + 0.5) * w} + + {/if} + + {#if hoveredIdx !== null} + {@const w = 100 / len} + {@const pctLeft = (hoveredIdx + 0.5) * w} + {@const flipLeft = pctLeft > 70} + {@const a = (seriesA[hoveredIdx] ?? 0).toFixed(2)} + {@const b = (seriesB[hoveredIdx] ?? 0).toFixed(2)} +
+ {labelA} {a} +  ·  + {labelB} {b} +
+ {/if} +
diff --git a/ui/src/lib/components/Ticker.svelte b/ui/src/lib/components/Ticker.svelte index fafe514..6109f1f 100644 --- a/ui/src/lib/components/Ticker.svelte +++ b/ui/src/lib/components/Ticker.svelte @@ -16,14 +16,15 @@ } let items = $derived([ - { label: 'RPS', value: ticker.rps.toFixed(1), tone: 'ink' as const }, - { label: 'p95', value: fmtMs(ticker.p95_ms), tone: (ticker.p95_ms > 2000 ? 'err' : ticker.p95_ms > 1500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, - { label: 'Errors', value: `${ticker.error_pct.toFixed(2)}%`, tone: (ticker.error_pct > 1 ? 'err' : ticker.error_pct > 0.5 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, - { label: 'Conc.', value: `${ticker.concurrency_active} / ${ticker.concurrency_max}`, tone: 'ink' as const }, - { label: 'Chromium', value: ticker.chromium_status.toUpperCase(), tone: (ticker.chromium_status === 'up' ? 'ok' : ticker.chromium_status === 'n/a' ? 'ink' : 'err') as 'ok' | 'ink' | 'err' }, - { label: 'LibreOff', value: ticker.libreoffice_status.toUpperCase(), tone: (ticker.libreoffice_status === 'up' ? 'ok' : ticker.libreoffice_status === 'n/a' ? 'ink' : 'err') as 'ok' | 'ink' | 'err' }, - { label: 'Queue', value: String(Math.round(ticker.queue_size)), tone: 'ink' as const }, - { label: 'Uptime', value: fmtUptime(ticker.uptime_seconds), tone: 'ok' as const }, + { label: 'RPS', value: ticker.rps.toFixed(1), tone: 'ink' as const }, + { label: 'P50', value: fmtMs(ticker.p50_ms), tone: (ticker.p50_ms > 1000 ? 'err' : ticker.p50_ms > 500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, + { label: 'P55', value: fmtMs(ticker.p55_ms), tone: (ticker.p55_ms > 1000 ? 'err' : ticker.p55_ms > 500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, + { label: 'P95', value: fmtMs(ticker.p95_ms), tone: (ticker.p95_ms > 2000 ? 'err' : ticker.p95_ms > 1500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, + { label: '5XX', value: `${ticker.server_error_pct.toFixed(2)}%`, tone: (ticker.server_error_pct > 1 ? 'err' : ticker.server_error_pct > 0 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, + { label: '429', value: `${ticker.rate_limit_pct.toFixed(2)}%`, tone: (ticker.rate_limit_pct > 5 ? 'err' : ticker.rate_limit_pct > 0 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' }, + { label: 'Conc.', value: `${ticker.concurrency_active} / ${ticker.concurrency_max}`, tone: 'ink' as const }, + { label: 'Queue', value: String(Math.round(ticker.queue_size)), tone: 'ink' as const }, + { label: 'Uptime', value: fmtUptime(ticker.uptime_seconds), tone: 'ok' as const }, ]); diff --git a/ui/src/lib/components/side-rail/Batches.svelte b/ui/src/lib/components/side-rail/Batches.svelte index d533fd7..0a3fc8f 100644 --- a/ui/src/lib/components/side-rail/Batches.svelte +++ b/ui/src/lib/components/side-rail/Batches.svelte @@ -6,31 +6,65 @@ import Pill from '../shared/Pill.svelte'; import SlimBar from '../shared/SlimBar.svelte'; - let { batches, t, D }: { batches: BatchPayload[]; t: Theme; D: { pad: number; fz: number; rowPy: number } } = $props(); + let { batches, t, D, style = '' }: { batches: BatchPayload[]; t: Theme; D: { pad: number; fz: number; rowPy: number }; style?: string } = $props(); function batchTone(status: string): 'ok' | 'warn' | 'err' | 'accent' | 'ink' { - if (status === 'failed') return 'err'; + if (status === 'failed') return 'err'; if (status === 'completed') return 'ok'; - if (status === 'queued') return 'ink'; + if (status === 'queued') return 'ink'; return 'accent'; } + + // Only show active jobs; completed/failed drop off automatically. + let active = $derived(batches.filter(b => b.status === 'running' || b.status === 'queued')); + let queuedCnt = $derived(batches.filter(b => b.status === 'queued').length); + let runningCnt = $derived(batches.filter(b => b.status === 'running').length); + let doneCnt = $derived(batches.filter(b => b.status === 'completed' || b.status === 'failed').length); - - {#if batches.length === 0} -
No recent batches
+ + {#if active.length === 0} + +
+ + + + + + + +
+
Queue is empty
+ {#if doneCnt > 0} +
{doneCnt} job{doneCnt > 1 ? 's' : ''} completed
+ {:else} +
POST /batch to submit a job
+ {/if} +
+
{:else} - - - {#each batches as b, i} - - - - - - - {/each} - -
{b.id}{b.status.slice(0, 4)}{b.elapsed}
+
+ {#each active as b, i} +
+
+
+ {b.status.slice(0, 4).toUpperCase()} + {b.id.slice(0, 16)}… +
+
+ {b.output_mode.toUpperCase()} + {b.elapsed} +
+
+
+
+ + {b.completed_items}/{b.total_items} + {#if b.failed_items > 0} · {b.failed_items} err{/if} + +
+
+ {/each} +
{/if}
diff --git a/ui/src/lib/components/side-rail/Concurrency.svelte b/ui/src/lib/components/side-rail/Concurrency.svelte index e15dbe6..ab36b58 100644 --- a/ui/src/lib/components/side-rail/Concurrency.svelte +++ b/ui/src/lib/components/side-rail/Concurrency.svelte @@ -7,8 +7,9 @@ let { conc, t, D }: { conc: ConcurrencyPayload; t: Theme; D: { pad: number; fz: number } } = $props(); - let tone = $derived((conc.active >= conc.crit_threshold ? 'err' : conc.active >= conc.warn_threshold ? 'warn' : 'ok') as 'ok' | 'warn' | 'err'); + let tone = $derived((conc.active >= conc.crit_threshold ? 'warn' : conc.active >= conc.warn_threshold ? 'warn' : 'ok') as 'ok' | 'warn' | 'err'); let pct = $derived(Math.round((conc.active / Math.max(1, conc.max)) * 100)); + let statusLabel = $derived(conc.active > conc.max ? 'BUSY' : tone === 'warn' ? 'WARN' : 'OK'); let hoveredSlot = $state(null); @@ -34,7 +35,7 @@
{conc.active} / {conc.max}
- {pct}% · {tone} + {Math.min(pct, 100)}% · {statusLabel} @@ -74,5 +75,20 @@
0warn {conc.warn_threshold}crit {conc.crit_threshold}{conc.max}
+ + +
+
+
wait p95
+
+ {conc.queue_wait_p95_ms >= 1000 ? `${(conc.queue_wait_p95_ms / 1000).toFixed(1)}s` : `${conc.queue_wait_p95_ms.toFixed(0)}ms`} +
+
time before slot acquired
+
+
+
processing
+
{conc.queue_processing} job{conc.queue_processing !== 1 ? 's' : ''}
+
+
diff --git a/ui/src/lib/components/side-rail/Engines.svelte b/ui/src/lib/components/side-rail/Engines.svelte index 5177ac3..7746325 100644 --- a/ui/src/lib/components/side-rail/Engines.svelte +++ b/ui/src/lib/components/side-rail/Engines.svelte @@ -6,7 +6,12 @@ import Pill from '../shared/Pill.svelte'; import BarSeries from '../shared/BarSeries.svelte'; - let { engines, t, D }: { engines: EnginePayload[]; t: Theme; D: { fz: number; pad: number } } = $props(); + let { engines, convRps, t, D }: { + engines: EnginePayload[]; + convRps: Record; + t: Theme; + D: { fz: number; pad: number }; + } = $props(); function engineTone(e: EnginePayload): 'ok' | 'warn' | 'err' | 'ink' { if (e.status === 'n/a') return 'ink'; @@ -14,13 +19,22 @@ if (e.restarts > 5) return 'warn'; return 'ok'; } - function engineColor(e: EnginePayload, t: Theme): string { + function engineColor(e: EnginePayload): string { const tone = engineTone(e); if (tone === 'ok') return t.ok; if (tone === 'warn') return t.warn; if (tone === 'err') return t.err; return t.muted; } + function fmtIdle(s: number): string { + if (s === 0) return 'active'; + if (s < 60) return `idle ${s}s`; + return `idle ${Math.floor(s / 60)}m`; + } + function fmtBytes(mb: number): string { + if (mb >= 1024) return `${(mb / 1024).toFixed(1)}GB`; + return `${mb.toFixed(1)}MB`; + } @@ -31,6 +45,9 @@
{e.name} {e.status.toUpperCase()} + {#if (convRps[e.name.toLowerCase()] ?? 0) > 0} + {(convRps[e.name.toLowerCase()] ?? 0).toFixed(2)} conv/s + {/if}
{e.restarts} activation{e.restarts !== 1 ? 's' : ''} · {e.mode} @@ -39,13 +56,31 @@ {#if e.mini_series.length > 0} (v * 100).toFixed(0) + '%'} {t} /> {/if} +
+
+
total conv
+
{e.conversions_total}
+
+
+
err%
+
{e.error_rate.toFixed(2)}
+
+
+
data
+
{fmtBytes(e.bytes_mb)}
+
+
+
state
+
{fmtIdle(e.idle_secs)}
+
+
{/each} {#if engines.length === 0} diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts index 90431f5..5e0c117 100644 --- a/ui/src/lib/types.ts +++ b/ui/src/lib/types.ts @@ -2,24 +2,29 @@ export interface MetricsSample { ts: number; rps: number; + p50_ms: number; + p55_ms: number; p95_ms: number; - error_pct: number; + server_error_pct: number; + rate_limit_pct: number; queue_size: number; concurrency_active: number; cpu_pct: number; memory_mb: number; + chromium_conv_rps: number; + libreoffice_conv_rps: number; + queue_wait_p95_ms: number; } export interface TickerPayload { rps: number; + p50_ms: number; + p55_ms: number; p95_ms: number; - error_pct: number; + server_error_pct: number; + rate_limit_pct: number; concurrency_active: number; concurrency_max: number; - chromium_status: string; - chromium_restarts: number; - libreoffice_status: string; - libreoffice_restarts: number; queue_size: number; uptime_seconds: number; } @@ -42,6 +47,10 @@ export interface EnginePayload { restarts: number; mode: string; mini_series: number[]; + conversions_total: number; + error_rate: number; + bytes_mb: number; + idle_secs: number; } export interface ConcurrencyPayload { @@ -49,6 +58,8 @@ export interface ConcurrencyPayload { max: number; warn_threshold: number; crit_threshold: number; + queue_wait_p95_ms: number; + queue_processing: number; } export interface ResourcesPayload { @@ -58,10 +69,14 @@ export interface ResourcesPayload { } export interface ThroughputPayload { + ts_series: number[]; rps_series: number[]; rps_baseline: number; p95_series: number[]; p95_target_s: number; + chromium_conv_series: number[]; + libreoffice_conv_series: number[]; + queue_wait_p95_series: number[]; } export interface BatchPayload { @@ -69,6 +84,10 @@ export interface BatchPayload { status: string; progress_pct: number; elapsed: string; + total_items: number; + completed_items: number; + failed_items: number; + output_mode: string; } export interface RequestLogEntry { diff --git a/ui/src/routes/+page.svelte b/ui/src/routes/+page.svelte index 88e2270..3fa0f2a 100644 --- a/ui/src/routes/+page.svelte +++ b/ui/src/routes/+page.svelte @@ -9,9 +9,9 @@ import Engines from '$lib/components/side-rail/Engines.svelte'; import Concurrency from '$lib/components/side-rail/Concurrency.svelte'; import Batches from '$lib/components/side-rail/Batches.svelte'; - import Resources from '$lib/components/side-rail/Resources.svelte'; import ThroughputStrip from '$lib/components/ThroughputStrip.svelte'; - import ActivityStrip from '$lib/components/ActivityStrip.svelte'; + import CpuChart from '$lib/components/CpuChart.svelte'; + import MemChart from '$lib/components/MemChart.svelte'; onMount(() => metricsStore.start()); onDestroy(() => metricsStore.stop()); @@ -30,9 +30,9 @@ let D = $derived(themeStore.D); -
+
{#if metricsStore.loading} -
+
Connecting to pdfbro…
{:else if metricsStore.data} @@ -44,25 +44,38 @@
- -
- -
- - - - -
-
+ +
- -
- -
+ +
- -
- + + + + +
+ + +
+ + + +
+ + +
+ + + +
{/if}