Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,14 @@ anyhow = "1"
# Time (for certificate validity)
time = "0.3"

# Chrono (for vault timestamp handling)
chrono = { version = "0.4", features = ["serde"] }

# ULID/CUID generation
ulid = "1"

# Direct database access
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "chrono"] }

[features]
cloud = []
Expand Down
44 changes: 43 additions & 1 deletion apps/gateway/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::time::{Duration, Instant};
use crate::crypto::CryptoService;
use crate::db;
use crate::inject::{ConnectRule, Injection};
use crate::vault;
use dashmap::DashMap;

/// How long to cache resolved connect responses before re-checking.
Expand Down Expand Up @@ -52,6 +53,7 @@ pub(crate) type ConnectCacheKey = (String, String);
pub(crate) struct PolicyEngine {
pub pool: sqlx::PgPool,
pub crypto: Arc<CryptoService>,
pub vault_cache: Option<vault::cache::InMemoryUnlockCache>,
}

impl PolicyEngine {
Expand Down Expand Up @@ -107,8 +109,48 @@ impl PolicyEngine {
});
}

// 5. Also check vault records (OnlyKey-protected secrets)
if let Some(ref vc) = self.vault_cache {
let vault_records = vault::db::find_vault_records_by_host(
&self.pool,
&agent.user_id,
hostname,
)
.await
.unwrap_or_default();

for vr in vault_records {
let scope = vault::models::CacheScope::from_str(&vr.cache_scope);
let scope_id = &agent.id;
let idle = std::time::Duration::from_secs(vr.idle_timeout_seconds as u64);

// Try the unlock cache
if let Some(entry) = vc.get_and_touch(&vr.id, &scope, scope_id, idle) {
// Decrypt with cached record key
if let Ok(decrypted) = vault::crypto::decrypt_vault_secret(
&entry.record_key,
&vr.ciphertext_b64,
&vr.nonce_b64,
&vr.aad_json,
) {
let path_pattern = vr.path_pattern.unwrap_or_else(|| "*".to_string());
let injections =
build_injections(&vr.record_type, &decrypted, vr.injection_config.as_ref());

rules.push(ConnectRule {
path_pattern,
injections,
});
}
}
// If not in cache, the record stays locked — agent must request via /v1/vault/records/:id/access
}
}

let intercept = !rules.is_empty();

Ok(ConnectResponse {
intercept: true,
intercept,
rules,
user_id: Some(agent.user_id),
})
Expand Down
22 changes: 21 additions & 1 deletion apps/gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::auth::AuthUser;
use crate::ca::CertificateAuthority;
use crate::connect::{self, CachedConnect, ConnectCacheKey, ConnectError, PolicyEngine};
use crate::inject::{self, ConnectRule};
use crate::vault;

// ── GatewayState ───────────────────────────────────────────────────────

Expand All @@ -42,6 +43,7 @@ pub(crate) struct GatewayState {
pub http_client: reqwest::Client,
pub policy_engine: Arc<PolicyEngine>,
pub connect_cache: Arc<DashMap<ConnectCacheKey, CachedConnect>>,
pub vault_state: Arc<vault::api::VaultState>,
}

// ── GatewayServer ───────────────────────────────────────────────────────
Expand All @@ -52,7 +54,12 @@ pub struct GatewayServer {
}

impl GatewayServer {
pub fn new(ca: CertificateAuthority, port: u16, policy_engine: Arc<PolicyEngine>) -> Self {
pub fn new(
ca: CertificateAuthority,
port: u16,
policy_engine: Arc<PolicyEngine>,
vault_state: Arc<vault::api::VaultState>,
) -> Self {
let state = GatewayState {
ca: Arc::new(ca),
http_client: reqwest::Client::builder()
Expand All @@ -63,6 +70,7 @@ impl GatewayServer {
.expect("build HTTP client"),
policy_engine,
connect_cache: Arc::new(DashMap::new()),
vault_state,
};

Self { state, port }
Expand Down Expand Up @@ -95,11 +103,23 @@ impl GatewayServer {
])
.allow_credentials(true);

// Build vault sub-router with its own state (Arc<VaultState>).
// Uses nest_service to integrate into the main router which has GatewayState.
let vault_router: Router = Router::new()
.route("/records/{id}/access", axum::routing::post(vault::api::access_record))
.route("/records/{id}/lock", axum::routing::post(vault::api::lock_record))
.route("/browser/pending", axum::routing::get(vault::api::get_pending_approvals))
.route("/browser/approve", axum::routing::post(vault::api::approve_request))
.route("/agents/{id}/lock", axum::routing::post(vault::api::lock_agent_records))
.route("/cache/revoke-all", axum::routing::post(vault::api::revoke_all_cache))
.with_state(Arc::clone(&self.state.vault_state));

// Build the Axum router for non-CONNECT routes.
// The fallback returns 400 Bad Request for anything other than defined routes.
let axum_router = Router::new()
.route("/healthz", axum::routing::get(healthz))
.route("/me", axum::routing::get(me))
.nest("/v1/vault", vault_router)
.layer(cors_layer)
.fallback(fallback)
.with_state(self.state.clone());
Expand Down
34 changes: 32 additions & 2 deletions apps/gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod crypto;
mod db;
mod gateway;
mod inject;
mod vault;

use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -78,12 +79,41 @@ async fn main() -> Result<()> {
// Load encryption key for secret decryption
let crypto = Arc::new(crypto::CryptoService::from_env()?);

let policy_engine = Arc::new(PolicyEngine { pool, crypto });
// Initialize OnlyKey Vault subsystem (before PolicyEngine so we can share the cache)
let vault_origin = std::env::var("VAULT_ORIGIN")
.unwrap_or_else(|_| "apps.crp.to".to_string());
let vault_unlock_cache = vault::cache::InMemoryUnlockCache::new();
let vault_state = Arc::new(vault::api::VaultState {
pool: pool.clone(),
unlock_cache: vault_unlock_cache.clone(),
default_origin: vault_origin,
});

let policy_engine = Arc::new(PolicyEngine {
pool,
crypto,
vault_cache: Some(vault_unlock_cache),
});

// Spawn periodic cache cleanup (every 60s)
{
let vc = vault_state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
interval.tick().await;
let removed = vc.unlock_cache.cleanup_expired();
if removed > 0 {
info!(removed = removed, "vault cache: cleaned up expired entries");
}
}
});
}

info!(port = cli.port, "gateway ready");

// Start the gateway server (blocks forever)
let server = GatewayServer::new(ca, cli.port, policy_engine);
let server = GatewayServer::new(ca, cli.port, policy_engine, vault_state);
server.run().await
}

Expand Down
Loading