diff --git a/Dockerfile.dev b/Dockerfile.dev
index 8f0153a..4ce483c 100644
--- a/Dockerfile.dev
+++ b/Dockerfile.dev
@@ -24,6 +24,9 @@ FROM rust:${RUST_VERSION} AS dev-base
# Install cargo-watch for hot reload
RUN cargo install cargo-watch --locked
+# Install bun (builds the operator console UI in the container). Download, then run: a `curl | bash` pipe reports only bash's exit status, hiding a failed download.
+RUN curl -fsSL https://bun.sh/install -o /tmp/bun-install.sh && BUN_INSTALL=/usr/local bash /tmp/bun-install.sh && rm -f /tmp/bun-install.sh
+
# Install system dependencies
RUN apt-get update -qq && apt-get upgrade -yqq && \
DEBIAN_FRONTEND=noninteractive apt-get install -y -qq --no-install-recommends \
@@ -128,15 +131,27 @@ RUN groupadd --gid 1001 pdfbro && \
# This layer will be cached; source code changes won't invalidate it
COPY Cargo.toml Cargo.lock ./
COPY crates/*/Cargo.toml ./crates/*/
-RUN mkdir -p crates/engine/src crates/server/src crates/cli/src && \
+COPY bench/Cargo.toml ./bench/
+RUN mkdir -p crates/engine/src crates/server/src crates/cli/src bench/src && \
echo "fn main() {}" > crates/cli/src/main.rs && \
echo "pub fn dummy() {}" > crates/engine/src/lib.rs && \
echo "pub fn dummy() {}" > crates/server/src/lib.rs && \
+ echo "fn main() {}" > bench/src/main.rs && \
cargo build --features "chromium libreoffice" 2>/dev/null || true && \
- rm -rf crates/
+ rm -rf crates/ bench/
+
+# Stub UI build directory — gives rust-embed a valid folder at compile time.
+# The docker-compose volume mount will shadow /app/ui with the host source tree,
+# and the entrypoint script does a real bun build before starting the server.
+RUN mkdir -p /app/ui/build && \
+ printf '<!doctype html><title>dev</title>\n' > /app/ui/build/index.html && \
+ mkdir -p /app/ui/node_modules
-# Set up working directory with proper permissions
-RUN chown -R pdfbro:pdfbro /app
+# Pre-create /app/target so the cargo-target named volume is seeded with
+# the right owner (Docker seeds an empty named volume from image content).
+# Also chown the cargo registry for the same reason.
+RUN mkdir -p /app/target && \
+ chown -R pdfbro:pdfbro /app /usr/local/cargo
# Switch to non-root user for Chrome
USER pdfbro
@@ -145,6 +160,15 @@ WORKDIR /app
EXPOSE 3000
-# Default command: watch for changes and restart server
-# Use --poll for Docker compatibility (file system events may not work in all setups)
-CMD ["cargo", "watch", "--poll", "-x", "run -p server -- serve --port 3000 --host 0.0.0.0"]
+# On startup:
+# 1. Install/update UI deps inside the container (node_modules named volume keeps
+# Linux binaries separate from the macOS host).
+# 2. Do an initial production build so rust-embed serves the latest UI immediately.
+# 3. Start vite build --watch in the background (the braces keep `&` from backgrounding steps 1-2 as well).
+# 4. Start cargo watch in the foreground for Rust hot-reload (still runs if a UI step fails).
+CMD ["sh", "-c", "\
+ cd /app/ui && bun install && bun run build && \
+ { bun run build:watch & } ; \
+ cd /app && exec cargo watch --poll -w crates/ -w Cargo.toml -w Cargo.lock \
+ -x 'run -p server -- serve --port 3000 --host 0.0.0.0' \
+"]
diff --git a/Dockerfile.test b/Dockerfile.test
index 001a4f8..38a36c2 100644
--- a/Dockerfile.test
+++ b/Dockerfile.test
@@ -85,11 +85,24 @@ COPY . .
# to the buildx invocation. The default keeps the layer cacheable so a
# repeat build with no source changes is a no-op.
ARG TEST_RUN_ID=0
+# Set FAST=1 to run only unit tests (no BDD/integration, no Chrome/LO required, ~60s).
+# Default is 0 (full suite).
+ARG FAST=0
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/usr/local/cargo/git \
--mount=type=cache,target=/app/target \
bash -c 'set -e -o pipefail; \
+ if [ "${FAST}" = "1" ]; then \
+ echo "::fast mode — unit tests only, no Chrome/LO (TEST_RUN_ID=${TEST_RUN_ID:-0})"; \
+ cargo test --lib 2>&1 | tee /tmp/cargo_test.log; \
+ if grep -qE "^test result: FAILED|^failures:|^error\[" /tmp/cargo_test.log; then \
+ echo "::test failures detected in libtest output"; \
+ exit 1; \
+ fi; \
+ echo "::unit tests ok"; \
+ exit 0; \
+ fi; \
echo "::compile pass — failing fast on any compile error (TEST_RUN_ID=${TEST_RUN_ID:-0})"; \
cargo test --no-fail-fast --no-run; \
echo "::run pass — ignoring LO atexit teardown noise on cargo exit"; \
@@ -97,7 +110,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
cargo test --no-fail-fast -- --test-threads=1 2>&1 | tee /tmp/cargo_test.log; \
cargo_status=$?; \
set -e; \
- if grep -qE "^test result: FAILED|^failures:$|^error\\[" /tmp/cargo_test.log; then \
+ if grep -qE "^test result: FAILED|^failures:$|^error\[" /tmp/cargo_test.log; then \
echo "::test failures detected in libtest output"; \
exit 1; \
fi; \
diff --git a/Makefile b/Makefile
index 4b6abb8..db8852c 100644
--- a/Makefile
+++ b/Makefile
@@ -191,6 +191,11 @@ docker-test: ## Run tests inside Docker container (with Chrome + LibreOffice)
docker build -t pdfbro-test -f Dockerfile.test .
docker run --rm pdfbro-test
+.PHONY: docker-test-fast
+docker-test-fast: ## Run unit tests only inside Docker (no Chrome/LO required, ~60s)
+ docker build --build-arg FAST=1 -t pdfbro-test-fast -f Dockerfile.test .
+ docker run --rm pdfbro-test-fast
+
# IMAGE defaults to full image; override with: make test-image IMAGE=ghcr.io/inkkit/pdfbro:latest-chromium
IMAGE ?= $(DOCKER_REGISTRY):latest
.PHONY: test-image
diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml
index 827b806..6d632c0 100644
--- a/crates/server/Cargo.toml
+++ b/crates/server/Cargo.toml
@@ -86,13 +86,13 @@ scalar_api_reference = { version = "0.1", features = ["axum"] }
# Operator console static asset embedding
rust-embed = { version = "8", features = ["mime-guess"] }
mime_guess = "2"
+zip = { workspace = true }
[dev-dependencies]
tower = { workspace = true, features = ["util"] }
reqwest = { workspace = true }
static_assertions = { workspace = true }
lopdf = { workspace = true }
-zip = { workspace = true }
# BDD testing
cucumber = "0.21"
diff --git a/crates/server/src/app.rs b/crates/server/src/app.rs
index 4edea13..34dd24d 100644
--- a/crates/server/src/app.rs
+++ b/crates/server/src/app.rs
@@ -392,9 +392,19 @@ async fn console_log_middleware(
use std::sync::atomic::Ordering;
state.console.active_requests.fetch_add(1, Ordering::SeqCst);
+ {
+ let mut map = state.console.active_per_route.lock().await;
+ *map.entry(path.clone()).or_insert(0) += 1;
+ }
let start = Instant::now();
let response = next.run(req).await;
state.console.active_requests.fetch_sub(1, Ordering::SeqCst);
+ { // scoped so the per-route lock is released before the timing/logging code below
+ let mut map = state.console.active_per_route.lock().await;
+ if let Some(c) = map.get_mut(&path) { // undo the increment made before `next.run`
+ *c = c.saturating_sub(1); // saturating: never underflows below zero
+ } // NOTE(review): the entry stays in the map at 0; confirm `path` is a bounded set of routes, else the map only grows
+ }
let elapsed = start.elapsed();
let duration_ms = elapsed.as_millis() as u64;
let status = response.status().as_u16();
diff --git a/crates/server/src/backend.rs b/crates/server/src/backend.rs
index 6235453..c82d616 100644
--- a/crates/server/src/backend.rs
+++ b/crates/server/src/backend.rs
@@ -51,6 +51,13 @@ pub trait PdfBackend: Send + Sync + 'static {
/// Liveness probe.
async fn healthy(&self) -> bool;
+ /// Non-blocking liveness check, meant to be backed by an atomic flag so the console
+ /// sampler never competes for the engine's internal mutex. Defaults to `true` unless overridden.
+ fn is_alive(&self) -> bool { true }
+
+ /// Seconds since this engine last handled a conversion; 0 if never used. The default implementation always returns 0.
+ fn idle_secs(&self) -> u64 { 0 }
+
/// Render HTML to screenshot image.
#[cfg(feature = "chromium")]
async fn html_to_screenshot(&self, html: &str, opts: &ScreenshotOptions) -> EngineResult>;
@@ -117,6 +124,14 @@ impl PdfBackend for ChromiumBackend {
self.inner.healthy().await
}
+ fn is_alive(&self) -> bool {
+ self.inner.is_running()
+ }
+
+ fn idle_secs(&self) -> u64 {
+ self.inner.idle_secs()
+ }
+
async fn html_to_screenshot(&self, html: &str, opts: &ScreenshotOptions) -> EngineResult> {
self.inner.html_to_screenshot(html, opts).await
}
diff --git a/crates/server/src/banner.rs b/crates/server/src/banner.rs
index 4bf4b0c..4dd0d05 100644
--- a/crates/server/src/banner.rs
+++ b/crates/server/src/banner.rs
@@ -14,6 +14,19 @@ use std::io::IsTerminal;
use crate::config::LogFormat;
use crate::ServerConfig;
+/// Runtime status of a supervised engine at banner-print time.
+#[derive(Clone, Copy)]
+pub enum EngineStatus {
+ /// Started and healthy.
+ Ready,
+ /// Configured for lazy start — will spin up on first request.
+ Lazy,
+ /// Eager start attempted but the engine failed to come up.
+ Unavailable,
+ /// Feature not compiled in; row is omitted from the banner.
+ Disabled,
+}
+
/// A single label / value pair rendered as one banner line.
struct Row<'a> {
label: &'a str,
@@ -26,13 +39,13 @@ struct Row<'a> {
/// In text mode the banner always prints regardless of TTY, so `cargo run` and
/// Docker (tty: true) both show it. Color is automatically disabled when
/// stdout is not a TTY.
-pub fn print(config: &ServerConfig, chromium_ready: bool, libreoffice_ready: bool) {
+pub fn print(config: &ServerConfig, chromium: EngineStatus, libreoffice: EngineStatus) {
if matches!(config.log_format, LogFormat::Json) {
let version = env!("CARGO_PKG_VERSION");
tracing::info!(
version,
- chromium = if chromium_ready { "ready" } else { "unavailable" },
- libreoffice = if libreoffice_ready { "ready" } else { "unavailable" },
+ chromium = engine_log_str(chromium),
+ libreoffice = engine_log_str(libreoffice),
engines = "merge,split,flatten,metadata,convert,bookmarks,watermark,stamp,encrypt,decrypt,rotate",
"pdfbro server ready",
);
@@ -43,16 +56,13 @@ pub fn print(config: &ServerConfig, chromium_ready: bool, libreoffice_ready: boo
let version = env!("CARGO_PKG_VERSION");
// ── Services section ─────────────────────────────────────────────
- let services = vec![
- Row {
- label: "Chromium",
- value: status(chromium_ready, c),
- },
- Row {
- label: "LibreOffice",
- value: status(libreoffice_ready, c),
- },
- ];
+ let mut services: Vec> = Vec::new();
+ if !matches!(chromium, EngineStatus::Disabled) {
+ services.push(Row { label: "Chromium", value: engine_status(chromium, c) });
+ }
+ if !matches!(libreoffice, EngineStatus::Disabled) {
+ services.push(Row { label: "LibreOffice", value: engine_status(libreoffice, c) });
+ }
let service_width = compute_width(&services);
// ── PDF Engines section ──────────────────────────────────────────
@@ -137,14 +147,24 @@ fn color(s: &str, code: &str, enabled: bool) -> String {
}
/// Colored status tag with a fixed visible width so columns stay aligned
-/// when the state flips between ready / unavailable.
-fn status(ready: bool, c: bool) -> String {
- let plain = if ready {
- format!("{:<20}", "[OK] ready")
- } else {
- format!("{:<20}", "[FAIL] unavailable")
+/// across all possible states.
+fn engine_status(status: EngineStatus, c: bool) -> String {
+ let (plain, code) = match status {
+ EngineStatus::Ready => (format!("{:<20}", "[OK] ready"), "32"),
+ EngineStatus::Lazy => (format!("{:<20}", "[--] lazy"), "2"),
+ EngineStatus::Unavailable => (format!("{:<20}", "[FAIL] unavailable"), "31"),
+ EngineStatus::Disabled => unreachable!("disabled rows are skipped"),
};
- color(&plain, if ready { "32" } else { "31" }, c)
+ color(&plain, code, c)
+}
+
+fn engine_log_str(status: EngineStatus) -> &'static str {
+ match status {
+ EngineStatus::Ready => "ready",
+ EngineStatus::Lazy => "lazy",
+ EngineStatus::Unavailable => "unavailable",
+ EngineStatus::Disabled => "disabled",
+ }
}
/// Simple OK tag for capability rows.
@@ -226,9 +246,19 @@ mod tests {
#[test]
fn print_does_not_panic() {
- // We can't assert stdout in a unit test easily, but we can at
- // least exercise the formatting code path.
let config = dummy_config();
- print(&config, true, true);
+ print(&config, EngineStatus::Ready, EngineStatus::Ready);
+ }
+
+ #[test]
+ fn print_lazy_statuses_do_not_panic() {
+ let config = dummy_config();
+ print(&config, EngineStatus::Lazy, EngineStatus::Lazy);
+ }
+
+ #[test]
+ fn print_disabled_statuses_do_not_panic() {
+ let config = dummy_config();
+ print(&config, EngineStatus::Disabled, EngineStatus::Disabled);
}
}
diff --git a/crates/server/src/batch_worker.rs b/crates/server/src/batch_worker.rs
index 051142f..2621b07 100644
--- a/crates/server/src/batch_worker.rs
+++ b/crates/server/src/batch_worker.rs
@@ -3,17 +3,16 @@
//! Each batch is processed by spawning async tasks that acquire engine
//! permits and execute conversions with controlled concurrency.
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::time::Instant;
+use std::io::Write as _;
+use std::path::{Path, PathBuf};
use tokio::fs;
-use tokio::sync::Semaphore;
use tracing::{error, info};
use crate::error::ApiError;
use crate::routes::batch_state::BatchStateManager;
-use crate::routes::batch_types::{ErrorCode, OutputMode};
+use crate::routes::batch_types::{BatchItemType, ErrorCode, GlobalOptions, OutputMode};
+use crate::routes::batch_types::BatchItem;
use crate::state::AppState;
/// Process a batch asynchronously.
@@ -24,7 +23,6 @@ pub async fn process_batch(
) {
info!(batch_id = %batch_id, "starting batch processing");
- // Mark batch as processing
{
let mut batch = match state_manager.get_batch(&batch_id).await {
Some(b) => b,
@@ -37,10 +35,8 @@ pub async fn process_batch(
state_manager.update_batch(batch).await;
}
- // Process items
let result = process_items(batch_id.clone(), &state_manager, &app_state).await;
- // Finalize batch
match result {
Ok((output_path, output_size)) => {
let mut batch = state_manager
@@ -49,11 +45,7 @@ pub async fn process_batch(
.expect("batch disappeared during processing");
batch.mark_completed(output_path, output_size);
state_manager.update_batch(batch).await;
- info!(
- batch_id = %batch_id,
- output_size,
- "batch completed successfully"
- );
+ info!(batch_id = %batch_id, output_size, "batch completed successfully");
}
Err(e) => {
let mut batch = state_manager
@@ -67,7 +59,11 @@ pub async fn process_batch(
}
}
-/// Process all items in a batch.
+// Item conversion result: (file-extension, page-count, bytes)
+type ItemOk = (String, Option, Vec);
+type ItemErr = (String, ErrorCode);
+
+/// Process all items in a batch concurrently, gated by the shared `app_state.sem` semaphore.
async fn process_items(
batch_id: crate::routes::batch_types::BatchId,
state_manager: &BatchStateManager,
@@ -80,12 +76,12 @@ async fn process_items(
let request = &batch.request;
let item_count = request.items.len();
+ let input_dir = state_manager.batch_input_dir(&batch_id).await;
- // Create semaphore for per-batch concurrency
- let concurrency = app_state.config.batch_concurrency;
- let semaphore = Arc::new(Semaphore::new(concurrency));
+ // Use the global HTTP semaphore so batch items contribute to concurrency_active
+ // in the console and compete fairly with direct API requests.
+ let semaphore = app_state.sem.clone();
- // Spawn tasks for each item
let mut handles = Vec::with_capacity(item_count);
for (index, item) in request.items.iter().enumerate() {
@@ -94,61 +90,62 @@ async fn process_items(
let batch_id = batch_id.clone();
let item = item.clone();
let global_opts = request.global_options.clone();
+ let app_state = app_state.clone();
+ let input_dir = input_dir.clone();
let handle = tokio::spawn(async move {
- // Wait for permit
+ let wait_start = std::time::Instant::now();
let _permit = permit.acquire().await.expect("semaphore closed");
+ let wait_secs = wait_start.elapsed().as_secs_f64();
+
+ // Record how long this item waited for a semaphore slot.
+ app_state.metrics.queue_wait
+ .with_label_values(&["success"])
+ .observe(wait_secs);
+ app_state.metrics.queue_processing.inc();
- // Mark item as processing
{
- let mut batch = state_manager
- .get_batch(&batch_id)
- .await
- .expect("batch disappeared");
+ let mut batch = state_manager.get_batch(&batch_id).await.expect("batch disappeared");
batch.mark_item_processing(index);
state_manager.update_batch(batch).await;
}
- // Process the item
- let result = process_single_item(index, &item, &global_opts).await;
+ let start = std::time::Instant::now();
+ let result = convert_item(index, &item, &global_opts, &app_state, &input_dir).await;
+ let duration_secs = start.elapsed().as_secs_f64();
- // Update item result
- let update_result = {
- let mut batch = state_manager
- .get_batch(&batch_id)
- .await
- .expect("batch disappeared");
+ // Record Prometheus conversion metrics so engine conv charts reflect batch load.
+ let engine = if item.item_type.uses_libreoffice() { "libreoffice" } else { "chromium" };
+ let endpoint = "batch";
+ match &result {
+ Ok((_, _, bytes)) => app_state.metrics.record_conversion(engine, endpoint, true, duration_secs, bytes.len() as u64),
+ Err(_) => app_state.metrics.record_conversion(engine, endpoint, false, duration_secs, 0),
+ }
+ {
+ let mut batch = state_manager.get_batch(&batch_id).await.expect("batch disappeared");
match &result {
- Ok((ext, pages, bytes)) => {
- batch.mark_item_success(index, ext.clone(), *pages, *bytes);
- }
- Err((error, code)) => {
- batch.mark_item_error(index, error.clone(), *code);
- }
+ Ok((ext, pages, bytes)) => batch.mark_item_success(index, ext.clone(), *pages, bytes.len() as u64),
+ Err((msg, code)) => batch.mark_item_error(index, msg.clone(), *code),
}
state_manager.update_batch(batch).await;
- result
- };
+ }
- update_result
+ app_state.metrics.queue_processing.dec();
+ result
});
handles.push(handle);
}
- // Wait for all items to complete
- let mut item_results = Vec::with_capacity(item_count);
+ let mut item_results: Vec> = Vec::with_capacity(item_count);
for handle in handles {
match handle.await {
- Ok(result) => item_results.push(result),
- Err(e) => {
- item_results.push(Err((format!("task panicked: {e}"), ErrorCode::InternalError)));
- }
+ Ok(r) => item_results.push(r),
+ Err(e) => item_results.push(Err((format!("task panicked: {e}"), ErrorCode::InternalError))),
}
}
- // Check if any items succeeded
let success_count = item_results.iter().filter(|r| r.is_ok()).count();
if success_count == 0 {
return Err(ApiError::Engine(engine::EngineError::Internal(
@@ -156,103 +153,199 @@ async fn process_items(
)));
}
- // Generate output based on mode
let (output_path, output_size) = match request.output_mode {
- OutputMode::Zip => {
- create_zip_output(&batch_id, state_manager, &item_results).await?
- }
- OutputMode::Merge => {
- create_merged_output(&batch_id, state_manager, &item_results).await?
- }
+ OutputMode::Zip => create_zip_output(&batch_id, state_manager, &item_results).await?,
+ OutputMode::Merge => create_merged_output(&batch_id, state_manager, &item_results).await?,
};
Ok((Some(output_path), output_size))
}
-/// Process a single batch item.
-async fn process_single_item(
+/// Run the appropriate engine for one batch item.
+async fn convert_item(
_index: usize,
- item: &crate::routes::batch_types::BatchItem,
- _global_opts: &crate::routes::batch_types::GlobalOptions,
-) -> Result<(String, Option, u64), (String, ErrorCode)> {
- let start = Instant::now();
-
- // This is a placeholder implementation
- // Actual implementation would:
- // 1. Parse merged options (global + per-item overrides)
- // 2. Acquire appropriate engine
- // 3. Execute conversion
- // 4. Return output file info
-
- info!(
- file = %item.file,
- item_type = ?item.item_type,
- "processing batch item"
- );
-
- // Simulate processing delay
- tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
-
- // Placeholder: pretend success
- let duration = start.elapsed();
- info!(
- file = %item.file,
- duration_ms = duration.as_millis() as u64,
- "item processed"
- );
-
- // Return dummy result
+ item: &BatchItem,
+ _global_opts: &GlobalOptions,
+ app_state: &AppState,
+ input_dir: &Path,
+) -> Result {
let ext = item.output_extension().to_string();
- let pages = if item.item_type.is_screenshot() {
- None
- } else {
- Some(5) // Dummy page count
- };
- let bytes = 1024; // Dummy size
- Ok((ext, pages, bytes))
+ match item.item_type {
+ // ── Chromium PDF ──────────────────────────────────────────────────────
+ #[cfg(feature = "chromium")]
+ BatchItemType::ChromiumUrl => {
+ let chromium = app_state.chromium.as_ref()
+ .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?;
+ let opts = engine::PdfOptions::default();
+ let ctx = engine::RequestContext::default();
+ chromium.url_to_pdf(&item.file, &opts, &ctx).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ #[cfg(feature = "chromium")]
+ BatchItemType::ChromiumHtml => {
+ let chromium = app_state.chromium.as_ref()
+ .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?;
+ let html = read_input_file(input_dir, &item.file).await?;
+ let html_str = String::from_utf8(html)
+ .map_err(|_| ("HTML file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?;
+ let opts = engine::PdfOptions::default();
+ let ctx = engine::RequestContext::default();
+ chromium.html_to_pdf(&html_str, None, &opts, &ctx).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ #[cfg(feature = "chromium")]
+ BatchItemType::ChromiumMarkdown => {
+ let chromium = app_state.chromium.as_ref()
+ .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?;
+ let md = read_input_file(input_dir, &item.file).await?;
+ let md_str = String::from_utf8(md)
+ .map_err(|_| ("Markdown file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?;
+ let opts = engine::PdfOptions::default();
+ let ctx = engine::RequestContext::default();
+ chromium.markdown_to_pdf(&md_str, &opts, &ctx).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ // ── Chromium Screenshots ──────────────────────────────────────────────
+ #[cfg(feature = "chromium")]
+ BatchItemType::ChromiumScreenshotUrl => {
+ let chromium = app_state.chromium.as_ref()
+ .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?;
+ let opts = engine::ScreenshotOptions::default();
+ chromium.url_to_screenshot(&item.file, &opts).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ #[cfg(feature = "chromium")]
+ BatchItemType::ChromiumScreenshotHtml => {
+ let chromium = app_state.chromium.as_ref()
+ .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?;
+ let html = read_input_file(input_dir, &item.file).await?;
+ let html_str = String::from_utf8(html)
+ .map_err(|_| ("HTML file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?;
+ let opts = engine::ScreenshotOptions::default();
+ chromium.html_to_screenshot(&html_str, &opts).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ #[cfg(feature = "chromium")]
+ BatchItemType::ChromiumScreenshotMarkdown => {
+ let chromium = app_state.chromium.as_ref()
+ .ok_or_else(|| ("Chromium engine unavailable".to_string(), ErrorCode::InternalError))?;
+ // NOTE(review): no screenshot is taken here. The markdown goes through `markdown_to_pdf`,
+ // so the returned bytes are a PDF while `ext` still comes from `item.output_extension()`
+ // (presumably an image extension for this item type — confirm and add a real screenshot path).
+ let md = read_input_file(input_dir, &item.file).await?;
+ let md_str = String::from_utf8(md)
+ .map_err(|_| ("Markdown file is not valid UTF-8".to_string(), ErrorCode::ConversionFailed))?;
+ let pdf_opts = engine::PdfOptions::default();
+ let ctx = engine::RequestContext::default();
+ chromium.markdown_to_pdf(&md_str, &pdf_opts, &ctx).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ // ── LibreOffice ───────────────────────────────────────────────────────
+ #[cfg(feature = "libreoffice")]
+ BatchItemType::LibreOffice => {
+ let lo = app_state.libreoffice.as_ref()
+ .ok_or_else(|| ("LibreOffice engine unavailable".to_string(), ErrorCode::InternalError))?;
+ let file_path = input_dir.join(&item.file);
+ if !file_path.exists() {
+ return Err((format!("uploaded file '{}' not found", item.file), ErrorCode::ConversionFailed));
+ }
+ let opts = engine::OfficeOptions::default();
+ lo.convert(&file_path, &opts).await
+ .map(|b| (ext, None, b))
+ .map_err(|e| (e.to_string(), ErrorCode::ConversionFailed))
+ }
+
+ #[allow(unreachable_patterns)]
+ _ => Err(("engine not available for this item type".to_string(), ErrorCode::InternalError)),
+ }
}
-/// Create ZIP output from individual item results.
+/// Read the uploaded input file `name` from the batch's `input_dir`.
+///
+/// `name` is taken from a batch item's `file` field, so it is treated as untrusted:
+/// `Path::join` would let an absolute path or a `..` component resolve outside
+/// `input_dir`, so only plain (and leading `./`) components are accepted.
+///
+/// Returns the file's bytes. Every failure — a rejected name or an I/O error — is
+/// reported as `(message, ErrorCode::ConversionFailed)`.
+async fn read_input_file(input_dir: &Path, name: &str) -> Result<Vec<u8>, ItemErr> {
+    let stays_inside = Path::new(name).components().all(|c| {
+        matches!(c, std::path::Component::Normal(_) | std::path::Component::CurDir)
+    });
+    if !stays_inside {
+        return Err((format!("invalid uploaded file name '{}'", name), ErrorCode::ConversionFailed));
+    }
+    let path = input_dir.join(name);
+    fs::read(&path).await.map_err(|e| {
+        (format!("could not read uploaded file '{}': {e}", name), ErrorCode::ConversionFailed)
+    })
+}
+
+/// Pack all successful item results into a ZIP archive.
async fn create_zip_output(
batch_id: &crate::routes::batch_types::BatchId,
state_manager: &BatchStateManager,
- _results: &[Result<(String, Option, u64), (String, ErrorCode)>],
+ results: &[Result],
) -> Result<(PathBuf, u64), ApiError> {
let output_path = state_manager.batch_output_path(batch_id, "zip").await;
- // TODO: Create actual ZIP from item output files
- // For now, create empty file as placeholder
- fs::write(&output_path, b"PK").await.map_err(|e| {
- ApiError::Internal(format!("failed to create zip: {e}"))
- })?;
+ // Build the zip on a blocking thread so compression does not stall the async executor.
+ let zip_bytes = tokio::task::spawn_blocking({
+ let results: Vec<_> = results.iter().map(|r| r.as_ref().map(|(ext, _, b)| (ext.clone(), b.clone())).map_err(|e| e.clone())).collect();
+ move || -> Result, String> {
+ let buf = std::io::Cursor::new(Vec::new());
+ let mut zip = zip::ZipWriter::new(buf);
+ let options = zip::write::FileOptions::<()>::default()
+ .compression_method(zip::CompressionMethod::Deflated);
+
+ for (idx, result) in results.iter().enumerate() {
+ if let Ok((ext, bytes)) = result {
+ let name = format!("item_{:04}.{}", idx, ext);
+ zip.start_file(name, options).map_err(|e| e.to_string())?;
+ zip.write_all(bytes).map_err(|e| e.to_string())?;
+ }
+ }
- let metadata = fs::metadata(&output_path).await.map_err(|e| {
- ApiError::Internal(format!("failed to read zip metadata: {e}"))
+ let finished = zip.finish().map_err(|e| e.to_string())?;
+ Ok(finished.into_inner())
+ }
+ }).await
+ .map_err(|e| ApiError::Internal(format!("zip task panicked: {e}")))?
+ .map_err(|e| ApiError::Internal(format!("zip creation failed: {e}")))?;
+
+ let size = zip_bytes.len() as u64;
+ fs::write(&output_path, &zip_bytes).await.map_err(|e| {
+ ApiError::Internal(format!("failed to write zip: {e}"))
})?;
- Ok((output_path, metadata.len()))
+ Ok((output_path, size))
}
-/// Create merged PDF output from individual PDF results.
+/// Merge all successful PDF results into a single PDF.
async fn create_merged_output(
batch_id: &crate::routes::batch_types::BatchId,
state_manager: &BatchStateManager,
- _results: &[Result<(String, Option, u64), (String, ErrorCode)>],
+ results: &[Result],
) -> Result<(PathBuf, u64), ApiError> {
let output_path = state_manager.batch_output_path(batch_id, "pdf").await;
- // TODO: Use engine::pdfops::merge to combine PDFs
- // For now, create empty file as placeholder
- fs::write(&output_path, b"%PDF-1.4").await.map_err(|e| {
- ApiError::Internal(format!("failed to create merged pdf: {e}"))
+ let pdf_bufs: Vec> = results.iter()
+ .filter_map(|r| r.as_ref().ok())
+ .map(|(_, _, b)| b.clone())
+ .collect();
+
+ let merged = tokio::task::spawn_blocking(move || {
+ let slices: Vec<&[u8]> = pdf_bufs.iter().map(|b| b.as_slice()).collect();
+ engine::merge(&slices)
+ }).await
+ .map_err(|e| ApiError::Internal(format!("merge task panicked: {e}")))?
+ .map_err(|e| ApiError::Engine(e))?;
+
+ let size = merged.len() as u64;
+ fs::write(&output_path, &merged).await.map_err(|e| {
+ ApiError::Internal(format!("failed to write merged pdf: {e}"))
})?;
- let metadata = fs::metadata(&output_path).await.map_err(|e| {
- ApiError::Internal(format!("failed to read pdf metadata: {e}"))
- })?;
-
- Ok((output_path, metadata.len()))
+ Ok((output_path, size))
}
/// Spawn background worker that processes batches from a queue.
@@ -261,8 +354,6 @@ pub fn spawn_batch_workers(
_app_state: AppState,
_worker_count: usize,
) {
- // TODO: Implement queue-based batch processing
- // For now, batches are processed immediately upon submission
info!("batch workers spawned (immediate processing mode)");
}
@@ -272,37 +363,24 @@ mod tests {
use crate::routes::batch_types::*;
#[tokio::test]
- async fn test_process_single_item_placeholder() {
- let item = BatchItem {
- file: "test.html".into(),
- item_type: BatchItemType::ChromiumHtml,
- options: ItemOptions::default(),
- };
- let globals = GlobalOptions::default();
-
- let result = process_single_item(0, &item, &globals).await;
- assert!(result.is_ok());
-
- let (ext, pages, bytes) = result.unwrap();
- assert_eq!(ext, "pdf");
- assert!(pages.is_some());
- assert!(bytes > 0);
+ async fn test_read_missing_input_file_returns_error() {
+ let dir = std::path::Path::new("/tmp/nonexistent_batch_dir_xyz");
+ let result = read_input_file(dir, "missing.html").await;
+ assert!(result.is_err());
+ let (msg, code) = result.unwrap_err();
+ assert!(msg.contains("missing.html"));
+ assert!(matches!(code, ErrorCode::ConversionFailed));
}
#[tokio::test]
- async fn test_screenshot_item_no_pages() {
- let item = BatchItem {
- file: "test.html".into(),
- item_type: BatchItemType::ChromiumScreenshotHtml,
- options: ItemOptions::default(),
- };
- let globals = GlobalOptions::default();
+ async fn test_read_existing_input_file() {
+ let dir = tempfile::tempdir().unwrap();
+ let content = b"test";
+ let file_path = dir.path().join("test.html");
+ tokio::fs::write(&file_path, content).await.unwrap();
- let result = process_single_item(0, &item, &globals).await;
+ let result = read_input_file(dir.path(), "test.html").await;
assert!(result.is_ok());
-
- let (ext, pages, _) = result.unwrap();
- assert_eq!(ext, "png");
- assert!(pages.is_none()); // Screenshots don't have pages
+ assert_eq!(result.unwrap(), content);
}
}
diff --git a/crates/server/src/cgroup.rs b/crates/server/src/cgroup.rs
index 52e9ce4..1757f44 100644
--- a/crates/server/src/cgroup.rs
+++ b/crates/server/src/cgroup.rs
@@ -151,6 +151,34 @@ impl CgroupLimits {
}
}
+/// Read current memory usage fresh from the cgroup files on each call (meant for the sampler tick); the value is bytes / 1024 / 1024.
+/// Tries cgroup v2 `memory.current`, then v1 `memory.usage_in_bytes`; returns None when neither file reads and parses.
+pub fn read_memory_used_mb() -> Option {
+ if let Ok(s) = fs::read_to_string("/sys/fs/cgroup/memory.current") { // cgroup v2 path, tried first
+ if let Ok(bytes) = s.trim().parse::() { // trim tolerates surrounding whitespace
+ return Some(bytes as f64 / 1024.0 / 1024.0);
+ }
+ } // unreadable or unparseable: fall through to the v1 path
+ if let Ok(s) = fs::read_to_string("/sys/fs/cgroup/memory/memory.usage_in_bytes") { // cgroup v1 path
+ if let Ok(bytes) = s.trim().parse::() {
+ return Some(bytes as f64 / 1024.0 / 1024.0);
+ }
+ }
+ None
+}
+
+/// Read cumulative CPU usage in microseconds from the `usage_usec` line of cgroup v2 `cpu.stat`.
+/// Returns None when the file cannot be read, has no `usage_usec` line, or that value fails to parse.
+pub fn read_cpu_usage_usec() -> Option {
+ let content = fs::read_to_string("/sys/fs/cgroup/cpu.stat").ok()?; // any read error becomes None
+ for line in content.lines() {
+ if let Some(rest) = line.strip_prefix("usage_usec ") { // prefix includes the separating space
+ return rest.trim().parse().ok(); // first matching line decides; a parse failure yields None
+ }
+ }
+ None
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/server/src/console_store.rs b/crates/server/src/console_store.rs
index e106419..9739369 100644
--- a/crates/server/src/console_store.rs
+++ b/crates/server/src/console_store.rs
@@ -1,7 +1,7 @@
// crates/server/src/console_store.rs
//! In-memory store for console metrics, request logs, and SSE broadcasting.
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU32};
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, broadcast, watch};
@@ -20,10 +20,16 @@ pub struct MetricsSample {
pub ts: u64,
/// Requests per second.
pub rps: f64,
+ /// p50 latency in milliseconds.
+ pub p50_ms: f64,
+ /// p55 latency in milliseconds.
+ pub p55_ms: f64,
/// p95 latency in milliseconds.
pub p95_ms: f64,
- /// Error percentage (0-100).
- pub error_pct: f64,
+ /// 5xx server error percentage (0-100).
+ pub server_error_pct: f64,
+ /// 429 rate-limit percentage (0-100).
+ pub rate_limit_pct: f64,
/// Current queue size.
pub queue_size: u32,
/// Active concurrent requests.
@@ -32,6 +38,12 @@ pub struct MetricsSample {
pub cpu_pct: f64,
/// Memory usage in MB (cgroup-aware in containers).
pub memory_mb: f64,
+ /// Chromium conversion requests per second.
+ pub chromium_conv_rps: f64,
+ /// LibreOffice conversion requests per second.
+ pub libreoffice_conv_rps: f64,
+ /// p95 queue wait time in milliseconds.
+ pub queue_wait_p95_ms: f64,
}
/// Ring buffer of metrics samples for time-series display.
@@ -101,10 +113,20 @@ pub struct ConsoleStore {
pub libreoffice_was_running: AtomicBool,
/// Previous HTTP request total for RPS delta calculation.
pub prev_http_total: Mutex,
- /// Previous error total for error rate delta calculation.
+ /// Previous 5xx server error total for server_error_pct delta calculation.
pub prev_error_total: Mutex,
+ /// Previous 429 rate-limit total for rate_limit_pct delta calculation.
+ pub prev_rate_limit_total: Mutex,
+ /// Previous Chromium conversion total for per-engine RPS delta.
+ pub prev_chromium_conv_total: Mutex,
+ /// Previous LibreOffice conversion total for per-engine RPS delta.
+ pub prev_libreoffice_conv_total: Mutex,
+ /// Previous per-route HTTP totals for per-route RPS delta.
+ pub prev_route_totals: Mutex>,
/// Live count of HTTP requests currently in flight.
pub active_requests: AtomicU32,
+ /// Per-route count of HTTP requests currently in flight.
+ pub active_per_route: Mutex>,
}
impl ConsoleStore {
@@ -124,7 +146,12 @@ impl ConsoleStore {
libreoffice_was_running: AtomicBool::new(false),
prev_http_total: Mutex::new(0.0),
prev_error_total: Mutex::new(0.0),
+ prev_rate_limit_total: Mutex::new(0.0),
+ prev_chromium_conv_total: Mutex::new(0.0),
+ prev_libreoffice_conv_total: Mutex::new(0.0),
+ prev_route_totals: Mutex::new(HashMap::new()),
active_requests: AtomicU32::new(0),
+ active_per_route: Mutex::new(HashMap::new()),
}
}
@@ -198,22 +225,20 @@ pub struct ConsolePayload {
pub struct TickerPayload {
/// Current requests per second.
pub rps: f64,
+ /// p50 latency in milliseconds.
+ pub p50_ms: f64,
+ /// p55 latency in milliseconds.
+ pub p55_ms: f64,
/// p95 latency in milliseconds.
pub p95_ms: f64,
- /// Error percentage (0-100).
- pub error_pct: f64,
+ /// 5xx server error percentage (0-100).
+ pub server_error_pct: f64,
+ /// 429 rate-limit percentage (0-100).
+ pub rate_limit_pct: f64,
/// Active concurrent requests.
pub concurrency_active: u32,
/// Max allowed concurrent requests.
pub concurrency_max: u32,
- /// Chromium status (up/down/n/a).
- pub chromium_status: String,
- /// Number of Chromium restarts.
- pub chromium_restarts: u32,
- /// LibreOffice status (up/down/n/a).
- pub libreoffice_status: String,
- /// Number of LibreOffice restarts.
- pub libreoffice_restarts: u32,
/// Current queue size.
pub queue_size: f64,
/// Server uptime in seconds.
@@ -256,6 +281,14 @@ pub struct EnginePayload {
pub mode: String,
/// Mini RPS sparkline (normalized 0-1).
pub mini_series: Vec,
+ /// Total conversions processed by this engine.
+ pub conversions_total: u64,
+ /// Error rate for this engine (0-100).
+ pub error_rate: f64,
+ /// Total bytes processed in MB.
+ pub bytes_mb: f64,
+ /// Seconds since last conversion (idle time).
+ pub idle_secs: u64,
}
/// Concurrency statistics.
@@ -269,6 +302,10 @@ pub struct ConcurrencyPayload {
pub warn_threshold: u32,
/// Critical threshold (85% of max).
pub crit_threshold: u32,
+ /// p95 queue wait time in milliseconds.
+ pub queue_wait_p95_ms: f64,
+ /// Number of requests currently processing in queue.
+ pub queue_processing: u32,
}
/// Resource usage time series.
@@ -285,6 +322,8 @@ pub struct ResourcesPayload {
/// Throughput and latency time series.
#[derive(Clone, Debug, Serialize)]
pub struct ThroughputPayload {
+ /// Unix timestamps for each sample.
+ pub ts_series: Vec,
/// RPS time series.
pub rps_series: Vec,
/// RPS baseline for reference line.
@@ -293,6 +332,12 @@ pub struct ThroughputPayload {
pub p95_series: Vec,
/// Target p95 latency (seconds).
pub p95_target_s: f64,
+ /// Chromium conversions per second time series.
+ pub chromium_conv_series: Vec,
+ /// LibreOffice conversions per second time series.
+ pub libreoffice_conv_series: Vec,
+ /// p95 queue wait time series (milliseconds).
+ pub queue_wait_p95_series: Vec,
}
/// Batch job status.
@@ -306,6 +351,14 @@ pub struct BatchPayload {
pub progress_pct: u8,
/// Elapsed time string.
pub elapsed: String,
+ /// Total number of items in the batch.
+ pub total_items: usize,
+ /// Number of completed items.
+ pub completed_items: usize,
+ /// Number of failed items.
+ pub failed_items: usize,
+ /// Output mode (zip/stream/etc).
+ pub output_mode: String,
}
// ── build_console_payload ─────────────────────────────────────────────────
@@ -324,16 +377,29 @@ pub async fn build_console_payload(
// fast requests that finish between sampler ticks still appear in the UI.
let concurrency_active = state.console.active_requests.load(Ordering::SeqCst);
- let (rps_series, p95_series, cpu_series, memory_series, last_rps, last_p95_ms, last_error_pct) = {
+ let (ts_series, rps_series, p95_series, cpu_series, memory_series,
+ chromium_conv_series, libreoffice_conv_series, queue_wait_p95_series,
+ last_rps, last_p50_ms, last_p55_ms, last_p95_ms,
+ last_server_error_pct, last_rate_limit_pct) = {
let history = state.console.history.lock().await;
- let rps_series: Vec = history.samples.iter().map(|s| s.rps).collect();
- let p95_series: Vec = history.samples.iter().map(|s| s.p95_ms / 1000.0).collect();
- let cpu_series: Vec = history.samples.iter().map(|s| s.cpu_pct).collect();
+ let ts_series: Vec = history.samples.iter().map(|s| s.ts).collect();
+ let rps_series: Vec = history.samples.iter().map(|s| s.rps).collect();
+ let p95_series: Vec = history.samples.iter().map(|s| s.p95_ms / 1000.0).collect();
+ let cpu_series: Vec = history.samples.iter().map(|s| s.cpu_pct).collect();
let memory_series: Vec = history.samples.iter().map(|s| s.memory_mb).collect();
- let last_rps = rps_series.last().copied().unwrap_or(0.0);
+ let chromium_conv_series: Vec = history.samples.iter().map(|s| s.chromium_conv_rps).collect();
+ let libreoffice_conv_series: Vec = history.samples.iter().map(|s| s.libreoffice_conv_rps).collect();
+ let queue_wait_p95_series: Vec = history.samples.iter().map(|s| s.queue_wait_p95_ms).collect();
+ let last_rps = rps_series.last().copied().unwrap_or(0.0);
+ let last_p50_ms = history.samples.back().map_or(0.0, |s| s.p50_ms);
+ let last_p55_ms = history.samples.back().map_or(0.0, |s| s.p55_ms);
let last_p95_ms = p95_series.last().copied().unwrap_or(0.0) * 1000.0;
- let last_error_pct = history.samples.back().map_or(0.0, |s| s.error_pct);
- (rps_series, p95_series, cpu_series, memory_series, last_rps, last_p95_ms, last_error_pct)
+ let last_server_error_pct = history.samples.back().map_or(0.0, |s| s.server_error_pct);
+ let last_rate_limit_pct = history.samples.back().map_or(0.0, |s| s.rate_limit_pct);
+ (ts_series, rps_series, p95_series, cpu_series, memory_series,
+ chromium_conv_series, libreoffice_conv_series, queue_wait_p95_series,
+ last_rps, last_p50_ms, last_p55_ms, last_p95_ms,
+ last_server_error_pct, last_rate_limit_pct)
};
let queue_size = state.metrics.queue_size.get();
@@ -359,7 +425,7 @@ pub async fn build_console_payload(
.collect()
};
- let routes = build_route_payloads(state, concurrency_max);
+ let routes = build_route_payloads(state, concurrency_max).await;
let recent_requests: Vec = {
let log = state.console.request_log.lock().await;
@@ -378,36 +444,72 @@ pub async fn build_console_payload(
uptime_seconds,
ticker: TickerPayload {
rps: last_rps,
+ p50_ms: last_p50_ms,
+ p55_ms: last_p55_ms,
p95_ms: last_p95_ms,
- error_pct: last_error_pct,
+ server_error_pct: last_server_error_pct,
+ rate_limit_pct: last_rate_limit_pct,
concurrency_active,
concurrency_max,
- chromium_status: chromium_status.clone(),
- chromium_restarts,
- libreoffice_status: libreoffice_status.clone(),
- libreoffice_restarts,
queue_size,
uptime_seconds,
},
routes,
engines: {
+ let eng_families = prometheus::gather();
let mut engines = Vec::new();
#[cfg(feature = "chromium")]
- engines.push(EnginePayload {
- name: "Chromium".to_string(),
- status: chromium_status.clone(),
- restarts: chromium_restarts,
- mode: if state.config.chromium_lazy_start { "lazy".to_string() } else { "eager".to_string() },
- mini_series: mini.clone(),
- });
+ {
+ let ch_total = engine_conv_total(&eng_families, "chromium");
+ let ch_errors: f64 = eng_families.iter()
+ .find(|f| f.get_name() == "pdfbro_conversions_total")
+ .map(|f| f.get_metric().iter()
+ .filter(|m| m.get_label().iter().any(|l| l.get_name() == "engine" && l.get_value() == "chromium")
+ && m.get_label().iter().any(|l| l.get_name() == "status" && l.get_value() == "error"))
+ .map(|m| m.get_counter().get_value())
+ .sum())
+ .unwrap_or(0.0);
+ let ch_bytes_mb = engine_bytes_total(&eng_families, "chromium") / (1024.0 * 1024.0);
+ let ch_error_rate = if ch_total > 0.0 { (ch_errors / ch_total) * 100.0 } else { 0.0 };
+ let ch_idle = match state.chromium.as_ref() { Some(be) => be.idle_secs(), None => 0 };
+ engines.push(EnginePayload {
+ name: "Chromium".to_string(),
+ status: chromium_status.clone(),
+ restarts: chromium_restarts,
+ mode: if state.config.chromium_lazy_start { "lazy".to_string() } else { "eager".to_string() },
+ mini_series: mini.clone(),
+ conversions_total: ch_total as u64,
+ error_rate: ch_error_rate,
+ bytes_mb: ch_bytes_mb,
+ idle_secs: ch_idle,
+ });
+ }
#[cfg(feature = "libreoffice")]
- engines.push(EnginePayload {
- name: "LibreOffice".to_string(),
- status: libreoffice_status.clone(),
- restarts: libreoffice_restarts,
- mode: if state.config.libreoffice_lazy_start { "lazy".to_string() } else { "eager".to_string() },
- mini_series: mini,
- });
+ {
+ let lo_total = engine_conv_total(&eng_families, "libreoffice");
+ let lo_errors: f64 = eng_families.iter()
+ .find(|f| f.get_name() == "pdfbro_conversions_total")
+ .map(|f| f.get_metric().iter()
+ .filter(|m| m.get_label().iter().any(|l| l.get_name() == "engine" && l.get_value() == "libreoffice")
+ && m.get_label().iter().any(|l| l.get_name() == "status" && l.get_value() == "error"))
+ .map(|m| m.get_counter().get_value())
+ .sum())
+ .unwrap_or(0.0);
+ let lo_bytes_mb = engine_bytes_total(&eng_families, "libreoffice") / (1024.0 * 1024.0);
+ let lo_error_rate = if lo_total > 0.0 { (lo_errors / lo_total) * 100.0 } else { 0.0 };
+ let lo_idle = match state.libreoffice.as_ref() { Some(lo) => lo.idle_secs(), None => 0 };
+ engines.push(EnginePayload {
+ name: "LibreOffice".to_string(),
+ status: libreoffice_status.clone(),
+ restarts: libreoffice_restarts,
+ mode: if state.config.libreoffice_lazy_start { "lazy".to_string() } else { "eager".to_string() },
+ mini_series: mini,
+ conversions_total: lo_total as u64,
+ error_rate: lo_error_rate,
+ bytes_mb: lo_bytes_mb,
+ idle_secs: lo_idle,
+ });
+ }
engines
},
concurrency: ConcurrencyPayload {
@@ -415,13 +517,22 @@ pub async fn build_console_payload(
max: concurrency_max,
warn_threshold: (concurrency_max as f64 * 0.60) as u32,
crit_threshold: (concurrency_max as f64 * 0.85) as u32,
+ queue_wait_p95_ms: {
+ let h = state.console.history.lock().await;
+ h.samples.back().map_or(0.0, |s| s.queue_wait_p95_ms)
+ },
+ queue_processing: state.metrics.queue_processing.get() as u32,
},
resources: ResourcesPayload { cpu_series, memory_series, memory_max_mb },
throughput: ThroughputPayload {
+ ts_series,
rps_series,
rps_baseline: 0.0,
p95_series,
p95_target_s: 2.0,
+ chromium_conv_series,
+ libreoffice_conv_series,
+ queue_wait_p95_series,
},
batches,
recent_requests,
@@ -432,30 +543,34 @@ pub async fn build_console_payload(
// ── Route payload: reads Prometheus counters + histograms ─────────────────
/// Build per-route metrics from Prometheus counters and histograms.
-fn build_route_payloads(state: &crate::state::AppState, concurrency_max: u32) -> Vec {
+async fn build_route_payloads(state: &crate::state::AppState, concurrency_max: u32) -> Vec {
let families = prometheus::gather();
- // Build count + error map from pdfbro_http_requests_total
- let mut route_counts: std::collections::HashMap = std::collections::HashMap::new();
+ // Build count + error map from pdfbro_http_requests_total.
+ // Key = "METHOD route" to preserve the real HTTP method per endpoint.
+ let mut route_counts: std::collections::HashMap = std::collections::HashMap::new();
for family in &families {
if family.get_name() != "pdfbro_http_requests_total" { continue; }
for m in family.get_metric() {
let labels: std::collections::HashMap<_, _> = m.get_label().iter()
.map(|l| (l.get_name(), l.get_value()))
.collect();
- let route = labels.get("route").copied().unwrap_or("unknown").to_string();
+ let method = labels.get("method").copied().unwrap_or("GET").to_string();
+ let route = labels.get("route").copied().unwrap_or("unknown").to_string();
let status = labels.get("status").copied().unwrap_or("0");
- let count = m.get_counter().get_value();
- let entry = route_counts.entry(route).or_insert((0.0, 0.0));
- entry.0 += count;
+ let count = m.get_counter().get_value();
+ let key = format!("{method} {route}");
+ let entry = route_counts.entry(key).or_insert((method, 0.0, 0.0));
+ entry.1 += count;
if status.starts_with('5') || status.starts_with('4') {
- entry.1 += count;
+ entry.2 += count;
}
}
break;
}
- // Build latency percentiles from pdfbro_http_request_duration_seconds histogram
+ // Build latency percentiles from pdfbro_http_request_duration_seconds histogram.
+ // Key = "METHOD route" to match route_counts.
let mut route_latency: std::collections::HashMap = std::collections::HashMap::new();
for family in &families {
if family.get_name() != "pdfbro_http_request_duration_seconds" { continue; }
@@ -463,34 +578,54 @@ fn build_route_payloads(state: &crate::state::AppState, concurrency_max: u32) ->
let labels: std::collections::HashMap<_, _> = m.get_label().iter()
.map(|l| (l.get_name(), l.get_value()))
.collect();
- let route = labels.get("route").copied().unwrap_or("unknown").to_string();
- let hist = m.get_histogram();
- let count = hist.get_sample_count();
+ let method = labels.get("method").copied().unwrap_or("GET");
+ let route = labels.get("route").copied().unwrap_or("unknown");
+ let key = format!("{method} {route}");
+ let hist = m.get_histogram();
+ let count = hist.get_sample_count();
if count == 0 { continue; }
let buckets = hist.get_bucket();
let p50 = percentile_from_histogram(buckets, count, 0.50) * 1000.0;
let p95 = percentile_from_histogram(buckets, count, 0.95) * 1000.0;
let p99 = percentile_from_histogram(buckets, count, 0.99) * 1000.0;
- route_latency.insert(route, (p50, p95, p99));
+ route_latency.insert(key, (p50, p95, p99));
}
break;
}
- let load_pct = (concurrency_max as usize).saturating_sub(state.sem.available_permits()) as f64
- / concurrency_max.max(1) as f64 * 100.0;
+ // Per-route RPS: compute delta from previous totals
+ let route_rps: std::collections::HashMap = {
+ let mut prev = state.console.prev_route_totals.lock().await;
+ route_counts.iter().map(|(key, (_, total, _))| {
+ let prev_total = prev.get(key).copied().unwrap_or(0.0);
+ let delta = (total - prev_total).max(0.0);
+ prev.insert(key.clone(), *total);
+ (key.clone(), delta / 5.0)
+ }).collect()
+ };
+
+ // Per-route in-flight from active_per_route map (keyed by route path only)
+ let in_flight_map: std::collections::HashMap = {
+ let map = state.console.active_per_route.lock().await;
+ map.clone()
+ };
- let mut routes: Vec = route_counts.into_iter().map(|(path, (total, errors))| {
+ let mut routes: Vec = route_counts.into_iter().map(|(key, (method, total, errors))| {
+ let path = key.splitn(2, ' ').nth(1).unwrap_or(&key).to_string();
let error_pct = if total > 0.0 { (errors / total) * 100.0 } else { 0.0 };
- let (p50_ms, p95_ms, p99_ms) = route_latency.get(&path).copied().unwrap_or((0.0, 0.0, 0.0));
+ let (p50_ms, p95_ms, p99_ms) = route_latency.get(&key).copied().unwrap_or((0.0, 0.0, 0.0));
+ let rps = route_rps.get(&key).copied().unwrap_or(0.0);
+ let in_flight = in_flight_map.get(&path).copied().unwrap_or(0);
+ let load_pct = (in_flight as f64 / concurrency_max.max(1) as f64) * 100.0;
RoutePayload {
path,
- method: "POST".to_string(),
- rps: 0.0,
+ method,
+ rps,
p50_ms,
p95_ms,
p99_ms,
error_pct,
- in_flight: 0,
+ in_flight,
load_pct,
}
}).collect();
@@ -523,9 +658,117 @@ fn percentile_from_histogram(buckets: &[prometheus::proto::Bucket], total_count:
.unwrap_or(0.0)
}
-/// Build batch job payloads (placeholder - batch worker not yet implemented).
-async fn build_batch_payloads(_state: &crate::state::AppState) -> Vec {
- vec![]
+/// Percentile `pct` (0.0-1.0) of histogram family `name`, in ms, with every label set merged into one.
+fn global_histogram_pct(families: &[prometheus::proto::MetricFamily], name: &str, pct: f64) -> f64 {
+    let family = match families.iter().find(|fam| fam.get_name() == name) {
+        Some(fam) => fam,
+        None => return 0.0, // family not present in this gather: report zero
+    };
+    // Merge by bucket position; the first series seen supplies each upper bound.
+    let mut samples = 0u64;
+    let mut merged: Vec<(f64, u64)> = Vec::new();
+    for metric in family.get_metric() {
+        let histogram = metric.get_histogram();
+        samples += histogram.get_sample_count();
+        for (idx, bucket) in histogram.get_bucket().iter().enumerate() {
+            match merged.get_mut(idx) {
+                Some(slot) => slot.1 += bucket.get_cumulative_count(),
+                None => merged.push((bucket.get_upper_bound(), bucket.get_cumulative_count())),
+            }
+        }
+    }
+    if samples == 0 || merged.is_empty() { return 0.0; }
+    let rank = (samples as f64 * pct) as u64; // cumulative count to reach; the cast truncates
+    let (mut below_count, mut below_bound) = (0u64, 0.0f64);
+    for &(upper, cumulative) in merged.iter().take_while(|(b, _)| !b.is_infinite()) {
+        if cumulative >= rank {
+            if cumulative == below_count { return below_bound * 1000.0; }
+            let frac = (rank - below_count) as f64 / (cumulative - below_count) as f64; // linear interpolation
+            return (below_bound + (upper - below_bound) * frac) * 1000.0;
+        }
+        (below_count, below_bound) = (cumulative, upper);
+    }
+    // No finite bucket reaches `rank`: fall back to the largest finite upper bound.
+    merged.iter().rev().find(|(b, _)| !b.is_infinite()).map_or(0.0, |(b, _)| b * 1000.0)
+}
+
+/// Adds up every `pdfbro_conversions_total` series whose `engine` label equals `engine`,
+/// regardless of its other labels. Yields 0.0 when the family is absent from `families`.
+fn engine_conv_total(families: &[prometheus::proto::MetricFamily], engine: &str) -> f64 {
+    let Some(family) = families.iter().find(|fam| fam.get_name() == "pdfbro_conversions_total") else {
+        return 0.0;
+    };
+    family.get_metric().iter()
+        .filter(|metric| metric.get_label().iter().any(|lbl| lbl.get_name() == "engine" && lbl.get_value() == engine))
+        .map(|metric| metric.get_counter().get_value()).sum()
+}
+
+/// Counter total for one engine: adds up every `pdfbro_conversion_bytes_total` series
+/// carrying the label `engine=<engine>`; 0.0 if that family was not gathered.
+fn engine_bytes_total(families: &[prometheus::proto::MetricFamily], engine: &str) -> f64 {
+    match families.iter().find(|fam| fam.get_name() == "pdfbro_conversion_bytes_total") {
+        None => 0.0,
+        Some(fam) => fam.get_metric().iter()
+            .filter(|series| series.get_label().iter().any(|kv| kv.get_name() == "engine" && kv.get_value() == engine))
+            .map(|series| series.get_counter().get_value()).sum(),
+    }
+}
+
+/// Snapshot of batch jobs for the console: one `BatchPayload` per non-expired batch,
+/// ordered running, then queued, then everything else, and capped at 10 entries.
+/// Returns an empty list when `state.batch_manager` is `None`.
+async fn build_batch_payloads(state: &crate::state::AppState) -> Vec<BatchPayload> {
+    use crate::routes::batch_types::{BatchStatus, OutputMode};
+    let Some(ref manager) = state.batch_manager else { return vec![]; };
+    let ids = manager.list_batches().await;
+    let mut batches: Vec<BatchPayload> = Vec::new();
+
+    for id in &ids {
+        // `get_batch` can return `None` for a listed id; such ids are skipped.
+        let Some(b) = manager.get_batch(id).await else { continue };
+        if b.is_expired() { continue; }
+
+        let progress = b.progress();
+        // Finished share (completed + failed); clamp so the `u8` cast can never wrap.
+        let progress_pct = if progress.total > 0 {
+            ((progress.completed + progress.failed) * 100 / progress.total).min(100) as u8
+        } else {
+            0
+        };
+
+        let elapsed_secs = b.submitted_at.elapsed().unwrap_or_default().as_secs();
+        let elapsed = if elapsed_secs < 60 {
+            format!("{}s", elapsed_secs)
+        } else {
+            format!("{}m {}s", elapsed_secs / 60, elapsed_secs % 60)
+        };
+
+        let status = match b.status {
+            BatchStatus::Queued => "queued",
+            BatchStatus::Processing => "running",
+            BatchStatus::Completed => "completed",
+            BatchStatus::Failed => "failed",
+        };
+        let output_mode = match b.request.output_mode {
+            OutputMode::Zip => "zip",
+            OutputMode::Merge => "merge",
+        };
+
+        batches.push(BatchPayload {
+            id: id.to_string(),
+            status: status.to_string(),
+            progress_pct,
+            elapsed,
+            total_items: progress.total,
+            completed_items: progress.completed,
+            failed_items: progress.failed,
+            output_mode: output_mode.to_string(),
+        });
+    }
+    // Stable sort: batches keep their listing order within each status group.
+    batches.sort_by_key(|b| match b.status.as_str() { "running" => 0, "queued" => 1, _ => 2 });
+    batches.truncate(10);
+    batches
 }
/// Total system RAM in MB (cached on first call).
@@ -566,7 +809,10 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant)
// Detect cgroup limits once at startup (Docker/Kubernetes)
let cgroup = CgroupLimits::detect();
- let num_host_cpus = sys.cpus().len();
+ let num_host_cpus = sys.cpus().len().max(1);
+
+ // For cgroup v2 CPU tracking: keep running total so we can delta each tick.
+ let mut prev_cpu_usec: Option = crate::cgroup::read_cpu_usage_usec();
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -574,31 +820,54 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant)
loop {
interval.tick().await;
- // ── CPU + memory via sysinfo (cross-platform) ──────────────────
- sys.refresh_cpu_usage();
- let host_cpu_pct = sys.global_cpu_usage() as f64;
- sys.refresh_memory();
-
- // Use cgroup-aware values when running in a container
- let cpu_pct = if cgroup.is_container {
- cgroup.cpu_pct_relative_to_limit(host_cpu_pct, num_host_cpus)
- } else {
- host_cpu_pct
+ // ── CPU ────────────────────────────────────────────────────────
+ // Prefer cgroup v2 cpu.stat delta (accurate per-container %).
+ // Fall back to sysinfo global_cpu_usage when not in a container.
+ let cpu_pct = {
+ let new_usec = crate::cgroup::read_cpu_usage_usec();
+ match (prev_cpu_usec, new_usec) {
+ (Some(prev), Some(curr)) if curr >= prev => {
+ let delta_usec = (curr - prev) as f64;
+ // delta_usec / 5_000_000 = fraction of 1 CPU used over 5 s → multiply
+ // by 100 to get %. When a CPU limit is set, further normalise by quota so
+ // 100% = "using all allocated cores". Without a limit show raw % of 1 core
+ // (matches what `docker stats` displays).
+ let pct = delta_usec / 5_000_000.0 * 100.0;
+ let pct = match cgroup.cpu_limit_cores {
+ Some(limit) if limit > 0.0 => (pct / limit).min(100.0),
+ _ => pct, // unlimited: show % of 1 CPU (same as docker stats)
+ };
+ prev_cpu_usec = new_usec;
+ pct
+ }
+ _ => {
+ // Not on cgroup v2 — fall back to sysinfo
+ prev_cpu_usec = new_usec;
+ sys.refresh_cpu_usage();
+ let host_pct = sys.global_cpu_usage() as f64;
+ if cgroup.is_container {
+ cgroup.cpu_pct_relative_to_limit(host_pct, num_host_cpus).min(100.0)
+ } else {
+ host_pct
+ }
+ }
+ }
};
- let memory_mb = cgroup.memory_used_mb
+ // ── Memory ─────────────────────────────────────────────────────
+ // Re-read cgroup memory.current each tick (stale startup snapshot is wrong).
+ sys.refresh_memory();
+ let memory_mb = crate::cgroup::read_memory_used_mb()
.unwrap_or_else(|| sys.used_memory() as f64 / 1024.0 / 1024.0);
- // Update Prometheus gauge with cgroup-aware memory if available
- let memory_bytes = cgroup.memory_used_mb
- .map(|m| (m * 1024.0 * 1024.0) as u64)
- .unwrap_or_else(|| sys.used_memory());
- state.metrics.process_resident_memory.set(memory_bytes as f64);
+ // Update Prometheus gauge
+ state.metrics.process_resident_memory.set(memory_mb * 1024.0 * 1024.0);
- // ── Engine health: probe directly, don't read stale gauge ──────
+ // ── Engine health: use atomic is_alive() — never blocks on the
+ // engine's internal mutex, so heavy batch load can't stall the sampler.
#[cfg(feature = "chromium")]
let chromium_up = match state.chromium.as_ref() {
- Some(be) => be.healthy().await,
+ Some(be) => be.is_alive(),
None => false,
};
#[cfg(not(feature = "chromium"))]
@@ -642,25 +911,37 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant)
.map(|f| f.get_metric().iter().map(|m| m.get_counter().get_value()).sum())
.unwrap_or(0.0);
- let error_total: f64 = families.iter()
+ // 5xx = server errors; 429 = rate limits; 4xx (excl. 429) = client mistakes, ignored
+ let server_error_total: f64 = families.iter()
.find(|f| f.get_name() == "pdfbro_http_requests_total")
.map(|f| f.get_metric().iter()
.filter(|m| m.get_label().iter()
- .any(|l| l.get_name() == "status"
- && (l.get_value().starts_with('5') || l.get_value().starts_with('4'))))
+ .any(|l| l.get_name() == "status" && l.get_value().starts_with('5')))
.map(|m| m.get_counter().get_value()).sum())
.unwrap_or(0.0);
- let (rps, error_pct) = {
+ let rate_limit_total: f64 = families.iter()
+ .find(|f| f.get_name() == "pdfbro_http_requests_total")
+ .map(|f| f.get_metric().iter()
+ .filter(|m| m.get_label().iter()
+ .any(|l| l.get_name() == "status" && l.get_value() == "429"))
+ .map(|m| m.get_counter().get_value()).sum())
+ .unwrap_or(0.0);
+
+ let (rps, server_error_pct, rate_limit_pct) = {
let mut prev_http = state.console.prev_http_total.lock().await;
let mut prev_err = state.console.prev_error_total.lock().await;
- let http_delta = (http_total - *prev_http).max(0.0);
- let error_delta = (error_total - *prev_err).max(0.0);
- let rps = http_delta / 5.0;
- let epct = if http_delta > 0.0 { (error_delta / http_delta) * 100.0 } else { 0.0 };
+ let mut prev_rl = state.console.prev_rate_limit_total.lock().await;
+ let http_delta = (http_total - *prev_http).max(0.0);
+ let server_error_delta = (server_error_total - *prev_err).max(0.0);
+ let rate_limit_delta = (rate_limit_total - *prev_rl).max(0.0);
+ let rps = http_delta / 5.0;
+ let sepct = if http_delta > 0.0 { (server_error_delta / http_delta) * 100.0 } else { 0.0 };
+ let rlpct = if http_delta > 0.0 { (rate_limit_delta / http_delta) * 100.0 } else { 0.0 };
*prev_http = http_total;
- *prev_err = error_total;
- (rps, epct)
+ *prev_err = server_error_total;
+ *prev_rl = rate_limit_total;
+ (rps, sepct, rlpct)
};
// ── p95 from histogram (global, across all routes) ─────────────
@@ -700,6 +981,26 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant)
})
.unwrap_or(0.0);
+ // ── p50 + p55 from HTTP duration histogram ─────────────────────
+ let p50_ms = global_histogram_pct(&families, "pdfbro_http_request_duration_seconds", 0.50);
+ let p55_ms = global_histogram_pct(&families, "pdfbro_http_request_duration_seconds", 0.55);
+
+ // ── Per-engine conversion RPS ──────────────────────────────────
+ let chromium_total = engine_conv_total(&families, "chromium");
+ let libreoffice_total = engine_conv_total(&families, "libreoffice");
+ let (chromium_conv_rps, libreoffice_conv_rps) = {
+ let mut prev_ch = state.console.prev_chromium_conv_total.lock().await;
+ let mut prev_lo = state.console.prev_libreoffice_conv_total.lock().await;
+ let ch_rps = (chromium_total - *prev_ch).max(0.0) / 5.0;
+ let lo_rps = (libreoffice_total - *prev_lo).max(0.0) / 5.0;
+ *prev_ch = chromium_total;
+ *prev_lo = libreoffice_total;
+ (ch_rps, lo_rps)
+ };
+
+ // ── Queue wait p95 ─────────────────────────────────────────────
+ let queue_wait_p95_ms = global_histogram_pct(&families, "pdfbro_queue_wait_seconds", 0.95);
+
// ── Concurrency ────────────────────────────────────────────────
let _concurrency_max = state.config.concurrency as u32;
let concurrency_active = state.console.active_requests.load(Ordering::SeqCst);
@@ -709,12 +1010,18 @@ pub fn spawn_console_sampler(state: crate::state::AppState, started_at: Instant)
ts: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(),
rps,
+ p50_ms,
+ p55_ms,
p95_ms,
- error_pct,
+ server_error_pct,
+ rate_limit_pct,
queue_size: state.metrics.queue_size.get() as u32,
concurrency_active,
cpu_pct,
memory_mb,
+ chromium_conv_rps,
+ libreoffice_conv_rps,
+ queue_wait_p95_ms,
};
state.console.history.lock().await.push(sample);
diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs
index 2e04e44..ba2f88d 100644
--- a/crates/server/src/main.rs
+++ b/crates/server/src/main.rs
@@ -12,6 +12,7 @@ use server::logging::init_logging;
use server::routes::batch_state::{BatchStateManager, spawn_cleanup_task};
use server::webhook::{WebhookClient, WebhookEngineContext, WebhookQueue, start_workers};
use server::{AppState, ServerArgs, ServerConfig, banner, build_router, shutdown};
+use server::banner::EngineStatus;
use server::supervised_engine::{SupervisedChromiumEngine, SupervisedLibreOfficeEngine};
use tracing::warn;
@@ -74,28 +75,62 @@ async fn serve(args: ServerArgs) -> anyhow::Result<()> {
#[cfg(not(feature = "libreoffice"))]
let _libreoffice: Option<()> = None;
- // Start engines in the background so the HTTP server comes up immediately
- // and Fly.io / health checks can connect before engines finish warming up.
+ // Spawn eager engine starts in parallel, then await both before printing
+ // the banner so the displayed status is accurate. Lazy engines are skipped
+ // here and will start on the first request instead.
#[cfg(feature = "chromium")]
- if !config.chromium_lazy_start {
- let chromium_bg = chromium.clone();
- tokio::spawn(async move {
- if let Err(e) = chromium_bg.start().await {
+ let chromium_handle = if config.chromium_lazy_start {
+ None
+ } else {
+ let bg = chromium.clone();
+ Some(tokio::spawn(async move { bg.start().await }))
+ };
+
+ #[cfg(feature = "libreoffice")]
+ let lo_handle = if config.libreoffice_lazy_start {
+ None
+ } else {
+ let bg = libreoffice.clone();
+ Some(tokio::spawn(async move { bg.start().await }))
+ };
+
+ #[cfg(feature = "chromium")]
+ let chromium_status = match chromium_handle {
+ None => EngineStatus::Lazy,
+ Some(h) => match h.await {
+ Ok(Ok(_)) => EngineStatus::Ready,
+ Ok(Err(e)) => {
warn!(error = %e, "Failed to start Chromium engine at startup");
+ EngineStatus::Unavailable
}
- });
- }
+ Err(e) => {
+ warn!(error = %e, "Chromium start task panicked");
+ EngineStatus::Unavailable
+ }
+ },
+ };
+ #[cfg(not(feature = "chromium"))]
+ let chromium_status = EngineStatus::Disabled;
+
#[cfg(feature = "libreoffice")]
- if !config.libreoffice_lazy_start {
- let lo_bg = libreoffice.clone();
- tokio::spawn(async move {
- if let Err(e) = lo_bg.start().await {
+ let libreoffice_status = match lo_handle {
+ None => EngineStatus::Lazy,
+ Some(h) => match h.await {
+ Ok(Ok(_)) => EngineStatus::Ready,
+ Ok(Err(e)) => {
warn!(error = %e, "Failed to start LibreOffice engine at startup");
+ EngineStatus::Unavailable
}
- });
- }
+ Err(e) => {
+ warn!(error = %e, "LibreOffice start task panicked");
+ EngineStatus::Unavailable
+ }
+ },
+ };
+ #[cfg(not(feature = "libreoffice"))]
+ let libreoffice_status = EngineStatus::Disabled;
- banner::print(&config, false, false);
+ banner::print(&config, chromium_status, libreoffice_status);
#[cfg(feature = "chromium")]
let backend = ChromiumBackend::new(chromium.clone());
diff --git a/crates/server/src/routes/batch.rs b/crates/server/src/routes/batch.rs
index 871a41a..0110163 100644
--- a/crates/server/src/routes/batch.rs
+++ b/crates/server/src/routes/batch.rs
@@ -67,6 +67,21 @@ pub async fn batch_submit(
let batch_state = batch_manager.create_batch(request.clone()).await;
let batch_id = batch_state.id.clone();
+ // Persist uploaded files so the background worker can read them.
+ // URL-only batches produce no files; this is a no-op for those.
+ if !form.files.is_empty() {
+ let input_dir = batch_manager.batch_input_dir(&batch_id).await;
+ tokio::fs::create_dir_all(&input_dir).await.map_err(|e| {
+ ApiError::Internal(format!("failed to create batch input dir: {e}"))
+ })?;
+ for uploaded in &form.files {
+ let dest = input_dir.join(&uploaded.filename);
+ tokio::fs::copy(&uploaded.path, &dest).await.map_err(|e| {
+ ApiError::Internal(format!("failed to persist uploaded file: {e}"))
+ })?;
+ }
+ }
+
// Start background processing
let state_manager = batch_manager.clone();
let app_state = state.clone();
diff --git a/crates/server/src/routes/batch_state.rs b/crates/server/src/routes/batch_state.rs
index 91e25ac..d9e5beb 100644
--- a/crates/server/src/routes/batch_state.rs
+++ b/crates/server/src/routes/batch_state.rs
@@ -229,6 +229,7 @@ impl BatchStateManager {
// Ensure storage directory exists
fs::create_dir_all(&storage_path).await?;
fs::create_dir_all(storage_path.join("outputs")).await?;
+ fs::create_dir_all(storage_path.join("inputs")).await?;
let retention = Duration::from_secs(retention_minutes * 60);
@@ -271,11 +272,12 @@ impl BatchStateManager {
/// Remove a batch.
pub async fn remove_batch(&self, id: &BatchId) {
let mut inner = self.inner.write().await;
+ let input_dir = inner.storage_path.join("inputs").join(id.to_string());
if let Some(state) = inner.batches.remove(id) {
- // Clean up output file if present
if let Some(path) = state.output_path {
let _ = fs::remove_file(&path).await;
}
+ let _ = fs::remove_dir_all(&input_dir).await;
info!(batch_id = %id, "removed batch");
}
}
@@ -295,6 +297,12 @@ impl BatchStateManager {
.join(format!("{}.{}", id, extension))
}
+    /// Returns `<storage_path>/inputs/<batch id>`, the directory holding the
+    /// uploaded input files of the given batch. The path is only computed;
+    /// nothing is created on disk here.
+    pub async fn batch_input_dir(&self, id: &BatchId) -> PathBuf {
+        let guard = self.inner.read().await;
+        let mut dir = guard.storage_path.join("inputs");
+        dir.push(id.to_string());
+        dir
+    }
+
/// Run cleanup of expired batches.
pub async fn cleanup_expired(&self) {
let expired_ids: Vec = {
diff --git a/crates/server/src/supervised_engine.rs b/crates/server/src/supervised_engine.rs
index 7afa678..8ae3185 100644
--- a/crates/server/src/supervised_engine.rs
+++ b/crates/server/src/supervised_engine.rs
@@ -7,7 +7,7 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
-use std::time::{Duration, Instant};
+use std::time::Duration;
use engine::{BrowserConfig, ChromiumEngine, LibreOfficeConfig, LibreOfficeEngine};
use engine::{EngineError, EngineResult};
@@ -86,7 +86,10 @@ impl SupervisedChromiumEngine {
/// Update last activity timestamp.
fn update_activity(&self) {
- let now = Instant::now().elapsed().as_secs();
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs();
self.inner.last_activity.store(now, Ordering::SeqCst);
}
@@ -98,13 +101,16 @@ impl SupervisedChromiumEngine {
let mut ticker = interval(Duration::from_secs(30)); // Check every 30s
loop {
ticker.tick().await;
-
+
if !inner.is_running.load(Ordering::SeqCst) {
continue; // Engine not running, nothing to do
}
-
+
let last = inner.last_activity.load(Ordering::SeqCst);
- let now = Instant::now().elapsed().as_secs();
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs();
let idle_duration = Duration::from_secs(now.saturating_sub(last));
if idle_duration >= timeout {
@@ -220,6 +226,17 @@ impl SupervisedChromiumEngine {
self.inner.is_running.load(Ordering::SeqCst)
}
+    /// Whole seconds elapsed since `last_activity` was last stamped.
+    ///
+    /// Returns 0 while the stamp is still 0 (presumably: the engine has never
+    /// handled a request), and also 0 if the wall clock reads earlier than the
+    /// stamp, because the subtraction saturates.
+    pub fn idle_secs(&self) -> u64 {
+        match self.inner.last_activity.load(Ordering::SeqCst) {
+            0 => 0,
+            stamp => std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_secs()
+                .saturating_sub(stamp),
+        }
+    }
+
/// Shutdown the engine.
pub async fn shutdown(&self) {
let mut guard = self.inner.engine.lock().await;
@@ -286,7 +303,10 @@ impl SupervisedLibreOfficeEngine {
/// Update last activity timestamp.
fn update_activity(&self) {
- let now = Instant::now().elapsed().as_secs();
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs();
self.inner.last_activity.store(now, Ordering::SeqCst);
}
@@ -298,13 +318,16 @@ impl SupervisedLibreOfficeEngine {
let mut ticker = interval(Duration::from_secs(30));
loop {
ticker.tick().await;
-
+
if !inner.is_running.load(Ordering::SeqCst) {
continue;
}
-
+
let last = inner.last_activity.load(Ordering::SeqCst);
- let now = Instant::now().elapsed().as_secs();
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs();
let idle_duration = Duration::from_secs(now.saturating_sub(last));
if idle_duration >= timeout {
@@ -325,7 +348,6 @@ impl SupervisedLibreOfficeEngine {
}
}
- /// Check if the engine is healthy.
/// Check if the engine is healthy.
///
/// Only probes an already-running engine — does NOT trigger lazy start.
@@ -359,6 +381,17 @@ impl SupervisedLibreOfficeEngine {
self.inner.is_running.load(Ordering::SeqCst)
}
+    /// Whole seconds elapsed since `last_activity` was last stamped.
+    ///
+    /// Returns 0 while the stamp is still 0 (presumably: the engine has never
+    /// handled a request), and also 0 if the wall clock reads earlier than the
+    /// stamp, because the subtraction saturates.
+    pub fn idle_secs(&self) -> u64 {
+        match self.inner.last_activity.load(Ordering::SeqCst) {
+            0 => 0,
+            stamp => std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_secs()
+                .saturating_sub(stamp),
+        }
+    }
+
/// Convert many files to PDFs in parallel.
pub async fn convert_many(
&self,
diff --git a/crates/server/tests/bdd/testdata/teststore/foo.zip b/crates/server/tests/bdd/testdata/teststore/foo.zip
index ef079f0..ccf7edb 100644
Binary files a/crates/server/tests/bdd/testdata/teststore/foo.zip and b/crates/server/tests/bdd/testdata/teststore/foo.zip differ
diff --git a/crates/server/tests/bdd/testdata/teststore/result.zip b/crates/server/tests/bdd/testdata/teststore/result.zip
index 729feb3..ab47512 100644
Binary files a/crates/server/tests/bdd/testdata/teststore/result.zip and b/crates/server/tests/bdd/testdata/teststore/result.zip differ
diff --git a/docker-compose.yml b/docker-compose.yml
index 85d353b..ec767c2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -13,6 +13,11 @@ services:
volumes:
# Mount source code for hot reload
- ./crates:/app/crates:cached
+ - ./bench:/app/bench:cached
+ # Mount full UI source so bun can rebuild it inside the container.
+ # A named volume overlays node_modules so Linux binaries don't bleed onto the host.
+ - ./ui:/app/ui:cached
+ - ui-node-modules:/app/ui/node_modules
- ./Cargo.toml:/app/Cargo.toml:cached
- ./Cargo.lock:/app/Cargo.lock:cached
# Cache build artifacts for faster rebuilds
@@ -35,13 +40,19 @@ services:
- RUST_BACKTRACE=1
volumes:
- ./crates:/app/crates:cached
+ - ./bench:/app/bench:cached
+ - ./ui:/app/ui:cached
+ - ui-node-modules:/app/ui/node_modules
- ./Cargo.toml:/app/Cargo.toml:cached
- ./Cargo.lock:/app/Cargo.lock:cached
- cargo-target:/app/target
- cargo-registry:/usr/local/cargo/registry
tty: true
stdin_open: true
- command: ["cargo", "watch", "--poll", "-x", "run -p server -- serve --port 3000 --host 0.0.0.0 --no-default-features --features chromium"]
+    # Build the UI once in the foreground so ui/build exists before cargo
+    # compiles the server; only the long-running `build:watch` is backgrounded.
+    # (A bare `a && b && c & d` would background the whole a-b-c chain and let
+    # cargo start before the first UI build has finished.)
+    command:
+      - sh
+      - -c
+      - "cd /app/ui && bun install && bun run build && { bun run build:watch & } && cd /app && exec cargo watch --poll -w crates/ -w Cargo.toml -w Cargo.lock -x 'run -p server -- serve --port 3000 --host 0.0.0.0 --no-default-features --features chromium'"
profiles: [dev-chromium]
pdfbro:
@@ -88,3 +99,4 @@ services:
volumes:
cargo-target:
cargo-registry:
+ ui-node-modules:
diff --git a/scripts/load_test.sh b/scripts/load_test.sh
new file mode 100755
index 0000000..12d366b
--- /dev/null
+++ b/scripts/load_test.sh
@@ -0,0 +1,112 @@
+#!/usr/bin/env bash
+# load_test.sh — drives the pdfbro server to exercise all UI panels
+#
+# Usage:
+# ./scripts/load_test.sh [BASE_URL] [ROUNDS]
+#
+# BASE_URL defaults to http://localhost:3000; ROUNDS (number of full waves) defaults to 3.
+
+set -euo pipefail
+
+BASE="${1:-http://localhost:3000}"
+ROUNDS="${2:-3}" # how many full waves to run
+
+GREEN='\033[0;32m'; YELLOW='\033[1;33m'; RED='\033[0;31m'; NC='\033[0m'
+log() { echo -e "${GREEN}[load]${NC} $*"; }
+warn() { echo -e "${YELLOW}[warn]${NC} $*"; }
+
+# ── helpers ──────────────────────────────────────────────────────────────────
+
+submit_batch() {
+ local label="$1"
+ curl -s -X POST "$BASE/forms/batch/submit" \
+ -F "batch.json=$(cat <<'JSON'
+{
+ "outputMode":"zip",
+ "items":[
+ {"file":"https://example.com", "type":"chromiumUrl"},
+ {"file":"https://httpbin.org/html", "type":"chromiumUrl"},
+ {"file":"https://wikipedia.org", "type":"chromiumUrl"},
+ {"file":"https://news.ycombinator.com", "type":"chromiumUrl"},
+ {"file":"https://github.com/trending", "type":"chromiumUrl"}
+ ]
+}
+JSON
+)" | jq -r '.batchId // "ERROR: \(.error)"' && log "batch submitted [$label]"
+}
+
+# POST a single URL render and log the HTTP status code that came back.
+# $1 is the URL to render.
+single_url() {
+  local url="$1"
+  local code
+  # Capture the status with command substitution rather than piping into
+  # `read`: the last stage of a pipeline runs in a subshell, so a variable
+  # set there would be unset here and trip `set -u`.
+  # `|| true` keeps a connection failure (curl prints 000) from aborting
+  # the whole script under `set -e`.
+  code=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/forms/chromium/convert/url" \
+    -F "url=$url" \
+    -F "waitDelay=0") || true
+  log "single url [$url] → $code"
+}
+
+# Deliberate 4xx — validation / field errors (should NOT count as server errors)
+# Each probe captures the real HTTP status and confirms only on an exact
+# match; anything else is reported instead of being silently accepted.
+bad_requests() {
+  warn "sending deliberate 4xx (should not inflate 5xx / 429 panels)"
+  local code
+
+  # 400/422: missing required field
+  code=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/forms/chromium/convert/url") || true
+  case "$code" in
+    400|422) warn " missing-url → 4xx ✓" ;;
+    *) warn " missing-url → unexpected status $code" ;;
+  esac
+
+  # 4xx: batch JSON with an empty item list. Match the whole status class;
+  # a substring test for "4" would also accept 204 or 504.
+  code=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/forms/batch/submit" \
+    -F 'batch.json={"outputMode":"zip","items":[]}') || true
+  case "$code" in
+    4??) warn " empty-items → 4xx ✓" ;;
+    *) warn " empty-items → unexpected status $code" ;;
+  esac
+
+  # 404: unknown route (curl exits 0 for any HTTP response, so check the code)
+  code=$(curl -s -o /dev/null -w "%{http_code}" "$BASE/does/not/exist") || true
+  case "$code" in
+    404) warn " unknown-route → 404 ✓" ;;
+    *) warn " unknown-route → unexpected status $code" ;;
+  esac
+}
+
+# ── main ─────────────────────────────────────────────────────────────────────
+
+log "target: $BASE rounds: $ROUNDS"
+log "checking server health..."
+curl -sf "$BASE/health" > /dev/null || { echo "server not reachable at $BASE"; exit 1; }
+log "server is up"
+
+for round in $(seq 1 "$ROUNDS"); do
+ log "═══ round $round / $ROUNDS ═══"
+
+ # 1. Fire 5 batches in parallel (drives batch panel + engine conv + queue)
+ log "launching 5 batch jobs..."
+ for i in $(seq 1 5); do
+ submit_batch "r${round}-b${i}" &
+ done
+ wait
+ log "all 5 batches queued"
+
+ # 2. Fire 10 single URL-to-PDF requests concurrently (drives concurrency + RPS)
+ log "launching 10 concurrent single-URL renders..."
+ for url in \
+ "https://example.com" \
+ "https://httpbin.org/html" \
+ "https://wikipedia.org" \
+ "https://github.com" \
+ "https://news.ycombinator.com" \
+ "https://example.com" \
+ "https://httpbin.org/html" \
+ "https://wikipedia.org" \
+ "https://github.com" \
+ "https://news.ycombinator.com"
+ do
+ curl -s -o /dev/null -X POST "$BASE/forms/chromium/convert/url" \
+ -F "url=$url" &
+ done
+ wait
+ log "single-URL wave complete"
+
+ # 3. Deliberate bad requests (4xx — must not affect 5xx or 429 panels)
+ bad_requests
+
+ # 4. Health check (drives health route stats)
+ curl -sf "$BASE/health" > /dev/null
+ curl -sf "$BASE/version" > /dev/null 2>&1 || true
+
+ log "round $round done — waiting 8s before next wave"
+ sleep 8
+done
+
+log "load test complete"
diff --git a/ui/package.json b/ui/package.json
index bef72e6..26289e5 100644
--- a/ui/package.json
+++ b/ui/package.json
@@ -6,6 +6,7 @@
"scripts": {
"dev": "vite dev",
"build": "vite build",
+ "build:watch": "vite build --watch",
"preview": "vite preview",
"prepare": "svelte-kit sync || echo ''",
"check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json",
diff --git a/ui/src/lib/components/ActivityStrip.svelte b/ui/src/lib/components/ActivityStrip.svelte
deleted file mode 100644
index c4e76bb..0000000
--- a/ui/src/lib/components/ActivityStrip.svelte
+++ /dev/null
@@ -1,53 +0,0 @@
-
-
-
-
-
-
- {#each requests as r, i}
-
- {r.time}
- {r.method}
- {r.route}
- {r.status}
- {r.duration_ms}ms
-
- {/each}
- {#if requests.length === 0}
-
No requests yet
- {/if}
-
-
-
-
-
- {#each errors as e, i}
-
- {e.time}
- {e.message}
- {e.route}
-
- {/each}
- {#if errors.length === 0}
-
No errors
- {/if}
-
-
-
diff --git a/ui/src/lib/components/CpuChart.svelte b/ui/src/lib/components/CpuChart.svelte
new file mode 100644
index 0000000..39bb12f
--- /dev/null
+++ b/ui/src/lib/components/CpuChart.svelte
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+ usage
+ {last.toFixed(1)}%
+
+
v.toFixed(1) + '%'}
+ {t}
+ />
+
+
diff --git a/ui/src/lib/components/EngineConvChart.svelte b/ui/src/lib/components/EngineConvChart.svelte
new file mode 100644
index 0000000..4e162b4
--- /dev/null
+++ b/ui/src/lib/components/EngineConvChart.svelte
@@ -0,0 +1,33 @@
+
+
+
+
+
+
+
+ ■ Chromium {lastCh.toFixed(2)}
+ ■ LibreOffice {lastLo.toFixed(2)}
+
+
+
+
+
diff --git a/ui/src/lib/components/MemChart.svelte b/ui/src/lib/components/MemChart.svelte
new file mode 100644
index 0000000..fd4e22d
--- /dev/null
+++ b/ui/src/lib/components/MemChart.svelte
@@ -0,0 +1,34 @@
+
+
+
+
+
+
+ usage
+
+ {(last / 1024).toFixed(2)} GB{resources.memory_max_mb > 0 ? ` / ${(resources.memory_max_mb / 1024).toFixed(0)} GB` : ''}
+
+
+
0 ? pct : undefined}
+ refColor={t.warn}
+ label="MB"
+ formatValue={(v) => (v / 1024).toFixed(2) + ' GB'}
+ {t}
+ />
+
+
diff --git a/ui/src/lib/components/QueueWaitChart.svelte b/ui/src/lib/components/QueueWaitChart.svelte
new file mode 100644
index 0000000..ae66c95
--- /dev/null
+++ b/ui/src/lib/components/QueueWaitChart.svelte
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+ wait p95
+
+ {lastWait >= 1000 ? `${(lastWait / 1000).toFixed(1)}s` : `${lastWait.toFixed(0)}ms`}
+
+
+
v >= 1000 ? `${(v/1000).toFixed(1)}s` : `${v.toFixed(0)}ms`}
+ {t}
+ />
+
+
diff --git a/ui/src/lib/components/RoutesTable.svelte b/ui/src/lib/components/RoutesTable.svelte
index 54bcb3e..4064ee5 100644
--- a/ui/src/lib/components/RoutesTable.svelte
+++ b/ui/src/lib/components/RoutesTable.svelte
@@ -15,34 +15,36 @@
let sorted = $derived([...routes].sort((a, b) => b.p95_ms - a.p95_ms));
-
+
{#if routes.length === 0}
No route data yet
{:else}
-
-
-
- {#each ['Route','Method','RPS','p50','p95','p99','Err %','In-flight','Load'] as h, i}
- | {h} |
- {/each}
-
-
-
- {#each sorted as r}
- {@const p95tone = r.p95_ms > 10000 ? t.err : r.p95_ms > 5000 ? t.warn : t.ink}
-
- | {r.path} |
- {r.method} |
- {r.rps.toFixed(1)} |
- {fmtMs(r.p50_ms)} |
- {fmtMs(r.p95_ms)} |
- {fmtMs(r.p99_ms)} |
- {r.error_pct.toFixed(2)} |
- {r.in_flight} |
- |
+
+
+
+
+ {#each ['Route','Method','RPS','p50','p95','p99','Err %','In-flight','Load'] as h, i}
+ | {h} |
+ {/each}
- {/each}
-
-
+
+
+ {#each sorted as r}
+ {@const p95tone = r.p95_ms > 10000 ? t.err : r.p95_ms > 5000 ? t.warn : t.ink}
+
+ | {r.path} |
+ {r.method} |
+ {r.rps.toFixed(1)} |
+ {fmtMs(r.p50_ms)} |
+ {fmtMs(r.p95_ms)} |
+ {fmtMs(r.p99_ms)} |
+ {r.error_pct.toFixed(2)} |
+ {r.in_flight} |
+ |
+
+ {/each}
+
+
+
{/if}
diff --git a/ui/src/lib/components/StackedBarSeries.svelte b/ui/src/lib/components/StackedBarSeries.svelte
new file mode 100644
index 0000000..3f32fe2
--- /dev/null
+++ b/ui/src/lib/components/StackedBarSeries.svelte
@@ -0,0 +1,111 @@
+
+
+
+
+
+
+ {#if hoveredIdx !== null}
+ {@const w = 100 / len}
+ {@const pctLeft = (hoveredIdx + 0.5) * w}
+ {@const flipLeft = pctLeft > 70}
+ {@const a = (seriesA[hoveredIdx] ?? 0).toFixed(2)}
+ {@const b = (seriesB[hoveredIdx] ?? 0).toFixed(2)}
+
+ {labelA} {a}
+ ·
+ {labelB} {b}
+
+ {/if}
+
diff --git a/ui/src/lib/components/Ticker.svelte b/ui/src/lib/components/Ticker.svelte
index fafe514..6109f1f 100644
--- a/ui/src/lib/components/Ticker.svelte
+++ b/ui/src/lib/components/Ticker.svelte
@@ -16,14 +16,15 @@
}
let items = $derived([
- { label: 'RPS', value: ticker.rps.toFixed(1), tone: 'ink' as const },
- { label: 'p95', value: fmtMs(ticker.p95_ms), tone: (ticker.p95_ms > 2000 ? 'err' : ticker.p95_ms > 1500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
- { label: 'Errors', value: `${ticker.error_pct.toFixed(2)}%`, tone: (ticker.error_pct > 1 ? 'err' : ticker.error_pct > 0.5 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
- { label: 'Conc.', value: `${ticker.concurrency_active} / ${ticker.concurrency_max}`, tone: 'ink' as const },
- { label: 'Chromium', value: ticker.chromium_status.toUpperCase(), tone: (ticker.chromium_status === 'up' ? 'ok' : ticker.chromium_status === 'n/a' ? 'ink' : 'err') as 'ok' | 'ink' | 'err' },
- { label: 'LibreOff', value: ticker.libreoffice_status.toUpperCase(), tone: (ticker.libreoffice_status === 'up' ? 'ok' : ticker.libreoffice_status === 'n/a' ? 'ink' : 'err') as 'ok' | 'ink' | 'err' },
- { label: 'Queue', value: String(Math.round(ticker.queue_size)), tone: 'ink' as const },
- { label: 'Uptime', value: fmtUptime(ticker.uptime_seconds), tone: 'ok' as const },
+ { label: 'RPS', value: ticker.rps.toFixed(1), tone: 'ink' as const },
+ { label: 'P50', value: fmtMs(ticker.p50_ms), tone: (ticker.p50_ms > 1000 ? 'err' : ticker.p50_ms > 500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
+ { label: 'P55', value: fmtMs(ticker.p55_ms), tone: (ticker.p55_ms > 1000 ? 'err' : ticker.p55_ms > 500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
+ { label: 'P95', value: fmtMs(ticker.p95_ms), tone: (ticker.p95_ms > 2000 ? 'err' : ticker.p95_ms > 1500 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
+ { label: '5XX', value: `${ticker.server_error_pct.toFixed(2)}%`, tone: (ticker.server_error_pct > 1 ? 'err' : ticker.server_error_pct > 0 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
+ { label: '429', value: `${ticker.rate_limit_pct.toFixed(2)}%`, tone: (ticker.rate_limit_pct > 5 ? 'err' : ticker.rate_limit_pct > 0 ? 'warn' : 'ok') as 'ok' | 'warn' | 'err' },
+ { label: 'Conc.', value: `${ticker.concurrency_active} / ${ticker.concurrency_max}`, tone: 'ink' as const },
+ { label: 'Queue', value: String(Math.round(ticker.queue_size)), tone: 'ink' as const },
+ { label: 'Uptime', value: fmtUptime(ticker.uptime_seconds), tone: 'ok' as const },
]);
diff --git a/ui/src/lib/components/side-rail/Batches.svelte b/ui/src/lib/components/side-rail/Batches.svelte
index d533fd7..0a3fc8f 100644
--- a/ui/src/lib/components/side-rail/Batches.svelte
+++ b/ui/src/lib/components/side-rail/Batches.svelte
@@ -6,31 +6,65 @@
import Pill from '../shared/Pill.svelte';
import SlimBar from '../shared/SlimBar.svelte';
- let { batches, t, D }: { batches: BatchPayload[]; t: Theme; D: { pad: number; fz: number; rowPy: number } } = $props();
+ let { batches, t, D, style = '' }: { batches: BatchPayload[]; t: Theme; D: { pad: number; fz: number; rowPy: number }; style?: string } = $props();
function batchTone(status: string): 'ok' | 'warn' | 'err' | 'accent' | 'ink' {
- if (status === 'failed') return 'err';
+ if (status === 'failed') return 'err';
if (status === 'completed') return 'ok';
- if (status === 'queued') return 'ink';
+ if (status === 'queued') return 'ink';
return 'accent';
}
+
+ // Only show active jobs; completed/failed drop off automatically.
+ let active = $derived(batches.filter(b => b.status === 'running' || b.status === 'queued'));
+ let queuedCnt = $derived(batches.filter(b => b.status === 'queued').length);
+ let runningCnt = $derived(batches.filter(b => b.status === 'running').length);
+ let doneCnt = $derived(batches.filter(b => b.status === 'completed' || b.status === 'failed').length);
-
- {#if batches.length === 0}
- No recent batches
+
+ {#if active.length === 0}
+
+
+
+
+
Queue is empty
+ {#if doneCnt > 0}
+
{doneCnt} job{doneCnt > 1 ? 's' : ''} completed
+ {:else}
+
POST /batch to submit a job
+ {/if}
+
+
{:else}
-
-
- {#each batches as b, i}
-
- | {b.id} |
- {b.status.slice(0, 4)} |
- |
- {b.elapsed} |
-
- {/each}
-
-
+
+ {#each active as b, i}
+
+
+
+
{b.status.slice(0, 4).toUpperCase()}
+
{b.id.slice(0, 16)}…
+
+
+ {b.output_mode.toUpperCase()}
+ {b.elapsed}
+
+
+
+
+
+ {b.completed_items}/{b.total_items}
+ {#if b.failed_items > 0} · {b.failed_items} err{/if}
+
+
+
+ {/each}
+
{/if}
diff --git a/ui/src/lib/components/side-rail/Concurrency.svelte b/ui/src/lib/components/side-rail/Concurrency.svelte
index e15dbe6..ab36b58 100644
--- a/ui/src/lib/components/side-rail/Concurrency.svelte
+++ b/ui/src/lib/components/side-rail/Concurrency.svelte
@@ -7,8 +7,9 @@
let { conc, t, D }: { conc: ConcurrencyPayload; t: Theme; D: { pad: number; fz: number } } = $props();
- let tone = $derived((conc.active >= conc.crit_threshold ? 'err' : conc.active >= conc.warn_threshold ? 'warn' : 'ok') as 'ok' | 'warn' | 'err');
+ let tone = $derived((conc.active >= conc.crit_threshold ? 'warn' : conc.active >= conc.warn_threshold ? 'warn' : 'ok') as 'ok' | 'warn' | 'err');
let pct = $derived(Math.round((conc.active / Math.max(1, conc.max)) * 100));
+ let statusLabel = $derived(conc.active > conc.max ? 'BUSY' : tone === 'warn' ? 'WARN' : 'OK');
let hoveredSlot = $state(null);
@@ -34,7 +35,7 @@
{conc.active} / {conc.max}
- {pct}% · {tone}
+ {Math.min(pct, 100)}% · {statusLabel}
@@ -74,5 +75,20 @@
0warn {conc.warn_threshold}crit {conc.crit_threshold}{conc.max}
+
+
+
+
+
wait p95
+
+ {conc.queue_wait_p95_ms >= 1000 ? `${(conc.queue_wait_p95_ms / 1000).toFixed(1)}s` : `${conc.queue_wait_p95_ms.toFixed(0)}ms`}
+
+
time before slot acquired
+
+
+
processing
+
{conc.queue_processing} job{conc.queue_processing !== 1 ? 's' : ''}
+
+
diff --git a/ui/src/lib/components/side-rail/Engines.svelte b/ui/src/lib/components/side-rail/Engines.svelte
index 5177ac3..7746325 100644
--- a/ui/src/lib/components/side-rail/Engines.svelte
+++ b/ui/src/lib/components/side-rail/Engines.svelte
@@ -6,7 +6,12 @@
import Pill from '../shared/Pill.svelte';
import BarSeries from '../shared/BarSeries.svelte';
- let { engines, t, D }: { engines: EnginePayload[]; t: Theme; D: { fz: number; pad: number } } = $props();
+ let { engines, convRps, t, D }: {
+ engines: EnginePayload[];
+ convRps: Record;
+ t: Theme;
+ D: { fz: number; pad: number };
+ } = $props();
function engineTone(e: EnginePayload): 'ok' | 'warn' | 'err' | 'ink' {
if (e.status === 'n/a') return 'ink';
@@ -14,13 +19,22 @@
if (e.restarts > 5) return 'warn';
return 'ok';
}
- function engineColor(e: EnginePayload, t: Theme): string {
+ function engineColor(e: EnginePayload): string {
const tone = engineTone(e);
if (tone === 'ok') return t.ok;
if (tone === 'warn') return t.warn;
if (tone === 'err') return t.err;
return t.muted;
}
+  /**
+   * Renders an idle duration given in seconds: exactly 0 reads as `active`,
+   * anything under a minute as `idle <n>s`, and everything else as whole
+   * minutes (`idle <n>m`, rounded down).
+   */
+  function fmtIdle(s: number): string {
+    if (s === 0) {
+      return 'active';
+    }
+    const wholeMinutes = Math.floor(s / 60);
+    return s < 60 ? `idle ${s}s` : `idle ${wholeMinutes}m`;
+  }
+  /**
+   * Formats a size given in megabytes with one decimal place, switching to
+   * gigabytes (divided by 1024) once the value reaches 1024 MB.
+   */
+  function fmtBytes(mb: number): string {
+    const useGigabytes = mb >= 1024;
+    const amount = useGigabytes ? mb / 1024 : mb;
+    const unit = useGigabytes ? 'GB' : 'MB';
+    return `${amount.toFixed(1)}${unit}`;
+  }
@@ -31,6 +45,9 @@
{e.name}
{e.status.toUpperCase()}
+ {#if (convRps[e.name.toLowerCase()] ?? 0) > 0}
+
{(convRps[e.name.toLowerCase()] ?? 0).toFixed(2)} conv/s
+ {/if}
{e.restarts} activation{e.restarts !== 1 ? 's' : ''} · {e.mode}
@@ -39,13 +56,31 @@
{#if e.mini_series.length > 0}
(v * 100).toFixed(0) + '%'}
{t}
/>
{/if}
+
+
+
total conv
+
{e.conversions_total}
+
+
+
err%
+
{e.error_rate.toFixed(2)}
+
+
+
data
+
{fmtBytes(e.bytes_mb)}
+
+
+
state
+
{fmtIdle(e.idle_secs)}
+
+
{/each}
{#if engines.length === 0}
diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts
index 90431f5..5e0c117 100644
--- a/ui/src/lib/types.ts
+++ b/ui/src/lib/types.ts
@@ -2,24 +2,29 @@
export interface MetricsSample {
ts: number;
rps: number;
+ p50_ms: number;
+ p55_ms: number;
p95_ms: number;
- error_pct: number;
+ server_error_pct: number;
+ rate_limit_pct: number;
queue_size: number;
concurrency_active: number;
cpu_pct: number;
memory_mb: number;
+ chromium_conv_rps: number;
+ libreoffice_conv_rps: number;
+ queue_wait_p95_ms: number;
}
export interface TickerPayload {
rps: number;
+ p50_ms: number;
+ p55_ms: number;
p95_ms: number;
- error_pct: number;
+ server_error_pct: number;
+ rate_limit_pct: number;
concurrency_active: number;
concurrency_max: number;
- chromium_status: string;
- chromium_restarts: number;
- libreoffice_status: string;
- libreoffice_restarts: number;
queue_size: number;
uptime_seconds: number;
}
@@ -42,6 +47,10 @@ export interface EnginePayload {
restarts: number;
mode: string;
mini_series: number[];
+ conversions_total: number;
+ error_rate: number;
+ bytes_mb: number;
+ idle_secs: number;
}
export interface ConcurrencyPayload {
@@ -49,6 +58,8 @@ export interface ConcurrencyPayload {
max: number;
warn_threshold: number;
crit_threshold: number;
+ queue_wait_p95_ms: number;
+ queue_processing: number;
}
export interface ResourcesPayload {
@@ -58,10 +69,14 @@ export interface ResourcesPayload {
}
export interface ThroughputPayload {
+ ts_series: number[];
rps_series: number[];
rps_baseline: number;
p95_series: number[];
p95_target_s: number;
+ chromium_conv_series: number[];
+ libreoffice_conv_series: number[];
+ queue_wait_p95_series: number[];
}
export interface BatchPayload {
@@ -69,6 +84,10 @@ export interface BatchPayload {
status: string;
progress_pct: number;
elapsed: string;
+ total_items: number;
+ completed_items: number;
+ failed_items: number;
+ output_mode: string;
}
export interface RequestLogEntry {
diff --git a/ui/src/routes/+page.svelte b/ui/src/routes/+page.svelte
index 88e2270..3fa0f2a 100644
--- a/ui/src/routes/+page.svelte
+++ b/ui/src/routes/+page.svelte
@@ -9,9 +9,9 @@
import Engines from '$lib/components/side-rail/Engines.svelte';
import Concurrency from '$lib/components/side-rail/Concurrency.svelte';
import Batches from '$lib/components/side-rail/Batches.svelte';
- import Resources from '$lib/components/side-rail/Resources.svelte';
import ThroughputStrip from '$lib/components/ThroughputStrip.svelte';
- import ActivityStrip from '$lib/components/ActivityStrip.svelte';
+ import CpuChart from '$lib/components/CpuChart.svelte';
+ import MemChart from '$lib/components/MemChart.svelte';
onMount(() => metricsStore.start());
onDestroy(() => metricsStore.stop());
@@ -30,9 +30,9 @@
let D = $derived(themeStore.D);
-
+
{#if metricsStore.loading}
-
+
Connecting to pdfbro…
{:else if metricsStore.data}
@@ -44,25 +44,38 @@
-
-
+
+
-
-
-
-
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
{/if}