Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,012 changes: 986 additions & 26 deletions relay-server/Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ urlencoding = "2.1"
tower_governor = "0.4"
governor = "0.6"
validator = { version = "0.18", features = ["derive"] }
sqlx = { version = "0.7", default-features = false, features = ["runtime-tokio", "tls-rustls", "postgres", "chrono", "macros", "migrate"] }
async-trait = "0.1"

[dev-dependencies]
tower = { version = "0.5", features = ["util"] }
15 changes: 15 additions & 0 deletions relay-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ Web screen sharing with up to 8 participants.
- `GET /api/rtc-sessions/:id` → `{app_id, channel, host_uid}` - Get session info
- `POST /api/rtc-sessions/:id/join {name}` → `{app_id, channel, token, uid}` - Join session (assigns unique UID)

### Vault
Durable, append-only, versioned shared context store for collaborating atems.
Backed by Postgres (`DATABASE_URL`). All requests require
`Authorization: session <session_id>` and `?id=<client_id>`.

- `POST /api/vault {summary}` → `{vault_id}` - Create a vault
- `GET /api/vault` → `[{vault_id, summary}]` - List readable vaults
- `GET /api/vault/:id [?since=<seq>&history=true]` → `[VaultEntry]` - Read (current view or history)
- `POST /api/vault/:id {text, entry_id?}` → `{entry_no, version, seq}` - Append (no `entry_id`) or override (with `entry_id`)
- `POST /api/vault/:id/summary {text}` → `{}` - Update summary

Authz: in-session callers (same `work_session_id` = bound astation_id) get read+write;
out-of-session past content-writers get read-only; others are denied (403).

## Astation Integration

The Astation macOS app uses this relay server for:
Expand All @@ -91,6 +105,7 @@ Config: Set `relay_url` and `ws_url` in `.atem/config.toml`
| `PUBLIC_BASE_URL` | _(unset)_ | Public base URL used for generated session links (recommended in production) |
| `PORT` | `3000` | Server port |
| `RUST_LOG` | `info` | Log level (error, warn, info, debug, trace) |
| `DATABASE_URL` | _(unset)_ | Postgres connection string for **vault** storage (e.g. `postgres://vault:vault@localhost:5432/vault`). When unset, vault storage falls back to **in-memory** (non-durable) and logs a warning. Migrations in `migrations/` run automatically at startup. |

**Production:**
```bash
Expand Down
22 changes: 22 additions & 0 deletions relay-server/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ services:
build: .
image: station-relay-server:latest
restart: unless-stopped
depends_on:
postgres:
condition: service_healthy

environment:
- CORS_ORIGIN=${CORS_ORIGIN:-https://station.agora.build}
- PUBLIC_BASE_URL=${PUBLIC_BASE_URL:-https://station.agora.build}
- PORT=${PORT:-3000}
- RUST_LOG=${RUST_LOG:-info}
- DATABASE_URL=${DATABASE_URL:-postgres://vault:vault@postgres:5432/vault}

ports:
- "3000:3000"
Expand All @@ -19,3 +23,21 @@ services:
timeout: 10s
retries: 3
start_period: 10s

postgres:
image: postgres:16-alpine
restart: unless-stopped
environment:
- POSTGRES_USER=${POSTGRES_USER:-vault}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-vault}
- POSTGRES_DB=${POSTGRES_DB:-vault}
volumes:
- vault_pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-vault}"]
interval: 10s
timeout: 5s
retries: 5

volumes:
vault_pgdata:
24 changes: 24 additions & 0 deletions relay-server/migrations/0001_vault.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Vault: durable, append-only, versioned shared context store.
CREATE TABLE IF NOT EXISTS vaults (
vault_id TEXT PRIMARY KEY, -- short, URL-safe, e.g. "v-7Kf3qD"
summary TEXT NOT NULL DEFAULT '', -- mutable description
work_session_id TEXT NOT NULL, -- the work session (astation_id) this vault belongs to
created_by TEXT NOT NULL, -- client_id of creator
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
writer_list TEXT[] NOT NULL DEFAULT '{}', -- denormalized content-writer client_ids
next_entry_no INT NOT NULL DEFAULT 1 -- per-vault entry-number allocator
);

CREATE TABLE IF NOT EXISTS vault_entries (
seq BIGSERIAL PRIMARY KEY, -- global write order (also the --since cursor)
vault_id TEXT NOT NULL REFERENCES vaults(vault_id),
entry_no INT NOT NULL, -- per-vault: 1,2,3 -> shown as e1, e2, e3
version INT NOT NULL, -- per-entry: 1,2,3 -> shown as v1, v2, v3
kind TEXT NOT NULL, -- 'content' | 'summary'
writer_id TEXT NOT NULL, -- client_id that wrote this row
content TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (vault_id, entry_no, version)
);

CREATE INDEX IF NOT EXISTS vault_entries_by_vault_seq ON vault_entries (vault_id, seq);
6 changes: 6 additions & 0 deletions relay-server/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub struct Session {
pub token: Option<String>,
pub created_at: DateTime<Utc>,
pub expires_at: DateTime<Utc>,
/// The astation_id this session is bound to (the "work session"). Set when
/// known (e.g. at grant / verification time). Used as vault work_session_id.
#[serde(default)]
pub astation_id: Option<String>,
}

/// Generate an 8-digit numeric OTP.
Expand Down Expand Up @@ -49,6 +53,7 @@ pub fn create_session(hostname: &str) -> Session {
token: None,
created_at: now,
expires_at: now + Duration::minutes(5),
astation_id: None,
}
}

Expand Down Expand Up @@ -151,6 +156,7 @@ mod tests {
token: None,
created_at: now - Duration::minutes(10),
expires_at: now - Duration::minutes(5), // Already expired
astation_id: None,
};
assert!(
!validate_otp(&session, "12345678"),
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/llm_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
}
}

Expand Down
43 changes: 43 additions & 0 deletions relay-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod session_verify;
mod voice_session;
mod voice_routes;
mod llm_proxy;
mod vault_store;
mod vault_routes;
mod web;

use axum::http::{header, HeaderValue, Method};
Expand All @@ -30,6 +32,7 @@ pub struct AppState {
pub rtc_sessions: RtcSessionStore,
pub session_verify_cache: SessionVerifyCache,
pub voice_sessions: VoiceSessionStore,
pub vault: Arc<dyn vault_store::VaultStore>,
}

#[tokio::main]
Expand All @@ -49,6 +52,32 @@ async fn main() {
let session_verify_cache = SessionVerifyCache::new();
let voice_sessions = VoiceSessionStore::new();

// Vault store: Postgres when DATABASE_URL is set (the durable path), else an
// in-memory fallback so the rest of the server still runs without a DB.
let vault: Arc<dyn vault_store::VaultStore> = match std::env::var("DATABASE_URL") {
Ok(url) if !url.is_empty() => {
tracing::info!("Connecting to Postgres for vault storage...");
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect(&url)
.await
.expect("Failed to connect to DATABASE_URL for vault storage");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("Failed to run vault migrations");
tracing::info!("Vault storage ready (Postgres)");
Arc::new(vault_store::PgVaultStore::new(pool))
}
_ => {
tracing::warn!(
"DATABASE_URL not set — vault storage is IN-MEMORY (not durable). \
Set DATABASE_URL to enable persistent vaults."
);
Arc::new(vault_store::InMemoryVaultStore::new())
}
};

// Spawn background cleanup for expired sessions
let cleanup_sessions = sessions.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -109,6 +138,7 @@ async fn main() {
rtc_sessions,
session_verify_cache,
voice_sessions,
vault,
};

// Configure CORS - Allow specific origin or default to localhost for development
Expand Down Expand Up @@ -210,6 +240,19 @@ async fn main() {
"/api/llm/chat",
post(llm_proxy::llm_chat_handler),
)
// Vault API routes
.route(
"/api/vault",
post(vault_routes::create_vault_handler).get(vault_routes::list_vaults_handler),
)
.route(
"/api/vault/:id",
get(vault_routes::read_vault_handler).post(vault_routes::write_vault_handler),
)
.route(
"/api/vault/:id/summary",
post(vault_routes::set_summary_handler),
)
// Relay API routes
.route("/api/pair", post(relay::create_pair_handler))
.route("/api/pair/:code", get(relay::pair_status_handler).delete(relay::delete_pair_handler));
Expand Down
50 changes: 44 additions & 6 deletions relay-server/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,32 @@ pub async fn ws_handler(
}
}

// Sanitize atem_id to URL-safe chars (hostname may contain spaces or special chars)
let atem_id = params.atem_id
.as_deref()
.map(|s| s.chars().filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_' || *c == '.').collect::<String>())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| format!("atem-{:x}", rand::thread_rng().gen::<u32>()));
// Sanitize atem_id. atem percent-encodes ids that may contain non-ASCII
// (CJK hostnames), so decode first, then keep non-ASCII while restricting
// ASCII to [A-Za-z0-9-] (matches atem's own identity rule).
let atem_id = sanitize_atem_id(params.atem_id.as_deref());

ws.on_upgrade(move |socket| handle_ws(hub, code, role, atem_id, socket))
.into_response()
}

/// Percent-decode an incoming atem_id and filter to a safe id, preserving
/// non-ASCII characters. Falls back to a random id when empty/missing.
fn sanitize_atem_id(raw: Option<&str>) -> String {
let decoded = urlencoding::decode(raw.unwrap_or(""))
.map(|c| c.into_owned())
.unwrap_or_default();
let filtered: String = decoded
.chars()
.filter(|c| !c.is_ascii() || c.is_ascii_alphanumeric() || *c == '-')
.collect();
if filtered.is_empty() {
format!("atem-{:x}", rand::thread_rng().gen::<u32>())
} else {
filtered
}
}

/// Message routing protocol for multi-Atem rooms:
///
/// Atem → Astation: relay WRAPS `{"atem_id":"<id>","payload":<original_msg>}`
Expand Down Expand Up @@ -719,6 +734,27 @@ mod tests {
assert!(unique.len() > 1, "Pairing codes should vary");
}

#[test]
fn sanitize_atem_id_preserves_decoded_cjk() {
// "团队-mac" percent-encoded; non-ASCII preserved, ASCII restricted to [A-Za-z0-9-].
let encoded = "%E5%9B%A2%E9%98%9F-mac";
assert_eq!(sanitize_atem_id(Some(encoded)), "团队-mac");
}

#[test]
fn sanitize_atem_id_strips_unsafe_ascii() {
assert_eq!(sanitize_atem_id(Some("a_b.c d!e")), "abcde");
assert_eq!(sanitize_atem_id(Some("keep-this-123")), "keep-this-123");
}

#[test]
fn sanitize_atem_id_falls_back_when_empty() {
let id = sanitize_atem_id(None);
assert!(id.starts_with("atem-"));
let id2 = sanitize_atem_id(Some(""));
assert!(id2.starts_with("atem-"));
}

#[test]
fn pairing_code_no_ambiguous_chars() {
for _ in 0..100 {
Expand Down Expand Up @@ -846,6 +882,7 @@ mod tests {
rtc_sessions: crate::rtc_session::RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
Router::new()
.route("/api/pair", axum::routing::post(create_pair_handler))
Expand Down Expand Up @@ -1219,6 +1256,7 @@ mod tests {
rtc_sessions: crate::rtc_session::RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};

// Create pair
Expand Down
11 changes: 11 additions & 0 deletions relay-server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -334,6 +335,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -434,6 +436,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -504,6 +507,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -592,6 +596,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let session = create_session("my-machine");
let session_id = session.id.clone();
Expand Down Expand Up @@ -645,6 +650,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -710,6 +716,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -774,6 +781,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -882,6 +890,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};
let app = Router::new()
.route("/api/sessions", post(create_session_handler))
Expand Down Expand Up @@ -947,6 +956,7 @@ mod tests {
rtc_sessions: RtcSessionStore::new(),
session_verify_cache: SessionVerifyCache::new(),
voice_sessions: VoiceSessionStore::new(),
vault: std::sync::Arc::new(crate::vault_store::InMemoryVaultStore::new()),
};

// Create an expired session manually
Expand All @@ -959,6 +969,7 @@ mod tests {
token: None,
created_at: now - Duration::minutes(10),
expires_at: now - Duration::minutes(5),
astation_id: None,
};
let session_id = expired_session.id.clone();
state.sessions.create(expired_session).await;
Expand Down
Loading
Loading