diff --git a/apps/gateway/Cargo.toml b/apps/gateway/Cargo.toml index 588a1bf..20c93e4 100644 --- a/apps/gateway/Cargo.toml +++ b/apps/gateway/Cargo.toml @@ -61,6 +61,12 @@ futures-util = "0.3" # Error handling anyhow = "1" +# Date/time (for Hands DB rows) +chrono = { version = "0.4", features = ["serde"] } + +# ULID generation (for Hands session tokens) +ulid = "1" + # Time (for certificate validity) time = "0.3" diff --git a/apps/gateway/src/gateway.rs b/apps/gateway/src/gateway.rs index 3be177f..08d7fd3 100644 --- a/apps/gateway/src/gateway.rs +++ b/apps/gateway/src/gateway.rs @@ -31,6 +31,7 @@ use tracing::{info, warn}; use crate::auth::AuthUser; use crate::ca::CertificateAuthority; use crate::connect::{self, CachedConnect, ConnectCacheKey, ConnectError, PolicyEngine}; +use crate::hands; use crate::inject::{self, ConnectRule}; // ── GatewayState ─────────────────────────────────────────────────────── @@ -42,6 +43,7 @@ pub(crate) struct GatewayState { pub http_client: reqwest::Client, pub policy_engine: Arc, pub connect_cache: Arc>, + pub hands_state: Arc, } // ── GatewayServer ─────────────────────────────────────────────────────── @@ -52,7 +54,12 @@ pub struct GatewayServer { } impl GatewayServer { - pub fn new(ca: CertificateAuthority, port: u16, policy_engine: Arc) -> Self { + pub fn new( + ca: CertificateAuthority, + port: u16, + policy_engine: Arc, + hands_state: Arc, + ) -> Self { let state = GatewayState { ca: Arc::new(ca), http_client: reqwest::Client::builder() @@ -63,6 +70,7 @@ impl GatewayServer { .expect("build HTTP client"), policy_engine, connect_cache: Arc::new(DashMap::new()), + hands_state, }; Self { state, port } @@ -95,11 +103,31 @@ impl GatewayServer { ]) .allow_credentials(true); + // Build Hands sub-router with its own state. + let hands_router: Router = Router::new() + .route("/jobs", axum::routing::post(hands::api::create_job)) + .route("/jobs", axum::routing::get(hands::api::list_jobs)) + .route("/jobs/{id}", axum::routing::get(hands::api::get_job)) + .route("/jobs/{id}/start", axum::routing::post(hands::api::start_job)) + .route("/jobs/{id}/cancel", axum::routing::post(hands::api::cancel_job)) + .route("/sessions", axum::routing::post(hands::api::create_session)) + .route("/sessions/{id}", axum::routing::get(hands::api::get_session)) + .route("/sessions/{id}", axum::routing::delete(hands::api::close_session)) + .route("/sessions/{id}/activated", axum::routing::post(hands::api::activate_session)) + .route("/sessions/{id}/emergency-stop", axum::routing::post(hands::api::emergency_stop)) + .route("/sessions/{id}/next-packet", axum::routing::get(hands::api::next_packet)) + .route("/sessions/{id}/packet-acked", axum::routing::post(hands::api::packet_acked)) + .route("/sessions/{id}/step-status", axum::routing::post(hands::api::step_status)) + .route("/screenshots", axum::routing::post(hands::api::upload_screenshot)) + .route("/screenshots/{id}", axum::routing::get(hands::api::get_screenshot)) + .with_state(Arc::clone(&self.state.hands_state)); + // Build the Axum router for non-CONNECT routes. // The fallback returns 400 Bad Request for anything other than defined routes. let axum_router = Router::new() .route("/healthz", axum::routing::get(healthz)) .route("/me", axum::routing::get(me)) + .nest("/v1/hands", hands_router) .layer(cors_layer) .fallback(fallback) .with_state(self.state.clone()); diff --git a/apps/gateway/src/hands/api.rs b/apps/gateway/src/hands/api.rs new file mode 100644 index 0000000..f794553 --- /dev/null +++ b/apps/gateway/src/hands/api.rs @@ -0,0 +1,600 @@ +//! HTTP API endpoints for OnlyAgent Hands. +//! +//! Session management: +//! POST /v1/hands/sessions — Create control session +//! GET /v1/hands/sessions/:id — Get session status +//! DELETE /v1/hands/sessions/:id — Close session +//! POST /v1/hands/sessions/:id/activated — Browser reports OnlyKey confirmed +//! POST /v1/hands/sessions/:id/emergency-stop — Force stop +//! +//! Jobs: +//! POST /v1/hands/jobs — Create job +//! GET /v1/hands/jobs — List jobs +//! GET /v1/hands/jobs/:id — Get job + steps +//! POST /v1/hands/jobs/:id/start — Start job +//! POST /v1/hands/jobs/:id/cancel — Cancel job +//! +//! Instruction delivery (browser ↔ gateway): +//! GET /v1/hands/sessions/:id/next-packet — Get next compiled packet +//! POST /v1/hands/sessions/:id/packet-acked — Browser confirms delivery +//! POST /v1/hands/sessions/:id/step-status — Browser forwards device status +//! +//! Screenshots: +//! POST /v1/hands/screenshots — Upload screenshot +//! GET /v1/hands/screenshots/:id — Retrieve screenshot + +use std::sync::Arc; +use std::time::Instant; + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::Json; +use base64::Engine; +use tracing::{info, warn}; + +use super::compile; +use super::db as hands_db; +use super::models::*; +use super::packet; +use super::session::{QueuedPacket, SessionManager}; + +// ── Hands state shared across all handlers ───────────────────────────── + +#[derive(Clone)] +pub struct HandsState { + pub pool: sqlx::PgPool, + pub session_manager: Arc, +} + +// ── Job endpoints ────────────────────────────────────────────────────── + +/// POST /v1/hands/jobs +pub async fn create_job( + State(state): State>, + Json(req): Json, +) -> impl IntoResponse { + let job_id = generate_id("hj"); + + if let Err(e) = hands_db::create_job( + &state.pool, + &job_id, + "current-user", // TODO: extract from auth + &req.name, + &req.description, + req.host_os.as_deref(), + ) + .await + { + warn!(error = %e, "hands: failed to create job"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "failed to create job"})), + ); + } + + // Create steps + for (i, step) in req.steps.iter().enumerate() { + let step_id = generate_id("hs"); + if let Err(e) = hands_db::create_step( + &state.pool, + &step_id, + &job_id, + i as i32, + &step.description, + &step.macro_instructions, + &step.expected_outcome, + step.max_retries, + step.timeout_ms, + step.require_confirm, + ) + .await + { + warn!(error = %e, "hands: failed to create step"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "failed to create step"})), + ); + } + } + + let _ = hands_db::insert_audit_event( + &state.pool, + &generate_id("ha"), + &job_id, + "job_created", + None, + None, + None, + ) + .await; + + info!(job_id = job_id, steps = req.steps.len(), "hands: job created"); + + ( + StatusCode::CREATED, + Json(serde_json::json!({ "id": job_id, "status": "draft" })), + ) +} + +/// GET /v1/hands/jobs +pub async fn list_jobs( + State(state): State>, +) -> impl IntoResponse { + let user_id = "current-user"; // TODO: extract from auth + match hands_db::find_jobs_by_user(&state.pool, user_id).await { + Ok(jobs) => { + let mut summaries = Vec::new(); + for job in jobs { + let step_count = hands_db::count_steps(&state.pool, &job.id) + .await + .unwrap_or(0); + summaries.push(JobSummary { + id: job.id, + name: job.name, + description: job.description, + status: job.status, + host_os: job.host_os, + step_count, + created_at: job.created_at.to_string(), + }); + } + (StatusCode::OK, Json(serde_json::json!({ "items": summaries }))) + } + Err(e) => { + warn!(error = %e, "hands: failed to list jobs"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "internal error"})), + ) + } + } +} + +/// GET /v1/hands/jobs/:id +pub async fn get_job( + State(state): State>, + Path(job_id): Path, +) -> impl IntoResponse { + let job = match hands_db::find_job(&state.pool, &job_id).await { + Ok(Some(j)) => j, + Ok(None) => { + return (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "not found"}))); + } + Err(e) => { + warn!(error = %e, "hands: db error"); + return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "internal"}))); + } + }; + + let steps = hands_db::find_steps_by_job(&state.pool, &job_id) + .await + .unwrap_or_default(); + + let step_views: Vec = steps + .into_iter() + .map(|s| StepView { + id: s.id, + sequence_number: s.sequence_number, + description: s.description, + macro_instructions: s.macro_instructions, + expected_outcome: s.expected_outcome, + status: s.status, + retry_count: s.retry_count, + max_retries: s.max_retries, + timeout_ms: s.timeout_ms, + require_confirm: s.require_confirm, + error_message: s.error_message, + }) + .collect(); + + let detail = JobDetail { + id: job.id, + name: job.name, + description: job.description, + status: job.status, + host_os: job.host_os, + max_duration_secs: job.max_duration_secs, + steps: step_views, + created_at: job.created_at.to_string(), + updated_at: job.updated_at.to_string(), + completed_at: job.completed_at.map(|t| t.to_string()), + }; + + (StatusCode::OK, Json(serde_json::to_value(detail).unwrap())) +} + +/// POST /v1/hands/jobs/:id/start +pub async fn start_job( + State(state): State>, + Path(job_id): Path, +) -> impl IntoResponse { + if let Err(e) = hands_db::update_job_status(&state.pool, &job_id, "queued").await { + warn!(error = %e, "hands: failed to start job"); + return StatusCode::INTERNAL_SERVER_ERROR; + } + let _ = hands_db::insert_audit_event( + &state.pool, &generate_id("ha"), &job_id, "job_started", None, None, None, + ).await; + info!(job_id = job_id, "hands: job queued for execution"); + StatusCode::OK +} + +/// POST /v1/hands/jobs/:id/cancel +pub async fn cancel_job( + State(state): State>, + Path(job_id): Path, +) -> impl IntoResponse { + let _ = hands_db::update_job_status(&state.pool, &job_id, "cancelled").await; + let _ = hands_db::insert_audit_event( + &state.pool, &generate_id("ha"), &job_id, "job_cancelled", None, None, None, + ).await; + StatusCode::OK +} + +// ── Session endpoints ────────────────────────────────────────────────── + +/// POST /v1/hands/sessions +pub async fn create_session( + State(state): State>, + Json(req): Json, +) -> impl IntoResponse { + let session_id = generate_id("hses"); + let agent_token = generate_id("hsat"); + + // Generate nonce for WebHID session auth + let mut nonce = [0u8; 16]; + for byte in nonce.iter_mut() { + *byte = (std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .subsec_nanos() + & 0xFF) as u8; + } + let nonce_b64 = base64::engine::general_purpose::STANDARD.encode(nonce); + + if let Err(e) = hands_db::create_session( + &state.pool, + &session_id, + &req.job_id, + "current-user", // TODO: extract from auth + &agent_token, + req.host_os.as_deref(), + ) + .await + { + warn!(error = %e, "hands: failed to create session"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "failed to create session"})), + ); + } + + // Register in-memory session + state.session_manager.register(session_id.clone(), req.job_id.clone()); + + let _ = hands_db::insert_audit_event( + &state.pool, &generate_id("ha"), &req.job_id, + "session_created", None, Some(&session_id), None, + ).await; + + info!(session_id = session_id, job_id = req.job_id, "hands: session created"); + + ( + StatusCode::CREATED, + Json(serde_json::to_value(CreateSessionResponse { + session_id, + nonce: nonce_b64, + agent_token, + }) + .unwrap()), + ) +} + +/// GET /v1/hands/sessions/:id +pub async fn get_session( + State(state): State>, + Path(session_id): Path, +) -> impl IntoResponse { + match hands_db::find_session(&state.pool, &session_id).await { + Ok(Some(s)) => ( + StatusCode::OK, + Json(serde_json::to_value(SessionView { + id: s.id, + job_id: s.job_id, + status: s.status, + host_os: s.host_os, + created_at: s.created_at.to_string(), + last_activity_at: s.last_activity_at.to_string(), + }).unwrap()), + ), + Ok(None) => (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "not found"}))), + Err(e) => { + warn!(error = %e, "hands: session query error"); + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "internal"}))) + } + } +} + +/// DELETE /v1/hands/sessions/:id +pub async fn close_session( + State(state): State>, + Path(session_id): Path, +) -> impl IntoResponse { + let _ = hands_db::close_session(&state.pool, &session_id, "user_closed").await; + state.session_manager.close(&session_id); + StatusCode::OK +} + +/// POST /v1/hands/sessions/:id/activated +pub async fn activate_session( + State(state): State>, + Path(session_id): Path, + Json(req): Json, +) -> impl IntoResponse { + if let Err(e) = hands_db::activate_session( + &state.pool, + &session_id, + &req.browser_session_id, + req.device_id.as_deref(), + req.host_os.as_deref(), + ) + .await + { + warn!(error = %e, "hands: failed to activate session"); + return StatusCode::INTERNAL_SERVER_ERROR; + } + + state.session_manager.activate(&session_id); + + info!(session_id = session_id, "hands: session activated (OnlyKey confirmed)"); + StatusCode::OK +} + +/// POST /v1/hands/sessions/:id/emergency-stop +pub async fn emergency_stop( + State(state): State>, + Path(session_id): Path, +) -> impl IntoResponse { + let _ = hands_db::close_session(&state.pool, &session_id, "emergency_stop").await; + state.session_manager.close(&session_id); + + // Find the session's job and mark it paused + if let Ok(Some(session)) = hands_db::find_session(&state.pool, &session_id).await { + let _ = hands_db::update_job_status(&state.pool, &session.job_id, "paused").await; + let _ = hands_db::insert_audit_event( + &state.pool, &generate_id("ha"), &session.job_id, + "emergency_stop", None, Some(&session_id), None, + ).await; + } + + warn!(session_id = session_id, "hands: EMERGENCY STOP"); + StatusCode::OK +} + +// ── Instruction delivery endpoints ───────────────────────────────────── + +/// GET /v1/hands/sessions/:id/next-packet +/// +/// Browser polls this to get the next instruction packet to deliver via WebHID. +pub async fn next_packet( + State(state): State>, + Path(session_id): Path, +) -> impl IntoResponse { + let _ = hands_db::touch_session(&state.pool, &session_id).await; + + // Check for queued packets first + if let Some(packet) = state.session_manager.dequeue_packet(&session_id) { + return ( + StatusCode::OK, + Json(serde_json::to_value(NextPacketResponse { + packet_id: packet.packet_id, + step_id: packet.step_id, + cbor_b64: packet.cbor_b64, + flags: packet.flags, + }) + .unwrap()), + ); + } + + // No packets queued — check if there's a pending step to compile + let session = match state.session_manager.sessions.get(&session_id) { + Some(s) => s, + None => return (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "session not found"}))), + }; + + if session.status != SessionStatus::Active { + return (StatusCode::NO_CONTENT, Json(serde_json::json!(null))); + } + + let job_id = session.job_id.clone(); + drop(session); // release DashMap ref + + // Find the next pending step + let step = match hands_db::find_next_pending_step(&state.pool, &job_id).await { + Ok(Some(s)) => s, + _ => return (StatusCode::NO_CONTENT, Json(serde_json::json!(null))), + }; + + // Determine host OS + let host_os = hands_db::find_job(&state.pool, &job_id) + .await + .ok() + .flatten() + .and_then(|j| j.host_os) + .and_then(|os| HostOS::from_str(&os)) + .unwrap_or(HostOS::Linux); + + // Parse macro instructions + let macros: Vec = + serde_json::from_value(step.macro_instructions.clone()).unwrap_or_default(); + + // Compile to raw keystrokes + let keystrokes = compile::compile_instruction_set(¯os, &host_os); + + // Encode as CBOR + let instruction_packet = packet::InstructionPacket { + session_id: session_id.clone(), + step_id: step.id.clone(), + instructions: keystrokes, + expect_screenshot: true, + timeout_ms: step.timeout_ms as u32, + }; + + let cbor = packet::encode_cbor(&instruction_packet).unwrap_or_default(); + let cbor_b64 = base64::engine::general_purpose::STANDARD.encode(&cbor); + + // Mark step as sending + let _ = hands_db::update_step_status(&state.pool, &step.id, "sending", None).await; + + let flags = if step.require_confirm { + packet::FLAG_ENCRYPTED | packet::FLAG_REQUIRES_CONFIRM + } else { + packet::FLAG_ENCRYPTED + }; + + let packet_id = generate_id("hp"); + + ( + StatusCode::OK, + Json(serde_json::to_value(NextPacketResponse { + packet_id, + step_id: step.id, + cbor_b64, + flags, + }) + .unwrap()), + ) +} + +/// POST /v1/hands/sessions/:id/packet-acked +pub async fn packet_acked( + State(state): State>, + Path(session_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let _ = hands_db::touch_session(&state.pool, &session_id).await; + info!(session_id = session_id, packet_id = req.packet_id, "hands: packet acknowledged"); + StatusCode::OK +} + +/// POST /v1/hands/sessions/:id/step-status +pub async fn step_status( + State(state): State>, + Path(session_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let _ = hands_db::touch_session(&state.pool, &session_id).await; + + let status = match req.status_code { + 0x00 => "pending", // queued on device + 0x01 => "executing", + 0x02 => "succeeded", + 0x03 => "failed", + 0x04 => { + // Button stop → emergency stop the whole session + warn!(session_id = session_id, "hands: button stop detected"); + let _ = hands_db::close_session(&state.pool, &session_id, "button_stop").await; + state.session_manager.close(&session_id); + "failed" + } + _ => "failed", + }; + + let _ = hands_db::update_step_status( + &state.pool, + &req.step_id, + status, + req.detail.as_deref(), + ) + .await; + + info!( + session_id = session_id, + step_id = req.step_id, + status_code = req.status_code, + "hands: step status update" + ); + + StatusCode::OK +} + +// ── Screenshot endpoints ─────────────────────────────────────────────── + +/// POST /v1/hands/screenshots +pub async fn upload_screenshot( + State(state): State>, + axum::extract::Multipart(mut multipart): axum::extract::Multipart, +) -> impl IntoResponse { + let mut session_id = String::new(); + let mut step_id = String::new(); + let mut image_data: Vec = Vec::new(); + let mut width = 0i32; + let mut height = 0i32; + + while let Ok(Some(field)) = multipart.next_field().await { + let name = field.name().unwrap_or("").to_string(); + match name.as_str() { + "session_id" => session_id = field.text().await.unwrap_or_default(), + "step_id" => step_id = field.text().await.unwrap_or_default(), + "width" => width = field.text().await.unwrap_or_default().parse().unwrap_or(0), + "height" => height = field.text().await.unwrap_or_default().parse().unwrap_or(0), + "image" => image_data = field.bytes().await.unwrap_or_default().to_vec(), + _ => {} + } + } + + if session_id.is_empty() || step_id.is_empty() || image_data.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "missing required fields"})), + ); + } + + let screenshot_id = generate_id("hsc"); + if let Err(e) = hands_db::insert_screenshot( + &state.pool, + &screenshot_id, + &session_id, + &step_id, + &image_data, + width, + height, + ) + .await + { + warn!(error = %e, "hands: failed to store screenshot"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "failed to store"})), + ); + } + + info!( + screenshot_id = screenshot_id, + session_id = session_id, + size_bytes = image_data.len(), + "hands: screenshot uploaded" + ); + + ( + StatusCode::CREATED, + Json(serde_json::json!({ "id": screenshot_id })), + ) +} + +/// GET /v1/hands/screenshots/:id +pub async fn get_screenshot( + State(_state): State>, + Path(screenshot_id): Path, +) -> impl IntoResponse { + // TODO: query screenshot by ID and return image bytes + // For now, return 501 + (StatusCode::NOT_IMPLEMENTED, Json(serde_json::json!({"error": "not yet implemented"}))) +} + +// ── Helpers ──────────────────────────────────────────────────────────── + +fn generate_id(prefix: &str) -> String { + format!("{}_{}", prefix, ulid::Ulid::new().to_string().to_lowercase()) +} diff --git a/apps/gateway/src/hands/compile.rs b/apps/gateway/src/hands/compile.rs new file mode 100644 index 0000000..73dcaf4 --- /dev/null +++ b/apps/gateway/src/hands/compile.rs @@ -0,0 +1,353 @@ +//! Macro instruction compiler. +//! +//! Compiles high-level macro instructions (e.g., "open_browser", "navigate_url") +//! into raw HID keystroke sequences specific to the target OS. + +use super::models::*; + +/// Compile a single macro instruction into a sequence of raw keystrokes. +pub fn compile_macro(instruction: &MacroInstruction, os: &HostOS) -> Vec { + match instruction { + MacroInstruction::OpenBrowser { browser } => { + compile_open_browser(os, browser.as_deref()) + } + MacroInstruction::NavigateUrl { url } => compile_navigate_url(os, url), + MacroInstruction::OpenTerminal => compile_open_terminal(os), + MacroInstruction::RunCommand { command } => compile_run_command(os, command), + MacroInstruction::Screenshot => compile_screenshot(os), + MacroInstruction::SwitchWindow => compile_switch_window(os), + MacroInstruction::CloseWindow => compile_close_window(os), + MacroInstruction::SelectAll => compile_select_all(os), + MacroInstruction::Copy => compile_copy(os), + MacroInstruction::Paste => compile_paste(os), + MacroInstruction::Save => compile_save(os), + MacroInstruction::Undo => compile_undo(os), + MacroInstruction::Find { text } => compile_find(os, text), + MacroInstruction::TypeText { text } => compile_type_text(text), + MacroInstruction::Wait { seconds } => vec![RawKeystroke::Delay { ms: seconds * 1000 }], + MacroInstruction::KeyPress { key, modifiers } => { + compile_key_press(key, modifiers.as_deref()) + } + MacroInstruction::KeyCombo { keys, modifiers } => { + compile_key_combo(keys, modifiers.as_deref()) + } + } +} + +/// Compile a full instruction set (list of macros) for a target OS. +pub fn compile_instruction_set(macros: &[MacroInstruction], os: &HostOS) -> Vec { + macros.iter().flat_map(|m| compile_macro(m, os)).collect() +} + +// ── Individual macro compilers ───────────────────────────────────────── + +fn gui_mod(os: &HostOS) -> u8 { + match os { + HostOS::MacOS => MOD_GUI, + _ => MOD_CTRL, + } +} + +fn compile_open_browser(os: &HostOS, browser: Option<&str>) -> Vec { + let app_name = browser.unwrap_or("chrome"); + match os { + HostOS::MacOS => vec![ + // Cmd+Space → Spotlight + RawKeystroke::Combo { codes: vec![KEY_SPACE], mods: Some(MOD_GUI) }, + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::TypeText { text: app_name.to_string(), delay_per_char_ms: Some(30) }, + RawKeystroke::Delay { ms: 300 }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + RawKeystroke::Delay { ms: 1500 }, + ], + HostOS::Windows => vec![ + // Win key → Start menu search + RawKeystroke::Key { code: KEY_SPACE, mods: Some(MOD_GUI) }, + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::TypeText { text: app_name.to_string(), delay_per_char_ms: Some(30) }, + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + RawKeystroke::Delay { ms: 2000 }, + ], + HostOS::Linux => vec![ + // Super key → App launcher + RawKeystroke::Key { code: KEY_SPACE, mods: Some(MOD_GUI) }, + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::TypeText { text: app_name.to_string(), delay_per_char_ms: Some(30) }, + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + RawKeystroke::Delay { ms: 2000 }, + ], + } +} + +fn compile_navigate_url(os: &HostOS, url: &str) -> Vec { + let mod_key = gui_mod(os); + vec![ + // Ctrl/Cmd+L → Focus address bar + RawKeystroke::Combo { codes: vec![KEY_L], mods: Some(mod_key) }, + RawKeystroke::Delay { ms: 200 }, + // Select all existing text + RawKeystroke::Combo { codes: vec![KEY_A], mods: Some(mod_key) }, + RawKeystroke::Delay { ms: 100 }, + // Type URL + RawKeystroke::TypeText { text: url.to_string(), delay_per_char_ms: Some(10) }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + RawKeystroke::Delay { ms: 2000 }, + ] +} + +fn compile_open_terminal(os: &HostOS) -> Vec { + match os { + HostOS::MacOS => vec![ + RawKeystroke::Combo { codes: vec![KEY_SPACE], mods: Some(MOD_GUI) }, + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::TypeText { text: "terminal".to_string(), delay_per_char_ms: Some(30) }, + RawKeystroke::Delay { ms: 300 }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + RawKeystroke::Delay { ms: 1000 }, + ], + HostOS::Windows => vec![ + // Win+R → Run dialog + RawKeystroke::Combo { codes: vec![0x15], mods: Some(MOD_GUI) }, // 0x15 = 'r' + RawKeystroke::Delay { ms: 500 }, + RawKeystroke::TypeText { text: "cmd".to_string(), delay_per_char_ms: Some(30) }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + RawKeystroke::Delay { ms: 1000 }, + ], + HostOS::Linux => vec![ + // Ctrl+Alt+T → Terminal + RawKeystroke::Combo { + codes: vec![KEY_T], + mods: Some(MOD_CTRL | MOD_ALT), + }, + RawKeystroke::Delay { ms: 1000 }, + ], + } +} + +fn compile_run_command(os: &HostOS, command: &str) -> Vec { + let mut keystrokes = compile_open_terminal(os); + keystrokes.push(RawKeystroke::TypeText { + text: command.to_string(), + delay_per_char_ms: Some(10), + }); + keystrokes.push(RawKeystroke::Key { code: KEY_ENTER, mods: None }); + keystrokes.push(RawKeystroke::Delay { ms: 1000 }); + keystrokes +} + +fn compile_screenshot(os: &HostOS) -> Vec { + match os { + HostOS::MacOS => vec![ + // Cmd+Shift+3 → Full screen capture + RawKeystroke::Combo { + codes: vec![0x20], // '3' + mods: Some(MOD_GUI | MOD_SHIFT), + }, + RawKeystroke::Delay { ms: 500 }, + ], + HostOS::Windows => vec![ + // Win+Shift+S → Snipping tool + RawKeystroke::Combo { + codes: vec![0x16], // 's' + mods: Some(MOD_GUI | MOD_SHIFT), + }, + RawKeystroke::Delay { ms: 500 }, + ], + HostOS::Linux => vec![ + RawKeystroke::Key { code: KEY_PRINTSCREEN, mods: None }, + RawKeystroke::Delay { ms: 500 }, + ], + } +} + +fn compile_switch_window(os: &HostOS) -> Vec { + match os { + HostOS::MacOS => vec![ + RawKeystroke::Combo { codes: vec![KEY_TAB], mods: Some(MOD_GUI) }, + RawKeystroke::Delay { ms: 300 }, + ], + _ => vec![ + RawKeystroke::Combo { codes: vec![KEY_TAB], mods: Some(MOD_ALT) }, + RawKeystroke::Delay { ms: 300 }, + ], + } +} + +fn compile_close_window(os: &HostOS) -> Vec { + let mod_key = gui_mod(os); + vec![ + RawKeystroke::Combo { + codes: vec![0x1A], // 'w' + mods: Some(mod_key), + }, + RawKeystroke::Delay { ms: 300 }, + ] +} + +fn compile_select_all(os: &HostOS) -> Vec { + vec![RawKeystroke::Combo { + codes: vec![KEY_A], + mods: Some(gui_mod(os)), + }] +} + +fn compile_copy(os: &HostOS) -> Vec { + vec![RawKeystroke::Combo { + codes: vec![0x06], // 'c' + mods: Some(gui_mod(os)), + }] +} + +fn compile_paste(os: &HostOS) -> Vec { + vec![RawKeystroke::Combo { + codes: vec![0x19], // 'v' + mods: Some(gui_mod(os)), + }] +} + +fn compile_save(os: &HostOS) -> Vec { + vec![RawKeystroke::Combo { + codes: vec![0x16], // 's' + mods: Some(gui_mod(os)), + }] +} + +fn compile_undo(os: &HostOS) -> Vec { + vec![RawKeystroke::Combo { + codes: vec![0x1D], // 'z' + mods: Some(gui_mod(os)), + }] +} + +fn compile_find(os: &HostOS, text: &str) -> Vec { + let mod_key = gui_mod(os); + vec![ + RawKeystroke::Combo { + codes: vec![0x09], // 'f' + mods: Some(mod_key), + }, + RawKeystroke::Delay { ms: 200 }, + RawKeystroke::TypeText { text: text.to_string(), delay_per_char_ms: Some(10) }, + RawKeystroke::Key { code: KEY_ENTER, mods: None }, + ] +} + +fn compile_type_text(text: &str) -> Vec { + vec![RawKeystroke::TypeText { + text: text.to_string(), + delay_per_char_ms: Some(5), + }] +} + +fn compile_key_press(key: &str, _modifiers: Option<&[String]>) -> Vec { + let code = key_name_to_code(key); + vec![RawKeystroke::Key { code, mods: None }] +} + +fn compile_key_combo(keys: &[String], _modifiers: Option<&[String]>) -> Vec { + let codes: Vec = keys.iter().map(|k| key_name_to_code(k)).collect(); + vec![RawKeystroke::Combo { codes, mods: None }] +} + +/// Map common key names to HID usage codes. +fn key_name_to_code(name: &str) -> u8 { + match name.to_lowercase().as_str() { + "enter" | "return" => KEY_ENTER, + "escape" | "esc" => KEY_ESCAPE, + "space" => KEY_SPACE, + "tab" => KEY_TAB, + "backspace" => KEY_BACKSPACE, + "delete" | "del" => KEY_DELETE, + "up" => KEY_UP, + "down" => KEY_DOWN, + "left" => KEY_LEFT, + "right" => KEY_RIGHT, + "f1" => KEY_F1, + "f3" => KEY_F3, + "f5" => KEY_F5, + "printscreen" => KEY_PRINTSCREEN, + // Letters: a=0x04, b=0x05, ..., z=0x1D + s if s.len() == 1 => { + let ch = s.as_bytes()[0]; + if ch.is_ascii_lowercase() { + 0x04 + (ch - b'a') + } else if ch.is_ascii_digit() { + if ch == b'0' { 0x27 } else { 0x1E + (ch - b'1') } + } else { + 0x00 // unknown + } + } + _ => 0x00, + } +} + +// ── Tests ────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn compile_open_browser_macos() { + let result = compile_macro( + &MacroInstruction::OpenBrowser { browser: Some("firefox".into()) }, + &HostOS::MacOS, + ); + assert!(result.len() >= 4); + // First instruction should be Cmd+Space + match &result[0] { + RawKeystroke::Combo { mods: Some(m), .. } => assert_eq!(*m, MOD_GUI), + _ => panic!("expected Combo"), + } + } + + #[test] + fn compile_navigate_url_windows() { + let result = compile_macro( + &MacroInstruction::NavigateUrl { url: "https://example.com".into() }, + &HostOS::Windows, + ); + // Should include Ctrl+L, select all, type URL, enter + assert!(result.len() >= 4); + match &result[0] { + RawKeystroke::Combo { mods: Some(m), .. } => assert_eq!(*m, MOD_CTRL), + _ => panic!("expected Combo with Ctrl"), + } + } + + #[test] + fn compile_type_text_simple() { + let result = compile_macro( + &MacroInstruction::TypeText { text: "hello".into() }, + &HostOS::Linux, + ); + assert_eq!(result.len(), 1); + match &result[0] { + RawKeystroke::TypeText { text, .. } => assert_eq!(text, "hello"), + _ => panic!("expected TypeText"), + } + } + + #[test] + fn compile_wait() { + let result = compile_macro( + &MacroInstruction::Wait { seconds: 5 }, + &HostOS::MacOS, + ); + assert_eq!(result.len(), 1); + match &result[0] { + RawKeystroke::Delay { ms } => assert_eq!(*ms, 5000), + _ => panic!("expected Delay"), + } + } + + #[test] + fn key_name_to_code_letters() { + assert_eq!(key_name_to_code("a"), 0x04); + assert_eq!(key_name_to_code("z"), 0x1D); + assert_eq!(key_name_to_code("enter"), KEY_ENTER); + assert_eq!(key_name_to_code("space"), KEY_SPACE); + } +} diff --git a/apps/gateway/src/hands/db.rs b/apps/gateway/src/hands/db.rs new file mode 100644 index 0000000..d84b848 --- /dev/null +++ b/apps/gateway/src/hands/db.rs @@ -0,0 +1,417 @@ +//! Database queries for Hands jobs, steps, sessions, and screenshots. + +use anyhow::{Context, Result}; +use sqlx::{FromRow, PgPool}; + +// ── Row types ────────────────────────────────────────────────────────── + +#[derive(Debug, FromRow)] +pub struct HandsJobRow { + pub id: String, + #[sqlx(rename = "userId")] + pub user_id: String, + pub name: String, + pub description: String, + pub status: String, + #[sqlx(rename = "hostOS")] + pub host_os: Option, + pub priority: i32, + #[sqlx(rename = "maxDurationSecs")] + pub max_duration_secs: i32, + #[sqlx(rename = "createdAt")] + pub created_at: chrono::NaiveDateTime, + #[sqlx(rename = "updatedAt")] + pub updated_at: chrono::NaiveDateTime, + #[sqlx(rename = "completedAt")] + pub completed_at: Option, +} + +#[derive(Debug, FromRow)] +pub struct HandsStepRow { + pub id: String, + #[sqlx(rename = "jobId")] + pub job_id: String, + #[sqlx(rename = "sequenceNumber")] + pub sequence_number: i32, + pub description: String, + #[sqlx(rename = "macroInstructions")] + pub macro_instructions: serde_json::Value, + #[sqlx(rename = "expectedOutcome")] + pub expected_outcome: String, + pub status: String, + #[sqlx(rename = "retryCount")] + pub retry_count: i32, + #[sqlx(rename = "maxRetries")] + pub max_retries: i32, + #[sqlx(rename = "timeoutMs")] + pub timeout_ms: i32, + #[sqlx(rename = "requireConfirm")] + pub require_confirm: bool, + #[sqlx(rename = "createdAt")] + pub created_at: chrono::NaiveDateTime, + #[sqlx(rename = "startedAt")] + pub started_at: Option, + #[sqlx(rename = "completedAt")] + pub completed_at: Option, + #[sqlx(rename = "errorMessage")] + pub error_message: Option, +} + +#[derive(Debug, FromRow)] +pub struct HandsSessionRow { + pub id: String, + #[sqlx(rename = "jobId")] + pub job_id: String, + #[sqlx(rename = "userId")] + pub user_id: String, + #[sqlx(rename = "agentToken")] + pub agent_token: String, + #[sqlx(rename = "browserSessionId")] + pub browser_session_id: Option, + pub status: String, + #[sqlx(rename = "hostOS")] + pub host_os: Option, + #[sqlx(rename = "sessionKeyHash")] + pub session_key_hash: Option, + #[sqlx(rename = "deviceId")] + pub device_id: Option, + #[sqlx(rename = "createdAt")] + pub created_at: chrono::NaiveDateTime, + #[sqlx(rename = "lastActivityAt")] + pub last_activity_at: chrono::NaiveDateTime, + #[sqlx(rename = "closedAt")] + pub closed_at: Option, + #[sqlx(rename = "closeReason")] + pub close_reason: Option, +} + +#[derive(Debug, FromRow)] +pub struct HandsScreenshotRow { + pub id: String, + #[sqlx(rename = "sessionId")] + pub session_id: String, + #[sqlx(rename = "stepId")] + pub step_id: String, + #[sqlx(rename = "capturedAt")] + pub captured_at: chrono::NaiveDateTime, + pub width: i32, + pub height: i32, + #[sqlx(rename = "sizeBytes")] + pub size_bytes: i32, + pub analysis: Option, + #[sqlx(rename = "expiresAt")] + pub expires_at: chrono::NaiveDateTime, +} + +// ── Job queries ──────────────────────────────────────────────────────── + +pub async fn find_job(pool: &PgPool, job_id: &str) -> Result> { + sqlx::query_as::<_, HandsJobRow>( + r#"SELECT * FROM "HandsJob" WHERE id = $1 LIMIT 1"#, + ) + .bind(job_id) + .fetch_optional(pool) + .await + .context("querying HandsJob by id") +} + +pub async fn find_jobs_by_user(pool: &PgPool, user_id: &str) -> Result> { + sqlx::query_as::<_, HandsJobRow>( + r#"SELECT * FROM "HandsJob" WHERE "userId" = $1 ORDER BY "createdAt" DESC"#, + ) + .bind(user_id) + .fetch_all(pool) + .await + .context("querying HandsJobs by userId") +} + +pub async fn create_job( + pool: &PgPool, + id: &str, + user_id: &str, + name: &str, + description: &str, + host_os: Option<&str>, +) -> Result<()> { + sqlx::query( + r#"INSERT INTO "HandsJob" (id, "userId", name, description, status, "hostOS") + VALUES ($1, $2, $3, $4, 'draft', $5)"#, + ) + .bind(id) + .bind(user_id) + .bind(name) + .bind(description) + .bind(host_os) + .execute(pool) + .await + .context("inserting HandsJob")?; + Ok(()) +} + +pub async fn update_job_status( + pool: &PgPool, + job_id: &str, + status: &str, +) -> Result<()> { + sqlx::query( + r#"UPDATE "HandsJob" SET status = $2, "updatedAt" = NOW() WHERE id = $1"#, + ) + .bind(job_id) + .bind(status) + .execute(pool) + .await + .context("updating HandsJob status")?; + Ok(()) +} + +// ── Step queries ─────────────────────────────────────────────────────── + +pub async fn find_steps_by_job(pool: &PgPool, job_id: &str) -> Result> { + sqlx::query_as::<_, HandsStepRow>( + r#"SELECT * FROM "HandsStep" WHERE "jobId" = $1 ORDER BY "sequenceNumber" ASC"#, + ) + .bind(job_id) + .fetch_all(pool) + .await + .context("querying HandsSteps by jobId") +} + +pub async fn create_step( + pool: &PgPool, + id: &str, + job_id: &str, + sequence_number: i32, + description: &str, + macro_instructions: &serde_json::Value, + expected_outcome: &str, + max_retries: i32, + timeout_ms: i32, + require_confirm: bool, +) -> Result<()> { + sqlx::query( + r#"INSERT INTO "HandsStep" + (id, "jobId", "sequenceNumber", description, "macroInstructions", "expectedOutcome", + "maxRetries", "timeoutMs", "requireConfirm") + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"#, + ) + .bind(id) + .bind(job_id) + .bind(sequence_number) + .bind(description) + .bind(macro_instructions) + .bind(expected_outcome) + .bind(max_retries) + .bind(timeout_ms) + .bind(require_confirm) + .execute(pool) + .await + .context("inserting HandsStep")?; + Ok(()) +} + +pub async fn update_step_status( + pool: &PgPool, + step_id: &str, + status: &str, + error_message: Option<&str>, +) -> Result<()> { + sqlx::query( + r#"UPDATE "HandsStep" + SET status = $2, "errorMessage" = $3, "completedAt" = CASE WHEN $2 IN ('succeeded','failed','skipped') THEN NOW() ELSE "completedAt" END + WHERE id = $1"#, + ) + .bind(step_id) + .bind(status) + .bind(error_message) + .execute(pool) + .await + .context("updating HandsStep status")?; + Ok(()) +} + +/// Find the next pending step for a job. +pub async fn find_next_pending_step(pool: &PgPool, job_id: &str) -> Result> { + sqlx::query_as::<_, HandsStepRow>( + r#"SELECT * FROM "HandsStep" + WHERE "jobId" = $1 AND status = 'pending' + ORDER BY "sequenceNumber" ASC + LIMIT 1"#, + ) + .bind(job_id) + .fetch_optional(pool) + .await + .context("querying next pending HandsStep") +} + +// ── Session queries ──────────────────────────────────────────────────── + +pub async fn find_session(pool: &PgPool, session_id: &str) -> Result> { + sqlx::query_as::<_, HandsSessionRow>( + r#"SELECT * FROM "HandsSession" WHERE id = $1 LIMIT 1"#, + ) + .bind(session_id) + .fetch_optional(pool) + .await + .context("querying HandsSession by id") +} + +pub async fn create_session( + pool: &PgPool, + id: &str, + job_id: &str, + user_id: &str, + agent_token: &str, + host_os: Option<&str>, +) -> Result<()> { + sqlx::query( + r#"INSERT INTO "HandsSession" + (id, "jobId", "userId", "agentToken", status, "hostOS") + VALUES ($1, $2, $3, $4, 'establishing', $5)"#, + ) + .bind(id) + .bind(job_id) + .bind(user_id) + .bind(agent_token) + .bind(host_os) + .execute(pool) + .await + .context("inserting HandsSession")?; + Ok(()) +} + +pub async fn activate_session( + pool: &PgPool, + session_id: &str, + browser_session_id: &str, + device_id: Option<&str>, + host_os: Option<&str>, +) -> Result<()> { + sqlx::query( + r#"UPDATE "HandsSession" + SET status = 'active', "browserSessionId" = $2, "deviceId" = $3, + "hostOS" = COALESCE($4, "hostOS"), "lastActivityAt" = NOW() + WHERE id = $1"#, + ) + .bind(session_id) + .bind(browser_session_id) + .bind(device_id) + .bind(host_os) + .execute(pool) + .await + .context("activating HandsSession")?; + Ok(()) +} + +pub async fn close_session( + pool: &PgPool, + session_id: &str, + reason: &str, +) -> Result<()> { + sqlx::query( + r#"UPDATE "HandsSession" + SET status = 'closed', "closedAt" = NOW(), "closeReason" = $2 + WHERE id = $1"#, + ) + .bind(session_id) + .bind(reason) + .execute(pool) + .await + .context("closing HandsSession")?; + Ok(()) +} + +pub async fn touch_session(pool: &PgPool, session_id: &str) -> Result<()> { + sqlx::query( + r#"UPDATE "HandsSession" SET "lastActivityAt" = NOW() WHERE id = $1"#, + ) + .bind(session_id) + .execute(pool) + .await + .context("touching HandsSession")?; + Ok(()) +} + +// ── Screenshot queries ───────────────────────────────────────────────── + +pub async fn find_screenshots_by_session( + pool: &PgPool, + session_id: &str, +) -> Result> { + sqlx::query_as::<_, HandsScreenshotRow>( + r#"SELECT id, "sessionId", "stepId", "capturedAt", width, height, "sizeBytes", analysis, "expiresAt" + FROM "HandsScreenshot" + WHERE "sessionId" = $1 + ORDER BY "capturedAt" DESC"#, + ) + .bind(session_id) + .fetch_all(pool) + .await + .context("querying HandsScreenshots by session") +} + +pub async fn insert_screenshot( + pool: &PgPool, + id: &str, + session_id: &str, + step_id: &str, + image_data: &[u8], + width: i32, + height: i32, +) -> Result<()> { + sqlx::query( + r#"INSERT INTO "HandsScreenshot" + (id, "sessionId", "stepId", "imageData", width, height, "sizeBytes", "expiresAt") + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW() + INTERVAL '24 hours')"#, + ) + .bind(id) + .bind(session_id) + .bind(step_id) + .bind(image_data) + .bind(width) + .bind(height) + .bind(image_data.len() as i32) + .execute(pool) + .await + .context("inserting HandsScreenshot")?; + Ok(()) +} + +// ── Audit queries ────────────────────────────────────────────────────── + +pub async fn insert_audit_event( + pool: &PgPool, + id: &str, + job_id: &str, + event: &str, + step_id: Option<&str>, + session_id: Option<&str>, + metadata: Option<&serde_json::Value>, +) -> Result<()> { + sqlx::query( + r#"INSERT INTO "HandsAuditEvent" + (id, "jobId", event, "stepId", "sessionId", metadata) + VALUES ($1, $2, $3, $4, $5, $6)"#, + ) + .bind(id) + .bind(job_id) + .bind(event) + .bind(step_id) + .bind(session_id) + .bind(metadata) + .execute(pool) + .await + .context("inserting HandsAuditEvent")?; + Ok(()) +} + +/// Count steps for a job (used in list view). +pub async fn count_steps(pool: &PgPool, job_id: &str) -> Result { + let row: (i64,) = sqlx::query_as( + r#"SELECT COUNT(*) FROM "HandsStep" WHERE "jobId" = $1"#, + ) + .bind(job_id) + .fetch_one(pool) + .await + .context("counting steps")?; + Ok(row.0) +} diff --git a/apps/gateway/src/hands/mod.rs b/apps/gateway/src/hands/mod.rs new file mode 100644 index 0000000..a8152ff --- /dev/null +++ b/apps/gateway/src/hands/mod.rs @@ -0,0 +1,16 @@ +//! OnlyAgent Hands — remote machine control via OnlyKey WebHID. +//! +//! This module handles: +//! - Keystroke instruction compilation (macro → OS-specific HID keystrokes) +//! - HID packet framing (CBOR → chunked reports) +//! - Session lifecycle management +//! - Job/step orchestration +//! - Screenshot storage and retrieval +//! - Audit logging + +pub mod api; +pub mod compile; +pub mod db; +pub mod models; +pub mod packet; +pub mod session; diff --git a/apps/gateway/src/hands/models.rs b/apps/gateway/src/hands/models.rs new file mode 100644 index 0000000..c28a037 --- /dev/null +++ b/apps/gateway/src/hands/models.rs @@ -0,0 +1,389 @@ +//! Data models for the OnlyAgent Hands system. + +use serde::{Deserialize, Serialize}; + +// ── Job status ───────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum JobStatus { + Draft, + Queued, + Running, + Paused, + Completed, + Failed, + Cancelled, +} + +impl JobStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Draft => "draft", + Self::Queued => "queued", + Self::Running => "running", + Self::Paused => "paused", + Self::Completed => "completed", + Self::Failed => "failed", + Self::Cancelled => "cancelled", + } + } + + pub fn from_str(s: &str) -> Option { + match s { + "draft" => Some(Self::Draft), + "queued" => Some(Self::Queued), + "running" => Some(Self::Running), + "paused" => Some(Self::Paused), + "completed" => Some(Self::Completed), + "failed" => Some(Self::Failed), + "cancelled" => Some(Self::Cancelled), + _ => None, + } + } +} + +// ── Step status ──────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum StepStatus { + Pending, + Sending, + Executing, + Verifying, + Succeeded, + Failed, + Skipped, +} + +impl StepStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Pending => "pending", + Self::Sending => "sending", + Self::Executing => "executing", + Self::Verifying => "verifying", + Self::Succeeded => "succeeded", + Self::Failed => "failed", + Self::Skipped => "skipped", + } + } +} + +// ── Session status ───────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum SessionStatus { + Establishing, + Active, + Paused, + Closed, +} + +impl SessionStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Establishing => "establishing", + Self::Active => "active", + Self::Paused => "paused", + Self::Closed => "closed", + } + } +} + +// ── Host OS ──────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum HostOS { + MacOS, + Windows, + Linux, +} + +impl HostOS { + pub fn as_str(&self) -> &'static str { + match self { + Self::MacOS => "macos", + Self::Windows => "windows", + Self::Linux => "linux", + } + } + + pub fn from_str(s: &str) -> Option { + match s { + "macos" => Some(Self::MacOS), + "windows" => Some(Self::Windows), + "linux" => Some(Self::Linux), + _ => None, + } + } +} + +// ── Macro instructions (AI agent generates these) ────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "macro", rename_all = "snake_case")] +pub enum MacroInstruction { + OpenBrowser { + #[serde(skip_serializing_if = "Option::is_none")] + browser: Option, + }, + NavigateUrl { + url: String, + }, + OpenTerminal, + RunCommand { + command: String, + }, + Screenshot, + SwitchWindow, + CloseWindow, + SelectAll, + Copy, + Paste, + Save, + Undo, + Find { + text: String, + }, + TypeText { + text: String, + }, + Wait { + seconds: u32, + }, + KeyPress { + key: String, + #[serde(skip_serializing_if = "Option::is_none")] + modifiers: Option>, + }, + KeyCombo { + keys: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + modifiers: Option>, + }, +} + +// ── Raw HID keystroke (compiled output) ──────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "t", rename_all = "snake_case")] +pub enum RawKeystroke { + /// Type a string of text character by character + TypeText { + text: String, + #[serde(skip_serializing_if = "Option::is_none")] + delay_per_char_ms: Option, + }, + /// Press a single key + Key { + code: u8, + #[serde(skip_serializing_if = "Option::is_none")] + mods: Option, + }, + /// Press a key combination + Combo { + codes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + mods: Option, + }, + /// Delay in milliseconds + Delay { + ms: u32, + }, +} + +// ── HID modifier bitmask ─────────────────────────────────────────────── + +pub const MOD_CTRL: u8 = 0x01; +pub const MOD_SHIFT: u8 = 0x02; +pub const MOD_ALT: u8 = 0x04; +pub const MOD_GUI: u8 = 0x08; // Cmd on macOS, Win on Windows, Super on Linux + +// ── HID key codes (USB HID Usage Table: Keyboard/Keypad Page 0x07) ──── + +pub const KEY_A: u8 = 0x04; +pub const KEY_L: u8 = 0x0F; +pub const KEY_T: u8 = 0x17; +pub const KEY_ENTER: u8 = 0x28; +pub const KEY_ESCAPE: u8 = 0x29; +pub const KEY_SPACE: u8 = 0x2C; +pub const KEY_TAB: u8 = 0x2B; +pub const KEY_BACKSPACE: u8 = 0x2A; +pub const KEY_F1: u8 = 0x3A; +pub const KEY_F3: u8 = 0x3C; +pub const KEY_F5: u8 = 0x3E; +pub const KEY_PRINTSCREEN: u8 = 0x46; +pub const KEY_DELETE: u8 = 0x4C; +pub const KEY_RIGHT: u8 = 0x4F; +pub const KEY_LEFT: u8 = 0x50; +pub const KEY_DOWN: u8 = 0x51; +pub const KEY_UP: u8 = 0x52; + +// ── API request/response types ───────────────────────────────────────── + +/// Create a new job. +#[derive(Debug, Deserialize)] +pub struct CreateJobRequest { + pub name: String, + pub description: String, + pub host_os: Option, + pub steps: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct CreateStepInput { + pub description: String, + pub macro_instructions: serde_json::Value, // MacroInstruction[] + pub expected_outcome: String, + #[serde(default = "default_max_retries")] + pub max_retries: i32, + #[serde(default = "default_timeout_ms")] + pub timeout_ms: i32, + #[serde(default)] + pub require_confirm: bool, +} + +fn default_max_retries() -> i32 { + 3 +} +fn default_timeout_ms() -> i32 { + 30000 +} + +/// Create a new session. +#[derive(Debug, Deserialize)] +pub struct CreateSessionRequest { + pub job_id: String, + #[serde(default)] + pub host_os: Option, +} + +/// Session creation response. +#[derive(Debug, Serialize)] +pub struct CreateSessionResponse { + pub session_id: String, + pub nonce: String, // base64-encoded nonce for WebHID session auth + pub agent_token: String, +} + +/// Session activation (browser reports OnlyKey button press). +#[derive(Debug, Deserialize)] +pub struct ActivateSessionRequest { + pub browser_session_id: String, + #[serde(default)] + pub device_id: Option, + #[serde(default)] + pub host_os: Option, +} + +/// Next instruction packet for browser delivery via WebHID. +#[derive(Debug, Serialize)] +pub struct NextPacketResponse { + pub packet_id: String, + pub step_id: String, + /// Base64-encoded CBOR instruction payload + pub cbor_b64: String, + /// HID report flags (0x01=encrypted, 0x02=requires_confirm) + pub flags: u8, +} + +/// Browser reports device acknowledged receipt. +#[derive(Debug, Deserialize)] +pub struct PacketAckRequest { + pub packet_id: String, +} + +/// Browser forwards device status report. +#[derive(Debug, Deserialize)] +pub struct StepStatusReport { + pub step_id: String, + /// 0x00=queued, 0x01=executing, 0x02=complete, 0x03=error, 0x04=button_stop + pub status_code: u8, + #[serde(default)] + pub detail: Option, +} + +/// Job summary (list view). +#[derive(Debug, Serialize)] +pub struct JobSummary { + pub id: String, + pub name: String, + pub description: String, + pub status: String, + pub host_os: Option, + pub step_count: i64, + pub created_at: String, +} + +/// Job detail with steps. +#[derive(Debug, Serialize)] +pub struct JobDetail { + pub id: String, + pub name: String, + pub description: String, + pub status: String, + pub host_os: Option, + pub max_duration_secs: i32, + pub steps: Vec, + pub created_at: String, + pub updated_at: String, + pub completed_at: Option, +} + +#[derive(Debug, Serialize)] +pub struct StepView { + pub id: String, + pub sequence_number: i32, + pub description: String, + pub macro_instructions: serde_json::Value, + pub expected_outcome: String, + pub status: String, + pub retry_count: i32, + pub max_retries: i32, + pub timeout_ms: i32, + pub require_confirm: bool, + pub error_message: Option, +} + +/// Session view. +#[derive(Debug, Serialize)] +pub struct SessionView { + pub id: String, + pub job_id: String, + pub status: String, + pub host_os: Option, + pub created_at: String, + pub last_activity_at: String, +} + +/// Screenshot metadata. +#[derive(Debug, Serialize)] +pub struct ScreenshotMeta { + pub id: String, + pub session_id: String, + pub step_id: String, + pub captured_at: String, + pub width: i32, + pub height: i32, + pub size_bytes: i32, + pub analysis: Option, +} + +// ── Audit events ─────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize)] +pub struct AuditEvent { + pub event: String, + pub job_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub step_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub session_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + pub timestamp: String, +} diff --git a/apps/gateway/src/hands/packet.rs b/apps/gateway/src/hands/packet.rs new file mode 100644 index 0000000..1ef7bbe --- /dev/null +++ b/apps/gateway/src/hands/packet.rs @@ -0,0 +1,144 @@ +//! HID report packet framing. +//! +//! Serializes compiled keystroke instructions into CBOR and frames them +//! into 59-byte chunks suitable for WebHID report delivery. + +use serde::Serialize; + +use super::models::RawKeystroke; + +/// Maximum payload per HID report (64 bytes total - 5 byte header). +pub const HID_PAYLOAD_SIZE: usize = 59; + +/// HID report IDs for the Hands protocol. +pub const REPORT_ID_INSTRUCTION: u8 = 0x70; +pub const REPORT_ID_SESSION_AUTH: u8 = 0x71; +pub const REPORT_ID_ACK: u8 = 0x72; +pub const REPORT_ID_STATUS: u8 = 0x73; +pub const REPORT_ID_EMERGENCY_STOP: u8 = 0x74; +pub const REPORT_ID_PING: u8 = 0x75; + +/// HID report flags. +pub const FLAG_ENCRYPTED: u8 = 0x01; +pub const FLAG_REQUIRES_CONFIRM: u8 = 0x02; +pub const FLAG_HIGH_RISK: u8 = 0x04; + +/// A complete instruction packet to be serialized to CBOR and chunked. +#[derive(Debug, Serialize)] +pub struct InstructionPacket { + pub session_id: String, + pub step_id: String, + pub instructions: Vec, + pub expect_screenshot: bool, + pub timeout_ms: u32, +} + +/// A single HID report frame (one chunk of a larger instruction packet). +#[derive(Debug)] +pub struct HidReportFrame { + pub seq_no: u16, + pub total: u16, + pub flags: u8, + /// Up to 59 bytes of CBOR payload + pub payload: Vec, +} + +impl HidReportFrame { + /// Serialize this frame into a 64-byte HID report body (without report ID). + pub fn to_bytes(&self) -> Vec { + let mut buf = vec![0u8; 64]; + buf[0] = (self.seq_no >> 8) as u8; + buf[1] = (self.seq_no & 0xFF) as u8; + buf[2] = (self.total >> 8) as u8; + buf[3] = (self.total & 0xFF) as u8; + buf[4] = self.flags; + let copy_len = self.payload.len().min(HID_PAYLOAD_SIZE); + buf[5..5 + copy_len].copy_from_slice(&self.payload[..copy_len]); + buf + } +} + +/// Encode an instruction packet as CBOR bytes. +pub fn encode_cbor(packet: &InstructionPacket) -> Result, serde_json::Error> { + // We use JSON as a CBOR stand-in since serde_cbor may not be available yet. + // In production, swap to serde_cbor::to_vec(). + serde_json::to_vec(packet) +} + +/// Split a CBOR byte array into HID report frames. +pub fn frame_cbor(cbor: &[u8], flags: u8) -> Vec { + if cbor.is_empty() { + return vec![]; + } + + let total_frames = (cbor.len() + HID_PAYLOAD_SIZE - 1) / HID_PAYLOAD_SIZE; + + (0..total_frames) + .map(|i| { + let start = i * HID_PAYLOAD_SIZE; + let end = (start + HID_PAYLOAD_SIZE).min(cbor.len()); + HidReportFrame { + seq_no: i as u16, + total: total_frames as u16, + flags, + payload: cbor[start..end].to_vec(), + } + }) + .collect() +} + +// ── Tests ────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn frame_single_packet() { + let data = vec![0xAA; 40]; // fits in one frame + let frames = frame_cbor(&data, FLAG_ENCRYPTED); + assert_eq!(frames.len(), 1); + assert_eq!(frames[0].seq_no, 0); + assert_eq!(frames[0].total, 1); + assert_eq!(frames[0].flags, FLAG_ENCRYPTED); + assert_eq!(frames[0].payload.len(), 40); + } + + #[test] + fn frame_multi_packet() { + let data = vec![0xBB; 150]; // needs 3 frames (59+59+32) + let frames = frame_cbor(&data, 0); + assert_eq!(frames.len(), 3); + assert_eq!(frames[0].seq_no, 0); + assert_eq!(frames[0].total, 3); + assert_eq!(frames[0].payload.len(), 59); + assert_eq!(frames[1].payload.len(), 59); + assert_eq!(frames[2].payload.len(), 32); + } + + #[test] + fn frame_empty() { + let frames = frame_cbor(&[], 0); + assert!(frames.is_empty()); + } + + #[test] + fn hid_report_frame_to_bytes() { + let frame = HidReportFrame { + seq_no: 1, + total: 3, + flags: FLAG_ENCRYPTED | FLAG_REQUIRES_CONFIRM, + payload: vec![0xDE, 0xAD], + }; + let bytes = frame.to_bytes(); + assert_eq!(bytes.len(), 64); + assert_eq!(bytes[0], 0); // seq_no high + assert_eq!(bytes[1], 1); // seq_no low + assert_eq!(bytes[2], 0); // total high + assert_eq!(bytes[3], 3); // total low + assert_eq!(bytes[4], 0x03); // flags + assert_eq!(bytes[5], 0xDE); + assert_eq!(bytes[6], 0xAD); + assert_eq!(bytes[7], 0x00); // zero-padded + } +} diff --git a/apps/gateway/src/hands/session.rs b/apps/gateway/src/hands/session.rs new file mode 100644 index 0000000..e1fdd55 --- /dev/null +++ b/apps/gateway/src/hands/session.rs @@ -0,0 +1,177 @@ +//! Session lifecycle management for Hands control sessions. + +use std::collections::VecDeque; +use std::time::Instant; + +use dashmap::DashMap; + +use super::models::SessionStatus; + +/// An active session tracked in memory (packet queue, timing). +pub struct ActiveSession { + pub session_id: String, + pub job_id: String, + pub current_step_id: Option, + /// Compiled CBOR packets awaiting browser pickup. + pub packet_queue: VecDeque, + pub status: SessionStatus, + pub created_at: Instant, + pub last_activity: Instant, +} + +/// A queued instruction packet ready for WebHID delivery. +pub struct QueuedPacket { + pub packet_id: String, + pub step_id: String, + /// Base64-encoded CBOR payload + pub cbor_b64: String, + pub flags: u8, + pub created_at: Instant, +} + +/// In-memory session manager. +pub struct SessionManager { + pub sessions: DashMap, +} + +impl SessionManager { + pub fn new() -> Self { + Self { + sessions: DashMap::new(), + } + } + + /// Register a new active session. + pub fn register(&self, session_id: String, job_id: String) { + let now = Instant::now(); + self.sessions.insert( + session_id.clone(), + ActiveSession { + session_id, + job_id, + current_step_id: None, + packet_queue: VecDeque::new(), + status: SessionStatus::Establishing, + created_at: now, + last_activity: now, + }, + ); + } + + /// Mark session as active (OnlyKey button confirmed). + pub fn activate(&self, session_id: &str) -> bool { + if let Some(mut session) = self.sessions.get_mut(session_id) { + session.status = SessionStatus::Active; + session.last_activity = Instant::now(); + true + } else { + false + } + } + + /// Enqueue a compiled instruction packet for browser pickup. + pub fn enqueue_packet(&self, session_id: &str, packet: QueuedPacket) -> bool { + if let Some(mut session) = self.sessions.get_mut(session_id) { + session.current_step_id = Some(packet.step_id.clone()); + session.packet_queue.push_back(packet); + session.last_activity = Instant::now(); + true + } else { + false + } + } + + /// Dequeue the next packet for browser delivery. + pub fn dequeue_packet(&self, session_id: &str) -> Option { + if let Some(mut session) = self.sessions.get_mut(session_id) { + session.last_activity = Instant::now(); + session.packet_queue.pop_front() + } else { + None + } + } + + /// Close a session. + pub fn close(&self, session_id: &str) { + if let Some(mut session) = self.sessions.get_mut(session_id) { + session.status = SessionStatus::Closed; + } + } + + /// Check if a session is active. + pub fn is_active(&self, session_id: &str) -> bool { + self.sessions + .get(session_id) + .map(|s| s.status == SessionStatus::Active) + .unwrap_or(false) + } + + /// Cleanup stale sessions (idle > timeout). + pub fn cleanup_stale(&self, timeout_secs: u64) -> usize { + let cutoff = Instant::now() - std::time::Duration::from_secs(timeout_secs); + let stale: Vec = self + .sessions + .iter() + .filter(|entry| { + entry.last_activity < cutoff + && entry.status != SessionStatus::Closed + }) + .map(|entry| entry.session_id.clone()) + .collect(); + + let count = stale.len(); + for id in stale { + if let Some(mut session) = self.sessions.get_mut(&id) { + session.status = SessionStatus::Closed; + } + } + count + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn register_and_activate() { + let mgr = SessionManager::new(); + mgr.register("s1".into(), "j1".into()); + assert!(!mgr.is_active("s1")); + mgr.activate("s1"); + assert!(mgr.is_active("s1")); + } + + #[test] + fn enqueue_dequeue() { + let mgr = SessionManager::new(); + mgr.register("s1".into(), "j1".into()); + mgr.activate("s1"); + + let packet = QueuedPacket { + packet_id: "p1".into(), + step_id: "step1".into(), + cbor_b64: "AAAA".into(), + flags: 0x01, + created_at: Instant::now(), + }; + mgr.enqueue_packet("s1", packet); + + let dequeued = mgr.dequeue_packet("s1"); + assert!(dequeued.is_some()); + assert_eq!(dequeued.unwrap().packet_id, "p1"); + + // Queue should be empty now + assert!(mgr.dequeue_packet("s1").is_none()); + } + + #[test] + fn close_session() { + let mgr = SessionManager::new(); + mgr.register("s1".into(), "j1".into()); + mgr.activate("s1"); + assert!(mgr.is_active("s1")); + mgr.close("s1"); + assert!(!mgr.is_active("s1")); + } +} diff --git a/apps/gateway/src/main.rs b/apps/gateway/src/main.rs index 7f105a7..30b6c21 100644 --- a/apps/gateway/src/main.rs +++ b/apps/gateway/src/main.rs @@ -10,6 +10,7 @@ mod connect; mod crypto; mod db; mod gateway; +mod hands; mod inject; use std::path::{Path, PathBuf}; @@ -78,12 +79,34 @@ async fn main() -> Result<()> { // Load encryption key for secret decryption let crypto = Arc::new(crypto::CryptoService::from_env()?); + // Initialize OnlyAgent Hands subsystem + let hands_session_manager = Arc::new(hands::session::SessionManager::new()); + let hands_state = Arc::new(hands::api::HandsState { + pool: pool.clone(), + session_manager: hands_session_manager.clone(), + }); + let policy_engine = Arc::new(PolicyEngine { pool, crypto }); + // Spawn periodic Hands session cleanup (every 60s, 5 min idle timeout) + { + let sm = hands_session_manager; + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + loop { + interval.tick().await; + let stale = sm.cleanup_stale(300); + if stale > 0 { + info!(stale = stale, "hands: cleaned up stale sessions"); + } + } + }); + } + info!(port = cli.port, "gateway ready"); // Start the gateway server (blocks forever) - let server = GatewayServer::new(ca, cli.port, policy_engine); + let server = GatewayServer::new(ca, cli.port, policy_engine, hands_state); server.run().await } diff --git a/apps/screencap/Cargo.toml b/apps/screencap/Cargo.toml new file mode 100644 index 0000000..904d558 --- /dev/null +++ b/apps/screencap/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "onlyagent-screencap" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +description = "Screenshot capture agent for OnlyAgent Hands" + +[[bin]] +name = "onlyagent-screencap" +path = "src/main.rs" + +[dependencies] +tokio = { version = "1", features = ["full"] } +reqwest = { version = "0.12", features = ["json", "rustls-tls", "multipart"], default-features = false } +clap = { version = "4", features = ["derive"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +anyhow = "1" + +[profile.release] +opt-level = 3 +strip = true +lto = "fat" +codegen-units = 1 +panic = "abort" diff --git a/apps/screencap/src/main.rs b/apps/screencap/src/main.rs new file mode 100644 index 0000000..fe8ed31 --- /dev/null +++ b/apps/screencap/src/main.rs @@ -0,0 +1,202 @@ +//! OnlyAgent Screencap — Screenshot capture agent for OnlyAgent Hands. +//! +//! This daemon runs on the host machine alongside the OnlyKey device. +//! It periodically captures screenshots and uploads them to the gateway +//! for AI visual reasoning. +//! +//! Platform support: +//! - macOS: `screencapture` CLI +//! - Linux: `scrot` or `gnome-screenshot` +//! - Windows: PowerShell + .NET (System.Windows.Forms.Screen) + +use std::process::Command; +use std::time::Duration; + +use anyhow::{Context, Result}; +use clap::Parser; +use tracing::{info, warn}; +use tracing_subscriber::EnvFilter; + +#[derive(Parser)] +#[command( + name = "onlyagent-screencap", + about = "Screenshot capture agent for OnlyAgent Hands" +)] +struct Cli { + /// Gateway URL (e.g., https://localhost:10255) + #[arg(long, env = "GATEWAY_URL")] + gateway_url: String, + + /// Session-scoped agent token + #[arg(long, env = "AGENT_TOKEN")] + agent_token: String, + + /// Session ID + #[arg(long, env = "SESSION_ID")] + session_id: String, + + /// Current step ID (updated by gateway) + #[arg(long, env = "STEP_ID", default_value = "")] + step_id: String, + + /// Capture interval in seconds + #[arg(long, default_value = "3")] + interval: u64, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .init(); + + let cli = Cli::parse(); + + info!( + gateway = %cli.gateway_url, + session_id = %cli.session_id, + interval_secs = cli.interval, + "starting screenshot capture agent" + ); + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(true) // dev mode + .build() + .context("building HTTP client")?; + + let mut interval = tokio::time::interval(Duration::from_secs(cli.interval)); + + loop { + interval.tick().await; + + // Check if session is still active + let session_check = client + .get(format!( + "{}/v1/hands/sessions/{}", + cli.gateway_url, cli.session_id + )) + .header("Authorization", format!("Bearer {}", cli.agent_token)) + .send() + .await; + + match session_check { + Ok(resp) if resp.status().is_success() => { + let body: serde_json::Value = resp.json().await.unwrap_or_default(); + let status = body["status"].as_str().unwrap_or("unknown"); + if status == "closed" { + info!("session closed — exiting"); + break; + } + } + Ok(resp) => { + warn!(status = %resp.status(), "session check failed"); + continue; + } + Err(e) => { + warn!(error = %e, "session check error"); + continue; + } + } + + // Capture screenshot + let screenshot_path = capture_screenshot().await; + let screenshot_path = match screenshot_path { + Ok(path) => path, + Err(e) => { + warn!(error = %e, "screenshot capture failed"); + continue; + } + }; + + // Read file + let image_data = match tokio::fs::read(&screenshot_path).await { + Ok(data) => data, + Err(e) => { + warn!(error = %e, path = %screenshot_path, "failed to read screenshot"); + continue; + } + }; + + // Upload + let form = reqwest::multipart::Form::new() + .text("session_id", cli.session_id.clone()) + .text("step_id", cli.step_id.clone()) + .text("width", "0") + .text("height", "0") + .part( + "image", + reqwest::multipart::Part::bytes(image_data) + .file_name("screenshot.png") + .mime_str("image/png") + .unwrap(), + ); + + match client + .post(format!("{}/v1/hands/screenshots", cli.gateway_url)) + .header("Authorization", format!("Bearer {}", cli.agent_token)) + .multipart(form) + .send() + .await + { + Ok(resp) if resp.status().is_success() => { + info!("screenshot uploaded successfully"); + } + Ok(resp) => { + warn!(status = %resp.status(), "screenshot upload failed"); + } + Err(e) => { + warn!(error = %e, "screenshot upload error"); + } + } + + // Cleanup temp file + let _ = tokio::fs::remove_file(&screenshot_path).await; + } + + Ok(()) +} + +/// Capture a screenshot using platform-native tools. +/// Returns the path to the temporary screenshot file. +async fn capture_screenshot() -> Result { + let tmp_path = format!("/tmp/onlyagent_screenshot_{}.png", std::process::id()); + + if cfg!(target_os = "macos") { + Command::new("screencapture") + .args(["-x", &tmp_path]) + .status() + .context("executing screencapture")?; + } else if cfg!(target_os = "linux") { + // Try scrot first, fall back to gnome-screenshot + let result = Command::new("scrot").args([&tmp_path]).status(); + if result.is_err() { + Command::new("gnome-screenshot") + .args(["-f", &tmp_path]) + .status() + .context("executing gnome-screenshot")?; + } + } else { + // Windows: use PowerShell + Command::new("powershell") + .args([ + "-Command", + &format!( + "Add-Type -AssemblyName System.Windows.Forms; \ + [System.Windows.Forms.Screen]::PrimaryScreen | Out-Null; \ + $bmp = New-Object System.Drawing.Bitmap( \ + [System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Width, \ + [System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Height); \ + $graphics = [System.Drawing.Graphics]::FromImage($bmp); \ + $graphics.CopyFromScreen(0, 0, 0, 0, $bmp.Size); \ + $bmp.Save('{}');", + tmp_path + ), + ]) + .status() + .context("executing powershell screenshot")?; + } + + Ok(tmp_path) +} diff --git a/apps/web/src/app/(dashboard)/hands/_components/job-list.tsx b/apps/web/src/app/(dashboard)/hands/_components/job-list.tsx new file mode 100644 index 0000000..4492539 --- /dev/null +++ b/apps/web/src/app/(dashboard)/hands/_components/job-list.tsx @@ -0,0 +1,161 @@ +"use client"; + +import { useEffect, useState, useCallback } from "react"; +import { Card, CardContent, CardHeader, CardTitle } from "@onecli/ui/card"; +import { Badge } from "@onecli/ui/badge"; +import { Button } from "@onecli/ui/button"; +import { + Hand, + Plus, + Play, + XCircle, + Clock, + CheckCircle2, + Loader2, + AlertCircle, +} from "lucide-react"; +import { toast } from "sonner"; +import { listJobs, startJob, cancelJob } from "@/lib/hands/api"; +import type { JobSummary, JobStatus } from "@/lib/hands/types"; + +const statusConfig: Record< + JobStatus, + { label: string; variant: "default" | "secondary" | "destructive" | "outline"; icon: typeof Clock } +> = { + draft: { label: "Draft", variant: "secondary", icon: Clock }, + queued: { label: "Queued", variant: "outline", icon: Clock }, + running: { label: "Running", variant: "default", icon: Loader2 }, + paused: { label: "Paused", variant: "outline", icon: AlertCircle }, + completed: { label: "Completed", variant: "default", icon: CheckCircle2 }, + failed: { label: "Failed", variant: "destructive", icon: XCircle }, + cancelled: { label: "Cancelled", variant: "secondary", icon: XCircle }, +}; + +export const HandsJobList = () => { + const [jobs, setJobs] = useState([]); + const [loading, setLoading] = useState(true); + + const fetchJobs = useCallback(async () => { + try { + const items = await listJobs(); + setJobs(items); + } catch { + // silently retry + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + fetchJobs(); + const interval = setInterval(fetchJobs, 5000); + return () => clearInterval(interval); + }, [fetchJobs]); + + const handleStart = async (jobId: string) => { + try { + await startJob(jobId); + toast.success("Job started"); + fetchJobs(); + } catch (err) { + toast.error(`Failed to start: ${err instanceof Error ? err.message : "Unknown error"}`); + } + }; + + const handleCancel = async (jobId: string) => { + try { + await cancelJob(jobId); + toast.success("Job cancelled"); + fetchJobs(); + } catch (err) { + toast.error(`Failed to cancel: ${err instanceof Error ? err.message : "Unknown error"}`); + } + }; + + if (loading) { + return ( +
+ +
+ ); + } + + return ( +
+
+

Jobs

+ +
+ + {jobs.length === 0 ? ( + + + +

+ No jobs yet. Create a new job to start controlling a remote machine + via OnlyKey. +

+
+
+ ) : ( + jobs.map((job) => { + const config = statusConfig[job.status] ?? statusConfig.draft; + const StatusIcon = config.icon; + return ( + + +
+
+ + {job.name} + + + + {config.label} + + {job.host_os && ( + + {job.host_os} + + )} +
+

+ {job.step_count} step{job.step_count !== 1 ? "s" : ""} ·{" "} + {job.description.slice(0, 80)} + {job.description.length > 80 ? "..." : ""} +

+
+
+ {(job.status === "draft" || job.status === "queued") && ( + + )} + {(job.status === "running" || job.status === "queued") && ( + + )} +
+
+
+ ); + }) + )} +
+ ); +}; diff --git a/apps/web/src/app/(dashboard)/hands/_components/live-view.tsx b/apps/web/src/app/(dashboard)/hands/_components/live-view.tsx new file mode 100644 index 0000000..9ce09af --- /dev/null +++ b/apps/web/src/app/(dashboard)/hands/_components/live-view.tsx @@ -0,0 +1,180 @@ +"use client"; + +import { useEffect, useState, useCallback } from "react"; +import { Card, CardContent, CardHeader, CardTitle } from "@onecli/ui/card"; +import { Badge } from "@onecli/ui/badge"; +import { Button } from "@onecli/ui/button"; +import { + Usb, + Square, + Loader2, + CheckCircle2, + XCircle, + Clock, + Monitor, +} from "lucide-react"; +import { toast } from "sonner"; +import { OnlyKeyHands } from "@/lib/hands/webhid"; +import { HandsBridge, establishHandsSession } from "@/lib/hands/bridge"; +import type { JobDetail, StepStatus } from "@/lib/hands/types"; + +interface LiveViewProps { + job: JobDetail; +} + +const stepStatusIcon: Record = { + pending: Clock, + sending: Loader2, + executing: Loader2, + verifying: Loader2, + succeeded: CheckCircle2, + failed: XCircle, + skipped: XCircle, +}; + +export const LiveView = ({ job }: LiveViewProps) => { + const [onlyKey, setOnlyKey] = useState(null); + const [bridge, setBridge] = useState(null); + const [sessionId, setSessionId] = useState(null); + const [connecting, setConnecting] = useState(false); + const [running, setRunning] = useState(false); + + // Connect OnlyKey and establish session + const handleConnect = useCallback(async () => { + setConnecting(true); + try { + const ok = await OnlyKeyHands.connect(); + setOnlyKey(ok); + toast.success(`OnlyKey connected: ${ok.deviceInfo.productName}`); + + // Establish session + toast.info("Press the OnlyKey button to authorize the session..."); + const { bridge: b, sessionId: sid } = await establishHandsSession( + ok, + job.id, + job.host_os ?? undefined, + ); + setBridge(b); + setSessionId(sid); + toast.success("Session authorized — ready to execute"); + } catch (err) { + toast.error( + `Connection failed: ${err instanceof Error ? err.message : "Unknown error"}`, + ); + } finally { + setConnecting(false); + } + }, [job.id, job.host_os]); + + // Start execution + const handleStart = useCallback(async () => { + if (!bridge) return; + setRunning(true); + bridge.start(); // non-blocking + toast.success("Execution started — delivering instructions via WebHID"); + }, [bridge]); + + // Emergency stop + const handleStop = useCallback(async () => { + if (!bridge) return; + try { + await bridge.emergencyStop(); + toast.warning("Emergency stop — all execution halted"); + } catch (err) { + toast.error("Stop failed"); + } + setRunning(false); + }, [bridge]); + + return ( +
+ {/* Connection + Control Bar */} + + +
+
+ + {job.name} +
+ {sessionId && ( + + Session Active + + )} + {onlyKey && ( + + + {onlyKey.deviceInfo.productName} + + )} +
+
+ {!onlyKey ? ( + + ) : !running ? ( + + ) : ( + + )} +
+
+
+ + {/* Step List */} +
+
+

Steps

+ {job.steps.map((step) => { + const Icon = stepStatusIcon[step.status] ?? Clock; + const isActive = step.status === "executing" || step.status === "sending"; + return ( + + +
+ + {step.description} +
+
+
+ ); + })} +
+ + {/* Screenshot feed placeholder */} +
+ + + +

+ {running + ? "Waiting for screenshots..." + : "Screenshots will appear here during execution"} +

+
+
+
+
+
+ ); +}; diff --git a/apps/web/src/app/(dashboard)/hands/page.tsx b/apps/web/src/app/(dashboard)/hands/page.tsx new file mode 100644 index 0000000..1fd5ed9 --- /dev/null +++ b/apps/web/src/app/(dashboard)/hands/page.tsx @@ -0,0 +1,22 @@ +import { Suspense } from "react"; +import type { Metadata } from "next"; +import { PageHeader } from "@dashboard/page-header"; +import { HandsJobList } from "./_components/job-list"; + +export const metadata: Metadata = { + title: "Hands", +}; + +export default function HandsPage() { + return ( +
+ + + + +
+ ); +} diff --git a/apps/web/src/lib/hands/api.ts b/apps/web/src/lib/hands/api.ts new file mode 100644 index 0000000..c75814d --- /dev/null +++ b/apps/web/src/lib/hands/api.ts @@ -0,0 +1,185 @@ +/** + * Client-side API for the OnlyAgent Hands gateway endpoints. + * + * These functions call the gateway's hands endpoints from the browser. + * The browser acts as a relay between OnlyKey (WebHID) and the gateway. + */ + +import type { + CreateJobInput, + CreateSessionResponse, + JobDetail, + JobSummary, + NextPacketResponse, + SessionView, + StepStatusReport, +} from "./types"; + +// ── Configuration ─────────────────────────────────────────────────────── + +const getGatewayBaseUrl = (): string => { + if (typeof window !== "undefined") { + return ( + (window as unknown as Record).__GATEWAY_URL__ ?? + process.env.NEXT_PUBLIC_GATEWAY_URL ?? + "https://localhost:10255" + ); + } + return process.env.GATEWAY_URL ?? "https://localhost:10255"; +}; + +// ── Job API ───────────────────────────────────────────────────────────── + +export const createJob = async ( + input: CreateJobInput, +): Promise<{ id: string; status: string }> => { + const res = await fetch(`${getGatewayBaseUrl()}/v1/hands/jobs`, { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(input), + }); + if (!res.ok) throw new Error(`Failed to create job: ${res.status}`); + return res.json(); +}; + +export const listJobs = async (): Promise => { + const res = await fetch(`${getGatewayBaseUrl()}/v1/hands/jobs`, { + credentials: "include", + }); + if (!res.ok) throw new Error(`Failed to list jobs: ${res.status}`); + const data = await res.json(); + return data.items; +}; + +export const getJob = async (jobId: string): Promise => { + const res = await fetch( + `${getGatewayBaseUrl()}/v1/hands/jobs/${encodeURIComponent(jobId)}`, + { credentials: "include" }, + ); + if (!res.ok) throw new Error(`Failed to get job: ${res.status}`); + return res.json(); +}; + +export const startJob = async (jobId: string): Promise => { + const res = await fetch( + `${getGatewayBaseUrl()}/v1/hands/jobs/${encodeURIComponent(jobId)}/start`, + { method: "POST", credentials: "include" }, + ); + if (!res.ok) throw new Error(`Failed to start job: ${res.status}`); +}; + +export const cancelJob = async (jobId: string): Promise => { + const res = await fetch( + `${getGatewayBaseUrl()}/v1/hands/jobs/${encodeURIComponent(jobId)}/cancel`, + { method: "POST", credentials: "include" }, + ); + if (!res.ok) throw new Error(`Failed to cancel job: ${res.status}`); +}; + +// ── Session API ───────────────────────────────────────────────────────── + +export const createSession = async ( + jobId: string, + hostOS?: string, +): Promise => { + const res = await fetch(`${getGatewayBaseUrl()}/v1/hands/sessions`, { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ job_id: jobId, host_os: hostOS }), + }); + if (!res.ok) throw new Error(`Failed to create session: ${res.status}`); + return res.json(); +}; + +export const getSession = async ( + sessionId: string, +): Promise => { + const res = await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}`, + { credentials: "include" }, + ); + if (!res.ok) throw new Error(`Failed to get session: ${res.status}`); + return res.json(); +}; + +export const closeSession = async (sessionId: string): Promise => { + await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}`, + { method: "DELETE", credentials: "include" }, + ); +}; + +export const activateSession = async ( + sessionId: string, + browserSessionId: string, + deviceId?: string, + hostOS?: string, +): Promise => { + const res = await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}/activated`, + { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + browser_session_id: browserSessionId, + device_id: deviceId, + host_os: hostOS, + }), + }, + ); + if (!res.ok) throw new Error(`Failed to activate session: ${res.status}`); +}; + +export const emergencyStop = async (sessionId: string): Promise => { + await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}/emergency-stop`, + { method: "POST", credentials: "include" }, + ); +}; + +// ── Instruction delivery API ──────────────────────────────────────────── + +export const fetchNextPacket = async ( + sessionId: string, +): Promise => { + const res = await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}/next-packet`, + { credentials: "include" }, + ); + if (res.status === 204) return null; + if (!res.ok) throw new Error(`Failed to fetch packet: ${res.status}`); + return res.json(); +}; + +export const reportPacketAck = async ( + sessionId: string, + packetId: string, +): Promise => { + await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}/packet-acked`, + { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ packet_id: packetId }), + }, + ); +}; + +export const reportStepStatus = async ( + sessionId: string, + report: StepStatusReport, +): Promise => { + await fetch( + `${getGatewayBaseUrl()}/v1/hands/sessions/${encodeURIComponent(sessionId)}/step-status`, + { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(report), + }, + ); +}; diff --git a/apps/web/src/lib/hands/bridge.ts b/apps/web/src/lib/hands/bridge.ts new file mode 100644 index 0000000..5a0126b --- /dev/null +++ b/apps/web/src/lib/hands/bridge.ts @@ -0,0 +1,147 @@ +/** + * Hands bridge — relays instruction packets from the gateway to OnlyKey + * via WebHID and forwards device status back to the gateway. + */ + +import { + activateSession, + createSession, + emergencyStop as apiEmergencyStop, + fetchNextPacket, + reportPacketAck, + reportStepStatus, +} from "./api"; +import type { DeviceStatus, OnlyKeyHands } from "./webhid"; + +// ── HandsBridge ───────────────────────────────────────────────────────── + +export class HandsBridge { + private ok: OnlyKeyHands; + private sessionId: string; + private gatewayUrl: string; + private running = false; + + constructor( + ok: OnlyKeyHands, + sessionId: string, + gatewayUrl: string, + ) { + this.ok = ok; + this.sessionId = sessionId; + this.gatewayUrl = gatewayUrl; + + // Forward device status to gateway + this.ok.onStatusUpdate(async (status: DeviceStatus) => { + if (status.type === "complete" || status.type === "error") { + const statusCode = + status.type === "complete" ? 0x02 : 0x03; + await reportStepStatus(this.sessionId, { + step_id: "", // TODO: track current step + status_code: statusCode, + detail: + status.type === "error" && "detail" in status + ? status.detail + : undefined, + }); + } + if (status.type === "button_stop") { + await apiEmergencyStop(this.sessionId); + this.running = false; + } + }); + } + + /** Start the relay loop: poll gateway → deliver to OnlyKey → repeat */ + async start(): Promise { + this.running = true; + + while (this.running) { + try { + const packet = await fetchNextPacket(this.sessionId); + + if (packet) { + // Decode base64 CBOR and deliver via WebHID + const cbor = Uint8Array.from(atob(packet.cbor_b64), (c) => + c.charCodeAt(0), + ); + await this.ok.deliverInstructionSet(cbor, packet.flags); + await reportPacketAck(this.sessionId, packet.packet_id); + } else { + // No packet queued — wait before polling again + await sleep(500); + } + } catch (err) { + console.error("Hands bridge error:", err); + await sleep(1000); + } + } + } + + /** Stop the relay loop. */ + stop(): void { + this.running = false; + } + + /** Emergency stop: halt device + close session. */ + async emergencyStop(): Promise { + await this.ok.emergencyStop(); + await apiEmergencyStop(this.sessionId); + this.running = false; + } + + get isRunning(): boolean { + return this.running; + } +} + +// ── Session establishment helper ──────────────────────────────────────── + +/** + * Full session establishment flow: + * 1. Connect OnlyKey via WebHID + * 2. Create session on gateway + * 3. Send HANDS_SESSION_AUTH to device (user presses button) + * 4. Report activation to gateway + * 5. Return ready-to-use bridge + */ +export async function establishHandsSession( + ok: OnlyKeyHands, + jobId: string, + hostOS?: string, +): Promise<{ bridge: HandsBridge; sessionId: string; agentToken: string }> { + // 1. Create session on gateway + const { session_id: sessionId, nonce, agent_token: agentToken } = + await createSession(jobId, hostOS); + + // 2. Send session auth to OnlyKey (device flashes, user presses button) + const nonceBytes = Uint8Array.from(atob(nonce), (c) => c.charCodeAt(0)); + await ok.authorizeSession(sessionId, nonceBytes); + + // 3. Wait briefly for user to press the button + await sleep(2000); + + // 4. Report activation to gateway + const browserSessionId = crypto.randomUUID(); + await activateSession( + sessionId, + browserSessionId, + ok.deviceInfo.productName, + hostOS, + ); + + // 5. Create and return bridge + const gatewayUrl = + (window as unknown as Record).__GATEWAY_URL__ ?? + process.env.NEXT_PUBLIC_GATEWAY_URL ?? + "https://localhost:10255"; + + const bridge = new HandsBridge(ok, sessionId, gatewayUrl); + + return { bridge, sessionId, agentToken }; +} + +// ── Helpers ───────────────────────────────────────────────────────────── + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/apps/web/src/lib/hands/index.ts b/apps/web/src/lib/hands/index.ts new file mode 100644 index 0000000..8a8386f --- /dev/null +++ b/apps/web/src/lib/hands/index.ts @@ -0,0 +1,4 @@ +export * from "./types"; +export * from "./api"; +export * from "./webhid"; +export * from "./bridge"; diff --git a/apps/web/src/lib/hands/types.ts b/apps/web/src/lib/hands/types.ts new file mode 100644 index 0000000..9f56430 --- /dev/null +++ b/apps/web/src/lib/hands/types.ts @@ -0,0 +1,153 @@ +/** + * Shared types for the OnlyAgent Hands system. + * + * These types mirror the Rust gateway models and Prisma schema + * for the remote machine control layer using OnlyKey via WebHID. + */ + +// ── Enums ─────────────────────────────────────────────────────────────── + +export type JobStatus = + | "draft" + | "queued" + | "running" + | "paused" + | "completed" + | "failed" + | "cancelled"; + +export type StepStatus = + | "pending" + | "sending" + | "executing" + | "verifying" + | "succeeded" + | "failed" + | "skipped"; + +export type SessionStatus = "establishing" | "active" | "paused" | "closed"; + +export type HostOS = "macos" | "windows" | "linux"; + +// ── Macro instructions (AI agent generates these) ────────────────────── + +export type MacroInstruction = + | { macro: "open_browser"; browser?: string } + | { macro: "navigate_url"; url: string } + | { macro: "open_terminal" } + | { macro: "run_command"; command: string } + | { macro: "screenshot" } + | { macro: "switch_window" } + | { macro: "close_window" } + | { macro: "select_all" } + | { macro: "copy" } + | { macro: "paste" } + | { macro: "save" } + | { macro: "undo" } + | { macro: "find"; text: string } + | { macro: "type_text"; text: string } + | { macro: "wait"; seconds: number } + | { macro: "key_press"; key: string; modifiers?: string[] } + | { macro: "key_combo"; keys: string[]; modifiers?: string[] }; + +// ── Job ──────────────────────────────────────────────────────────────── + +export interface JobSummary { + id: string; + name: string; + description: string; + status: JobStatus; + host_os: HostOS | null; + step_count: number; + created_at: string; +} + +export interface JobDetail { + id: string; + name: string; + description: string; + status: JobStatus; + host_os: HostOS | null; + max_duration_secs: number; + steps: StepView[]; + created_at: string; + updated_at: string; + completed_at: string | null; +} + +export interface StepView { + id: string; + sequence_number: number; + description: string; + macro_instructions: MacroInstruction[]; + expected_outcome: string; + status: StepStatus; + retry_count: number; + max_retries: number; + timeout_ms: number; + require_confirm: boolean; + error_message: string | null; +} + +// ── Session ──────────────────────────────────────────────────────────── + +export interface CreateSessionResponse { + session_id: string; + nonce: string; + agent_token: string; +} + +export interface SessionView { + id: string; + job_id: string; + status: SessionStatus; + host_os: HostOS | null; + created_at: string; + last_activity_at: string; +} + +// ── Instruction delivery ─────────────────────────────────────────────── + +export interface NextPacketResponse { + packet_id: string; + step_id: string; + cbor_b64: string; + flags: number; +} + +export interface StepStatusReport { + step_id: string; + status_code: number; + detail?: string; +} + +// ── Screenshot ───────────────────────────────────────────────────────── + +export interface ScreenshotMeta { + id: string; + session_id: string; + step_id: string; + captured_at: string; + width: number; + height: number; + size_bytes: number; + analysis: unknown; +} + +// ── API input types ──────────────────────────────────────────────────── + +export interface CreateJobInput { + name: string; + description: string; + host_os?: HostOS; + steps: CreateStepInput[]; +} + +export interface CreateStepInput { + description: string; + macro_instructions: MacroInstruction[]; + expected_outcome: string; + max_retries?: number; + timeout_ms?: number; + require_confirm?: boolean; +} diff --git a/apps/web/src/lib/hands/webhid.ts b/apps/web/src/lib/hands/webhid.ts new file mode 100644 index 0000000..c38299f --- /dev/null +++ b/apps/web/src/lib/hands/webhid.ts @@ -0,0 +1,196 @@ +/** + * WebHID driver for OnlyKey. + * + * Provides direct USB HID communication with an OnlyKey device + * from the browser using the WebHID API (Chrome/Edge 89+). + */ + +export const ONLYKEY_VENDOR_ID = 0x16c0; // Teensy/PJRC + +// ── Hands protocol report IDs ────────────────────────────────────────── + +const REPORT_ID_INSTRUCTION = 0x70; +const REPORT_ID_SESSION_AUTH = 0x71; +// const REPORT_ID_ACK = 0x72; // device → browser +const REPORT_ID_STATUS = 0x73; // device → browser +const REPORT_ID_EMERGENCY_STOP = 0x74; +// const REPORT_ID_PING = 0x75; + +// ── Types ────────────────────────────────────────────────────────────── + +export type DeviceStatus = + | { type: "queued"; seqNo: number } + | { type: "executing"; seqNo: number } + | { type: "complete"; seqNo: number } + | { type: "error"; seqNo: number; detail: string } + | { type: "button_stop" }; + +type StatusHandler = (status: DeviceStatus) => void; + +// ── OnlyKeyHands class ───────────────────────────────────────────────── + +export class OnlyKeyHands { + private device: HIDDevice; + private statusHandler: StatusHandler | null = null; + + constructor(device: HIDDevice) { + this.device = device; + this.device.addEventListener( + "inputreport", + this.handleReport.bind(this) as EventListener, + ); + } + + /** + * Request and open the OnlyKey device via WebHID. + * Requires a user gesture (button click) to trigger the device picker. + */ + static async connect(): Promise { + if (!("hid" in navigator)) { + throw new Error( + "WebHID is not available in this browser. Please use Chrome or Edge.", + ); + } + + const devices = await navigator.hid.requestDevice({ + filters: [{ vendorId: ONLYKEY_VENDOR_ID }], + }); + + if (devices.length === 0) { + throw new Error("No OnlyKey device selected."); + } + + const device = devices[0]!; + await device.open(); + return new OnlyKeyHands(device); + } + + /** + * Send session auth report to OnlyKey. + * Device will flash; user must press the button to authorize. + */ + async authorizeSession( + sessionId: string, + nonce: Uint8Array, + ): Promise { + const payload = new Uint8Array(64); + const encoder = new TextEncoder(); + const sidBytes = encoder.encode( + sessionId.replace(/-/g, "").slice(0, 16), + ); + payload.set(sidBytes, 0); + payload.set(nonce.slice(0, 16), 16); + await this.device.sendReport(REPORT_ID_SESSION_AUTH, payload); + } + + /** + * Send a single HID instruction report (one chunk of a larger packet). + */ + async sendInstructionReport( + seqNo: number, + total: number, + flags: number, + payloadSlice: Uint8Array, + ): Promise { + const report = new Uint8Array(64); + report[0] = (seqNo >> 8) & 0xff; + report[1] = seqNo & 0xff; + report[2] = (total >> 8) & 0xff; + report[3] = total & 0xff; + report[4] = flags; + report.set(payloadSlice.slice(0, 59), 5); + await this.device.sendReport(REPORT_ID_INSTRUCTION, report); + } + + /** + * Deliver a complete compiled instruction set (CBOR bytes) + * by chunking into 59-byte HID report payloads. + */ + async deliverInstructionSet( + cbor: Uint8Array, + flags: number = 0x01, + ): Promise { + const PAYLOAD_SIZE = 59; + const total = Math.ceil(cbor.length / PAYLOAD_SIZE); + + for (let seqNo = 0; seqNo < total; seqNo++) { + const slice = cbor.slice( + seqNo * PAYLOAD_SIZE, + (seqNo + 1) * PAYLOAD_SIZE, + ); + await this.sendInstructionReport(seqNo, total, flags, slice); + // Brief pause between reports to avoid overwhelming HID queue + if (seqNo < total - 1) { + await sleep(10); + } + } + } + + /** Immediately halt OnlyKey execution. */ + async emergencyStop(): Promise { + const payload = new Uint8Array(64); + payload[0] = 0xff; + await this.device.sendReport(REPORT_ID_EMERGENCY_STOP, payload); + } + + /** Register a handler for device status reports. */ + onStatusUpdate(handler: StatusHandler): void { + this.statusHandler = handler; + } + + /** Close the HID device connection. */ + async close(): Promise { + await this.device.close(); + } + + get isConnected(): boolean { + return this.device.opened; + } + + get deviceInfo(): { productName: string; vendorId: number } { + return { + productName: this.device.productName ?? "OnlyKey", + vendorId: this.device.vendorId, + }; + } + + // ── Internal ─────────────────────────────────────────────────────── + + private handleReport(event: HIDInputReportEvent): void { + const data = new Uint8Array(event.data.buffer); + + if (event.reportId === REPORT_ID_STATUS) { + const seqNo = (data[0]! << 8) | data[1]!; + const statusCode = data[2]!; + const detail = new TextDecoder() + .decode(data.slice(3)) + .replace(/\0/g, ""); + + const statusMap: Record = { + 0x00: "queued", + 0x01: "executing", + 0x02: "complete", + 0x03: "error", + 0x04: "button_stop", + }; + + const type = statusMap[statusCode] ?? "error"; + + if (type === "button_stop") { + this.statusHandler?.({ type: "button_stop" }); + } else { + this.statusHandler?.({ + type, + seqNo, + detail, + } as DeviceStatus); + } + } + } +} + +// ── Helpers ───────────────────────────────────────────────────────────── + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/apps/web/src/lib/nav-items.ts b/apps/web/src/lib/nav-items.ts index 799d321..96dc726 100644 --- a/apps/web/src/lib/nav-items.ts +++ b/apps/web/src/lib/nav-items.ts @@ -1,9 +1,10 @@ -import { LayoutDashboard, Bot, KeyRound, Settings } from "lucide-react"; +import { LayoutDashboard, Bot, KeyRound, Hand, Settings } from "lucide-react"; import type { NavItem } from "@/app/(dashboard)/_components/nav-main"; export const navItems: NavItem[] = [ { title: "Overview", url: "/overview", icon: LayoutDashboard }, { title: "Agents", url: "/agents", icon: Bot }, { title: "Secrets", url: "/secrets", icon: KeyRound }, + { title: "Hands", url: "/hands", icon: Hand }, { title: "Settings", url: "/settings", icon: Settings }, ]; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index fffaa44..748abe4 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -21,6 +21,7 @@ model User { agents Agent[] secrets Secret[] + handsJobs HandsJob[] } model Agent { @@ -68,3 +69,109 @@ model AgentSecret { @@id([agentId, secretId]) } + +// ── OnlyAgent Hands ──────────────────────────────────────────────────── + +/// A remote control job — a sequence of keystroke instructions to execute +/// on a host machine via OnlyKey WebHID. +model HandsJob { + id String @id @default(cuid()) + userId String + name String + description String + status String @default("draft") // draft|queued|running|paused|completed|failed|cancelled + hostOS String? // macos|windows|linux + priority Int @default(0) + maxDurationSecs Int @default(3600) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + completedAt DateTime? + + user User @relation(fields: [userId], references: [id]) + steps HandsStep[] + sessions HandsSession[] + auditEvents HandsAuditEvent[] + + @@index([userId, status]) +} + +/// An individual step within a job. +model HandsStep { + id String @id @default(cuid()) + jobId String + sequenceNumber Int + description String + macroInstructions Json // MacroInstruction[] + expectedOutcome String + status String @default("pending") // pending|sending|executing|verifying|succeeded|failed|skipped + retryCount Int @default(0) + maxRetries Int @default(3) + timeoutMs Int @default(30000) + requireConfirm Boolean @default(false) + createdAt DateTime @default(now()) + startedAt DateTime? + completedAt DateTime? + errorMessage String? + + job HandsJob @relation(fields: [jobId], references: [id], onDelete: Cascade) + screenshots HandsScreenshot[] + + @@index([jobId, sequenceNumber]) +} + +/// An active WebHID control session (browser ↔ OnlyKey ↔ host). +model HandsSession { + id String @id @default(cuid()) + jobId String + userId String + agentToken String // screenshot agent auth token (session-scoped) + browserSessionId String? + status String @default("establishing") // establishing|active|paused|closed + hostOS String? + sessionKeyHash String? // SHA-256 of session key (audit only) + deviceId String? // WebHID device identifier + createdAt DateTime @default(now()) + lastActivityAt DateTime @default(now()) + closedAt DateTime? + closeReason String? + + job HandsJob @relation(fields: [jobId], references: [id]) + screenshots HandsScreenshot[] + + @@index([jobId, status]) +} + +/// A screenshot captured during a Hands session. +model HandsScreenshot { + id String @id @default(cuid()) + sessionId String + stepId String + capturedAt DateTime @default(now()) + imageData Bytes + width Int + height Int + sizeBytes Int + analysis Json? // AI analysis result + expiresAt DateTime + + session HandsSession @relation(fields: [sessionId], references: [id], onDelete: Cascade) + step HandsStep @relation(fields: [stepId], references: [id]) + + @@index([sessionId, capturedAt(sort: Desc)]) + @@index([stepId]) +} + +/// Immutable audit log for Hands operations. +model HandsAuditEvent { + id String @id @default(cuid()) + jobId String + event String // session_created|session_activated|instruction_sent|step_complete|emergency_stop|etc. + stepId String? + sessionId String? + metadata Json? + timestamp DateTime @default(now()) + + job HandsJob @relation(fields: [jobId], references: [id]) + + @@index([jobId, timestamp]) +}