From d290e1b0599e5944daa7ecdca50ebd9083313377 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Mon, 4 May 2026 16:07:36 -0400 Subject: [PATCH 1/8] feat(pam-rdp): record RDP sessions through the chunk uploader Tap each PDU in the post-CredSSP byte bridge and stream structured events (target_frame / keyboard / unicode / mouse) through the existing session logger so they land in the encrypted chunk pipeline. Capture switches the post-CredSSP path from copy_bidirectional to a PDU-framed bridge: read_pdu yields TPKT/FastPath frames pure-framing, no RDP state machine, the bytes are forwarded unchanged, and the tap emits SessionEvent variants on an mpsc channel. This preserves the no-MCS/capability/share-state-drift property of the byte-pump it replaces. The FFI gains rdp_bridge_poll_event for Go to drain those events with a timeout. TargetFrame payloads are handed across as libc::malloc'd buffers; the Go side defers C.free after copying. Go-side, RDPProxy.HandleConnection spawns a drain goroutine that JSON- encodes each event and calls SessionLogger.LogTerminalEvent with ChannelType=rdp. The chunk uploader is protocol-agnostic, so RDP sessions now flow into pam_session_event_chunks like SSH/HTTP do. session.LogTerminalEvent skips masking for the rdp channel because the data field carries a base64-JSON envelope; SSH-shaped masking regexes would corrupt valid recordings. --- packages/pam/handlers/rdp/bridge.go | 47 ++++ .../pam/handlers/rdp/bridge_cgo_shared.go | 80 ++++++ packages/pam/handlers/rdp/bridge_stub.go | 5 + packages/pam/handlers/rdp/native/Cargo.lock | 2 + packages/pam/handlers/rdp/native/Cargo.toml | 2 + .../handlers/rdp/native/include/rdp_bridge.h | 42 ++++ .../pam/handlers/rdp/native/src/bridge.rs | 144 ++++++++++- .../pam/handlers/rdp/native/src/events.rs | 50 ++++ packages/pam/handlers/rdp/native/src/ffi.rs | 236 +++++++++++++++++- packages/pam/handlers/rdp/native/src/lib.rs | 1 + packages/pam/handlers/rdp/proxy.go | 137 +++++++++- packages/pam/session/logger.go | 11 +- 12 files changed, 740 insertions(+), 17 deletions(-) create mode 100644 packages/pam/handlers/rdp/native/src/events.rs diff --git a/packages/pam/handlers/rdp/bridge.go b/packages/pam/handlers/rdp/bridge.go index f582c864..8e99ebc2 100644 --- a/packages/pam/handlers/rdp/bridge.go +++ b/packages/pam/handlers/rdp/bridge.go @@ -15,3 +15,50 @@ type Bridge struct { handle uint64 cleanup func() } + +// EventType discriminates the variants in Event. +type EventType uint8 + +const ( + EventTypeKeyboard EventType = 1 + EventTypeUnicode EventType = 2 + EventTypeMouse EventType = 3 + EventTypeTargetFrame EventType = 4 +) + +// Action identifies the RDP framing of a TargetFrame event. +type Action uint8 + +const ( + ActionX224 Action = 0 + ActionFastPath Action = 1 +) + +// Event is a structured tap event drained from the bridge. +// +// Fields are reused across variants. Switch on Type: +// - Keyboard: Scancode + Flags. +// - Unicode: CodePoint + Flags. +// - Mouse: X, Y, Flags, WheelDelta. +// - TargetFrame: Action + Payload (raw PDU bytes; owned Go slice). +type Event struct { + Type EventType + ElapsedNs uint64 + Scancode uint8 + CodePoint uint16 + X uint16 + Y uint16 + Flags uint32 + WheelDelta int32 + Action Action + Payload []byte +} + +// PollResult discriminates PollEvent outcomes. +type PollResult uint8 + +const ( + PollOK PollResult = 0 + PollTimeout PollResult = 1 + PollEnded PollResult = 2 +) diff --git a/packages/pam/handlers/rdp/bridge_cgo_shared.go b/packages/pam/handlers/rdp/bridge_cgo_shared.go index 9a822e6f..e825aaa3 100644 --- a/packages/pam/handlers/rdp/bridge_cgo_shared.go +++ b/packages/pam/handlers/rdp/bridge_cgo_shared.go @@ -5,6 +5,7 @@ package rdp /* #cgo CFLAGS: -I${SRCDIR}/native/include +#include #include "rdp_bridge.h" */ import "C" @@ -14,6 +15,8 @@ import ( "errors" "fmt" "net" + "time" + "unsafe" ) func (p *RDPProxy) HandleConnection(ctx context.Context, clientConn net.Conn) error { @@ -36,6 +39,26 @@ func (p *RDPProxy) HandleConnection(ctx context.Context, clientConn net.Conn) er } defer bridge.Close() + // Drain bridge tap events into the session logger. The Rust side closes + // the events channel when the session ends, so the goroutine exits via + // PollEnded without needing an explicit shutdown signal. + drainCtx, cancelDrain := context.WithCancel(ctx) + drainDone := make(chan struct{}) + go func() { + defer close(drainDone) + drainBridgeEvents(drainCtx, bridge, p.config.SessionLogger, p.config.SessionID) + }() + defer func() { + cancelDrain() + // Wait briefly for the drain loop to exit so a cancelled session + // can't race the Bridge.Close below. PollEvent's timeout caps how + // long this can take. + select { + case <-drainDone: + case <-time.After(2 * pollTimeout): + } + }() + waitErr := make(chan error, 1) go func() { waitErr <- bridge.Wait() }() @@ -94,3 +117,60 @@ func (b *Bridge) Close() error { // response: a stub-build gateway that advertises support would route // RDP sessions only to fail them at connect time. func IsSupported() bool { return true } + +// PollEvent drains one tap event with the given timeout. The returned Event +// is only meaningful when result == PollOK. PollEvent is not safe to call +// concurrently for the same Bridge; serialize calls in a single goroutine. +func (b *Bridge) PollEvent(timeout time.Duration) (PollResult, Event, error) { + timeoutMs := timeout.Milliseconds() + if timeoutMs < 0 { + timeoutMs = 0 + } + if timeoutMs > int64(^C.uint32_t(0)) { + timeoutMs = int64(^C.uint32_t(0)) + } + + var raw C.struct_RdpEvent + rc := C.rdp_bridge_poll_event(C.uint64_t(b.handle), &raw, C.uint32_t(timeoutMs)) + + switch rc { + case C.RDP_POLL_OK: + // fall through to event materialization below + case C.RDP_POLL_TIMEOUT: + return PollTimeout, Event{}, nil + case C.RDP_POLL_ENDED: + return PollEnded, Event{}, nil + case C.RDP_POLL_INVALID_HANDLE: + return PollEnded, Event{}, ErrInvalidHandle + default: + return PollEnded, Event{}, fmt.Errorf("rdp bridge: poll returned unexpected status %d", int32(rc)) + } + + ev := Event{ + Type: EventType(uint8(raw.event_type)), + ElapsedNs: uint64(raw.elapsed_ns), + Flags: uint32(raw.flags), + WheelDelta: int32(raw.wheel_delta), + Action: Action(uint8(raw.action)), + } + switch ev.Type { + case EventTypeKeyboard: + ev.Scancode = uint8(raw.value_a) + case EventTypeUnicode: + ev.CodePoint = uint16(raw.value_a) + case EventTypeMouse: + ev.X = uint16(raw.value_a) + ev.Y = uint16(raw.value_b) + case EventTypeTargetFrame: + // Always free the libc-malloc'd buffer Rust handed us, even if + // the copy below is empty -- ownership transfer is unconditional. + if raw.payload_ptr != nil { + defer C.free(unsafe.Pointer(raw.payload_ptr)) + if raw.payload_len > 0 { + ev.Payload = C.GoBytes(unsafe.Pointer(raw.payload_ptr), C.int(raw.payload_len)) + } + } + } + + return PollOK, ev, nil +} diff --git a/packages/pam/handlers/rdp/bridge_stub.go b/packages/pam/handlers/rdp/bridge_stub.go index 37a3bcdf..0d704908 100644 --- a/packages/pam/handlers/rdp/bridge_stub.go +++ b/packages/pam/handlers/rdp/bridge_stub.go @@ -6,6 +6,7 @@ import ( "context" "io" "net" + "time" ) // Stub implementations for builds without `-tags rdp` or on platforms @@ -29,6 +30,10 @@ func (b *Bridge) Wait() error { return ErrRdpUnavailable } func (b *Bridge) Cancel() error { return ErrRdpUnavailable } func (b *Bridge) Close() error { return ErrRdpUnavailable } +func (b *Bridge) PollEvent(_ time.Duration) (PollResult, Event, error) { + return PollEnded, Event{}, ErrRdpUnavailable +} + // IsSupported reports whether this build has a real RDP bridge. See the // rdp-enabled counterpart in bridge_cgo_shared.go for details. func IsSupported() bool { return false } diff --git a/packages/pam/handlers/rdp/native/Cargo.lock b/packages/pam/handlers/rdp/native/Cargo.lock index 5c04a3e5..c4652505 100644 --- a/packages/pam/handlers/rdp/native/Cargo.lock +++ b/packages/pam/handlers/rdp/native/Cargo.lock @@ -1309,9 +1309,11 @@ dependencies = [ "bytes", "ironrdp-acceptor", "ironrdp-connector", + "ironrdp-core", "ironrdp-pdu", "ironrdp-tls", "ironrdp-tokio", + "libc", "libz-sys", "rcgen", "rustls", diff --git a/packages/pam/handlers/rdp/native/Cargo.toml b/packages/pam/handlers/rdp/native/Cargo.toml index 500a2117..cb53a5d2 100644 --- a/packages/pam/handlers/rdp/native/Cargo.toml +++ b/packages/pam/handlers/rdp/native/Cargo.toml @@ -13,10 +13,12 @@ path = "src/lib.rs" [dependencies] ironrdp-acceptor = "0.8" ironrdp-connector = "0.8" +ironrdp-core = "0.1" ironrdp-tokio = { version = "0.8", features = ["reqwest"] } ironrdp-pdu = "0.7" ironrdp-tls = { version = "0.2", features = ["rustls"] } x509-cert = { version = "0.2", features = ["std"] } +libc = "0.2" tokio = { version = "1", features = ["full"] } tokio-util = "0.7" diff --git a/packages/pam/handlers/rdp/native/include/rdp_bridge.h b/packages/pam/handlers/rdp/native/include/rdp_bridge.h index 83088768..753dd351 100644 --- a/packages/pam/handlers/rdp/native/include/rdp_bridge.h +++ b/packages/pam/handlers/rdp/native/include/rdp_bridge.h @@ -46,6 +46,48 @@ int32_t rdp_bridge_wait(uint64_t handle); int32_t rdp_bridge_cancel(uint64_t handle); int32_t rdp_bridge_free(uint64_t handle); +/* Poll return codes (distinct number space from the bridge status codes + * above; consumed by rdp_bridge_poll_event only). */ +#define RDP_POLL_OK 0 +#define RDP_POLL_TIMEOUT 1 +#define RDP_POLL_ENDED 2 +#define RDP_POLL_INVALID_HANDLE -1 + +/* Event type discriminator. */ +#define RDP_EVENT_KEYBOARD 1 +#define RDP_EVENT_UNICODE 2 +#define RDP_EVENT_MOUSE 3 +#define RDP_EVENT_TARGET_FRAME 4 + +/* + * Bridge tap event surfaced to Go. + * + * Fields are reused across variants -- check `event_type` to decide which + * fields are meaningful: + * - Keyboard: value_a = scancode, flags = KeyboardFlags bits. + * - Unicode: value_a = code point, flags = KeyboardFlags bits. + * - Mouse: value_a = x, value_b = y, flags = PointerFlags bits, + * wheel_delta is signed. + * - TargetFrame: action = 0 (X.224) or 1 (FastPath); payload_ptr points + * at a heap buffer of size payload_len with the raw PDU + * bytes. The buffer was allocated with libc malloc; the Go + * caller MUST free it via C.free after copying the bytes. + * Other variants leave payload_ptr = NULL, payload_len = 0. + */ +struct RdpEvent { + uint8_t event_type; + uint64_t elapsed_ns; + uint32_t value_a; + uint32_t value_b; + uint32_t flags; + int32_t wheel_delta; + uint8_t action; + uint8_t *payload_ptr; + uint32_t payload_len; +}; + +int32_t rdp_bridge_poll_event(uint64_t handle, struct RdpEvent *out, uint32_t timeout_ms); + #ifdef __cplusplus } #endif diff --git a/packages/pam/handlers/rdp/native/src/bridge.rs b/packages/pam/handlers/rdp/native/src/bridge.rs index cfe5e992..01a41d70 100644 --- a/packages/pam/handlers/rdp/native/src/bridge.rs +++ b/packages/pam/handlers/rdp/native/src/bridge.rs @@ -4,6 +4,7 @@ //! avoids drift that breaks strict clients (Windows App, mstsc). use std::sync::Arc; +use std::time::Instant; use anyhow::{Context, Result}; use ironrdp_acceptor::{Acceptor, BeginResult}; @@ -11,20 +12,24 @@ use ironrdp_connector::credssp::{CredsspSequence, KerberosConfig}; use ironrdp_connector::sspi::credssp::ClientState; use ironrdp_connector::sspi::generator::GeneratorState; use ironrdp_connector::{encode_x224_packet, ClientConnector, ClientConnectorState}; +use ironrdp_core::ReadCursor; use ironrdp_pdu::gcc::ConferenceCreateRequest; +use ironrdp_pdu::input::fast_path::{FastPathInput, FastPathInputEvent}; use ironrdp_pdu::ironrdp_core::{decode, WriteBuf}; use ironrdp_pdu::mcs::ConnectInitial; use ironrdp_pdu::nego::SecurityProtocol; use ironrdp_pdu::rdp::client_info::Credentials as AcceptorCredentials; use ironrdp_pdu::x224::{X224Data, X224}; +use ironrdp_pdu::Action; use ironrdp_tokio::reqwest::ReqwestNetworkClient; use ironrdp_tokio::{FramedWrite, NetworkClient}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{info, warn}; use crate::config::{connector_config, DEFAULT_HEIGHT, DEFAULT_WIDTH}; +use crate::events::{elapsed_ns_since, EventSender, SessionEvent}; // The acceptor side of the bridge expects the user to type the target // username with an empty password. The real password is injected by the @@ -42,9 +47,10 @@ pub async fn run_mitm( client_tcp: TcpStream, target: TargetEndpoint, cancel: CancellationToken, + tx: EventSender, ) -> Result<()> { tokio::select! { - result = run_mitm_inner(client_tcp, target) => result, + result = run_mitm_inner(client_tcp, target, tx) => result, _ = cancel.cancelled() => { info!("session canceled by caller"); Ok(()) @@ -52,7 +58,11 @@ pub async fn run_mitm( } } -async fn run_mitm_inner(client_tcp: TcpStream, target: TargetEndpoint) -> Result<()> { +async fn run_mitm_inner( + client_tcp: TcpStream, + target: TargetEndpoint, + tx: EventSender, +) -> Result<()> { // Our tree pulls both ring (direct) and aws-lc-rs (via reqwest); rustls // 0.23 needs an explicit provider when more than one is compiled in. let _ = rustls::crypto::ring::default_provider().install_default(); @@ -92,18 +102,128 @@ async fn run_mitm_inner(client_tcp: TcpStream, target: TargetEndpoint) -> Result .await .context("flush target stream before passthrough")?; - // Real RDP clients hard-close TCP without TLS close_notify, which - // rustls surfaces as UnexpectedEof. Treat that as clean shutdown. - match tokio::io::copy_bidirectional(&mut client_stream, &mut target_stream).await { - Ok(_) => info!("session ended cleanly"), - Err(e) if is_unexpected_eof(&e) => info!("session ended (peer hard-closed)"), - Err(e) => return Err(e).context("passthrough copy_bidirectional"), + // Bridge PDUs end-to-end with an event tap. read_pdu is pure TPKT/FastPath + // framing -- it does not run any RDP state machine -- so this preserves + // the "no MCS/capability/share-state drift" property of the byte-level + // copy_bidirectional path it replaces. Each PDU is forwarded byte-for-byte + // before/after the tap. + let client_framed = ironrdp_tokio::TokioFramed::new(client_stream); + let target_framed = ironrdp_tokio::TokioFramed::new(target_stream); + bridge_pdus(client_framed, target_framed, tx).await +} + +async fn bridge_pdus( + client_framed: ironrdp_tokio::TokioFramed, + target_framed: ironrdp_tokio::TokioFramed, + tx: EventSender, +) -> Result<()> +where + C: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, + T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, +{ + let (mut client_read, mut client_write) = ironrdp_tokio::split_tokio_framed(client_framed); + let (mut target_read, mut target_write) = ironrdp_tokio::split_tokio_framed(target_framed); + + let started_at = Instant::now(); + let tx_c2t = tx.clone(); + let tx_t2c = tx; + + let c2t = async move { + loop { + let (action, frame) = match client_read.read_pdu().await { + Ok(v) => v, + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err::<_, anyhow::Error>(e.into()), + }; + tap_client_to_target(action, &frame, started_at, &tx_c2t); + target_write + .write_all(&frame) + .await + .context("write client PDU to target")?; + } + Ok(()) + }; + + let t2c = async move { + loop { + let (action, frame) = match target_read.read_pdu().await { + Ok(v) => v, + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err::<_, anyhow::Error>(e.into()), + }; + tap_target_to_client(action, &frame, started_at, &tx_t2c); + client_write + .write_all(&frame) + .await + .context("write target PDU to client")?; + } + Ok(()) + }; + + match tokio::try_join!(c2t, t2c) { + Ok(_) => { + info!("session ended cleanly"); + Ok(()) + } + Err(e) => Err(e).context("bridge_pdus"), } - Ok(()) } -fn is_unexpected_eof(err: &std::io::Error) -> bool { - err.kind() == std::io::ErrorKind::UnexpectedEof +fn tap_client_to_target(action: Action, frame: &[u8], started_at: Instant, tx: &EventSender) { + if action != Action::FastPath { + return; + } + let input: FastPathInput = match decode_fast_path_input(frame) { + Ok(input) => input, + Err(e) => { + warn!(?e, "failed to decode FastPathInput"); + return; + } + }; + let elapsed_ns = elapsed_ns_since(started_at); + for event in input.input_events() { + let session_event = match *event { + FastPathInputEvent::KeyboardEvent(flags, scancode) => SessionEvent::KeyboardInput { + scancode, + flags, + elapsed_ns, + }, + FastPathInputEvent::UnicodeKeyboardEvent(flags, code_point) => { + SessionEvent::UnicodeInput { + code_point, + flags, + elapsed_ns, + } + } + FastPathInputEvent::MouseEvent(pdu) => SessionEvent::MouseInput { + x: pdu.x_position, + y: pdu.y_position, + flags: pdu.flags, + wheel_delta: pdu.number_of_wheel_rotation_units, + elapsed_ns, + }, + // MouseEventEx, MouseEventRel, QoeEvent, SyncEvent: skip for now; + // uncommon in normal sessions and not needed for replay V1. + _ => continue, + }; + // send error means the receiver was dropped (poll loop exited). + // The bridge keeps forwarding bytes regardless. + let _ = tx.send(session_event); + } +} + +fn tap_target_to_client(action: Action, frame: &[u8], started_at: Instant, tx: &EventSender) { + let _ = tx.send(SessionEvent::TargetFrame { + action, + payload: frame.to_vec(), + elapsed_ns: elapsed_ns_since(started_at), + }); +} + +fn decode_fast_path_input(frame: &[u8]) -> anyhow::Result { + use ironrdp_core::Decode as _; + let mut cursor = ReadCursor::new(frame); + FastPathInput::decode(&mut cursor).map_err(|e| anyhow::anyhow!("decode FastPathInput: {e}")) } // Reads the client's MCS Connect Initial PDU, removes any virtual channels diff --git a/packages/pam/handlers/rdp/native/src/events.rs b/packages/pam/handlers/rdp/native/src/events.rs new file mode 100644 index 00000000..e1298b07 --- /dev/null +++ b/packages/pam/handlers/rdp/native/src/events.rs @@ -0,0 +1,50 @@ +//! Structured session events emitted by the bridge's PDU tap. +//! +//! Keyboard / unicode / mouse events are decoded from FastPath input PDUs on +//! the client->target path. TargetFrame events carry the full raw PDU bytes +//! exactly as they came off the wire on the target->client path; decoding +//! (RLE, 16bpp->RGBA, etc.) happens at replay time in the browser. + +use std::time::Instant; + +use ironrdp_pdu::input::fast_path::KeyboardFlags; +use ironrdp_pdu::input::mouse::PointerFlags; +use ironrdp_pdu::Action; +use tokio::sync::mpsc; + +#[derive(Debug, Clone)] +pub enum SessionEvent { + KeyboardInput { + scancode: u8, + flags: KeyboardFlags, + elapsed_ns: u64, + }, + UnicodeInput { + code_point: u16, + flags: KeyboardFlags, + elapsed_ns: u64, + }, + MouseInput { + x: u16, + y: u16, + flags: PointerFlags, + wheel_delta: i16, + elapsed_ns: u64, + }, + TargetFrame { + action: Action, + payload: Vec, + elapsed_ns: u64, + }, +} + +pub fn elapsed_ns_since(started_at: Instant) -> u64 { + started_at.elapsed().as_nanos() as u64 +} + +pub type EventSender = mpsc::UnboundedSender; +pub type EventReceiver = mpsc::UnboundedReceiver; + +pub fn channel() -> (EventSender, EventReceiver) { + mpsc::unbounded_channel() +} diff --git a/packages/pam/handlers/rdp/native/src/ffi.rs b/packages/pam/handlers/rdp/native/src/ffi.rs index ecef7782..dd0c6bd3 100644 --- a/packages/pam/handlers/rdp/native/src/ffi.rs +++ b/packages/pam/handlers/rdp/native/src/ffi.rs @@ -3,6 +3,10 @@ //! Each session runs on its own OS thread with a current-thread tokio //! runtime. `start_*` transfers ownership of the client fd/socket to //! Rust (Go hands in a dup). Contract: wait, then free. +//! +//! Events: the bridge taps each forwarded PDU and emits structured events +//! (keyboard / unicode / mouse / target frame) that Go drains via +//! `rdp_bridge_poll_event`. use std::collections::HashMap; use std::ffi::{c_char, CStr}; @@ -10,12 +14,15 @@ use std::net::TcpStream as StdTcpStream; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{LazyLock, Mutex}; use std::thread::JoinHandle; +use std::time::Duration; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use crate::bridge::{run_mitm, TargetEndpoint}; +use crate::events::{self, SessionEvent}; pub const RDP_BRIDGE_OK: i32 = 0; pub const RDP_BRIDGE_SESSION_ERROR: i32 = 1; @@ -24,10 +31,152 @@ pub const RDP_BRIDGE_INVALID_HANDLE: i32 = -1; pub const RDP_BRIDGE_BAD_ARG: i32 = -2; pub const RDP_BRIDGE_RUNTIME_ERROR: i32 = -3; +// Poll return codes -- distinct number space from the bridge status codes +// above, even though their numeric values overlap, because they're consumed +// by a different Go function. +pub const RDP_POLL_OK: i32 = 0; +pub const RDP_POLL_TIMEOUT: i32 = 1; +pub const RDP_POLL_ENDED: i32 = 2; +pub const RDP_POLL_INVALID_HANDLE: i32 = -1; + +#[repr(u8)] +pub enum RdpEventType { + Keyboard = 1, + Unicode = 2, + Mouse = 3, + TargetFrame = 4, +} + +/// C-ABI friendly event. Fields are reused across variants; check +/// `event_type` to decide which fields are meaningful. +/// +/// For TargetFrame, `payload_ptr` points at a `libc::malloc`-allocated buffer +/// of size `payload_len`. The Go caller takes ownership and must +/// `libc::free(payload_ptr)` after copying the bytes out. Other variants set +/// `payload_ptr = NULL` and `payload_len = 0`. +#[repr(C)] +pub struct RdpEvent { + pub event_type: u8, + /// Nanoseconds since bridge start. + pub elapsed_ns: u64, + /// Keyboard: scancode. Unicode: code point. Mouse: x. TargetFrame: bytes. + pub value_a: u32, + /// Mouse: y. Others: 0. + pub value_b: u32, + /// Keyboard / Unicode / Mouse flags (raw bits from the RDP layer). + pub flags: u32, + /// Mouse wheel delta (signed). 0 for others. + pub wheel_delta: i32, + /// TargetFrame: 0 = X.224, 1 = FastPath. 0 for others. + pub action: u8, + pub payload_ptr: *mut u8, + pub payload_len: u32, +} + +impl RdpEvent { + const fn zero() -> Self { + Self { + event_type: 0, + elapsed_ns: 0, + value_a: 0, + value_b: 0, + flags: 0, + wheel_delta: 0, + action: 0, + payload_ptr: std::ptr::null_mut(), + payload_len: 0, + } + } + + fn from_session_event(ev: SessionEvent) -> Self { + match ev { + SessionEvent::KeyboardInput { + scancode, + flags, + elapsed_ns, + } => Self { + event_type: RdpEventType::Keyboard as u8, + elapsed_ns, + value_a: scancode.into(), + flags: flags.bits().into(), + ..Self::zero() + }, + SessionEvent::UnicodeInput { + code_point, + flags, + elapsed_ns, + } => Self { + event_type: RdpEventType::Unicode as u8, + elapsed_ns, + value_a: code_point.into(), + flags: flags.bits().into(), + ..Self::zero() + }, + SessionEvent::MouseInput { + x, + y, + flags, + wheel_delta, + elapsed_ns, + } => Self { + event_type: RdpEventType::Mouse as u8, + elapsed_ns, + value_a: x.into(), + value_b: y.into(), + flags: flags.bits().into(), + wheel_delta: wheel_delta.into(), + ..Self::zero() + }, + SessionEvent::TargetFrame { + action, + payload, + elapsed_ns, + } => { + // Copy into a libc::malloc'd buffer the Go caller will free. + // Using libc (not Rust's allocator) lets Go free directly via + // C.free without an extra trip back through the FFI. + let len = payload.len(); + let ptr = if len == 0 { + std::ptr::null_mut() + } else { + unsafe { + let p = libc::malloc(len) as *mut u8; + if p.is_null() { + std::ptr::null_mut() + } else { + std::ptr::copy_nonoverlapping(payload.as_ptr(), p, len); + p + } + } + }; + Self { + event_type: RdpEventType::TargetFrame as u8, + elapsed_ns, + value_a: len as u32, + action: match action { + ironrdp_pdu::Action::X224 => 0, + ironrdp_pdu::Action::FastPath => 1, + }, + payload_ptr: ptr, + payload_len: len as u32, + ..Self::zero() + } + } + } + } +} + struct BridgeEntry { cancel: CancellationToken, // Taken by wait(); None afterward. join: Mutex>>>, + // Receiver side of the bridge's event channel. Polled by Go via + // rdp_bridge_poll_event. Wrapped in Option so the poll loop can take it + // out for the duration of the await without holding the HANDLES lock. + events_rx: Mutex>>, + // Set once the events channel has reported closed; subsequent polls + // short-circuit to RDP_POLL_ENDED. + events_ended: Mutex, } static HANDLES: LazyLock>> = @@ -64,6 +213,8 @@ fn spawn_session( let cancel = CancellationToken::new(); let cancel_for_thread = cancel.clone(); + let (events_tx, events_rx) = events::channel(); + let join = std::thread::Builder::new() .name("rdp-bridge-session".to_owned()) .spawn(move || -> anyhow::Result<()> { @@ -78,13 +229,15 @@ fn spawn_session( username, password, }; - run_mitm(client, endpoint, cancel_for_thread).await + run_mitm(client, endpoint, cancel_for_thread, events_tx).await }) })?; Ok(register(BridgeEntry { cancel, join: Mutex::new(Some(join)), + events_rx: Mutex::new(Some(events_rx)), + events_ended: Mutex::new(false), })) } @@ -227,3 +380,84 @@ pub extern "C" fn rdp_bridge_free(handle: u64) -> i32 { RDP_BRIDGE_INVALID_HANDLE } } + +/// Poll the next event, blocking up to `timeout_ms` milliseconds. +/// +/// Returns: +/// * `RDP_POLL_OK` -- event written to *out (caller owns *payload_ptr* if +/// non-null and must `libc::free` it). +/// * `RDP_POLL_TIMEOUT` -- no event in time; *out not modified. +/// * `RDP_POLL_ENDED` -- bridge finished; no more events. +/// * `RDP_POLL_INVALID_HANDLE` -- unknown or already-closed handle. +/// +/// # Safety +/// +/// `out` must be a non-null, writable `*mut RdpEvent`. +#[no_mangle] +pub unsafe extern "C" fn rdp_bridge_poll_event( + handle: u64, + out: *mut RdpEvent, + timeout_ms: u32, +) -> i32 { + if out.is_null() { + return RDP_POLL_INVALID_HANDLE; + } + + // Take the receiver out of the entry so we don't hold the HANDLES lock + // across the await. Put it back at the end (or leave None and mark + // ended). + let take_result: Result>, i32> = { + let handles = HANDLES.lock().expect("HANDLES poisoned"); + match handles.get(&handle) { + None => Err(RDP_POLL_INVALID_HANDLE), + Some(entry) => { + if *entry.events_ended.lock().expect("events_ended poisoned") { + Err(RDP_POLL_ENDED) + } else { + Ok(entry.events_rx.lock().expect("events_rx poisoned").take()) + } + } + } + }; + let mut rx = match take_result { + Ok(Some(rx)) => rx, + // Another poll is already in flight on this handle. Treat as + // invalid: callers should serialize their poll calls. + Ok(None) => return RDP_POLL_INVALID_HANDLE, + Err(code) => return code, + }; + + // Short-lived single-thread runtime just for the timeout. Cheap; the + // bridge thread runs its own runtime. + let result = { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + .expect("build poll runtime"); + rt.block_on(async { + tokio::time::timeout(Duration::from_millis(timeout_ms.into()), rx.recv()).await + }) + }; + + let outcome = match result { + Ok(Some(event)) => { + let rdp_event = RdpEvent::from_session_event(event); + unsafe { out.write(rdp_event) }; + RDP_POLL_OK + } + Ok(None) => RDP_POLL_ENDED, // sender side dropped (bridge ended) + Err(_timeout) => RDP_POLL_TIMEOUT, + }; + + // Restore the receiver, or mark ended if the channel reported closed. + let handles = HANDLES.lock().expect("HANDLES poisoned"); + if let Some(entry) = handles.get(&handle) { + if outcome == RDP_POLL_ENDED { + *entry.events_ended.lock().expect("events_ended poisoned") = true; + } else { + *entry.events_rx.lock().expect("events_rx poisoned") = Some(rx); + } + } + + outcome +} diff --git a/packages/pam/handlers/rdp/native/src/lib.rs b/packages/pam/handlers/rdp/native/src/lib.rs index 61c64480..3f37292d 100644 --- a/packages/pam/handlers/rdp/native/src/lib.rs +++ b/packages/pam/handlers/rdp/native/src/lib.rs @@ -4,4 +4,5 @@ pub mod bridge; pub mod config; +pub mod events; pub mod ffi; diff --git a/packages/pam/handlers/rdp/proxy.go b/packages/pam/handlers/rdp/proxy.go index e113902a..a57d4583 100644 --- a/packages/pam/handlers/rdp/proxy.go +++ b/packages/pam/handlers/rdp/proxy.go @@ -1,6 +1,13 @@ package rdp import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/rs/zerolog/log" + "github.com/Infisical/infisical-merge/packages/pam/session" ) @@ -10,9 +17,7 @@ type RDPProxyConfig struct { InjectUsername string InjectPassword string SessionID string - // Retained for API symmetry with other PAM handlers; not yet written - // through (no RDP session recording in this MVP). - SessionLogger session.SessionLogger + SessionLogger session.SessionLogger } type RDPProxy struct { @@ -22,3 +27,129 @@ type RDPProxy struct { func NewRDPProxy(config RDPProxyConfig) *RDPProxy { return &RDPProxy{config: config} } + +// Wire-format JSON envelopes carried inside session.TerminalEvent.Data when +// ChannelType == TerminalChannelRDP. The frontend RDP player decodes these +// after AAD-bound chunk decryption and feeds target_frame payloads into the +// IronRDP WASM decoder. +type rdpTargetFrameEnvelope struct { + Type string `json:"type"` // "target_frame" + Action string `json:"action"` // "x224" | "fastpath" + Payload []byte `json:"payload"` // raw PDU bytes (base64 by Go's json.Marshal) + ElapsedNs uint64 `json:"elapsedNs"` +} + +type rdpKeyboardEnvelope struct { + Type string `json:"type"` // "keyboard" + Scancode uint8 `json:"scancode"` + Flags uint32 `json:"flags"` + ElapsedNs uint64 `json:"elapsedNs"` +} + +type rdpUnicodeEnvelope struct { + Type string `json:"type"` // "unicode" + CodePoint uint16 `json:"codePoint"` + Flags uint32 `json:"flags"` + ElapsedNs uint64 `json:"elapsedNs"` +} + +type rdpMouseEnvelope struct { + Type string `json:"type"` // "mouse" + X uint16 `json:"x"` + Y uint16 `json:"y"` + Flags uint32 `json:"flags"` + WheelDelta int32 `json:"wheelDelta"` + ElapsedNs uint64 `json:"elapsedNs"` +} + +// pollTimeout bounds how long a single rdp_bridge_poll_event call blocks. +// Short enough that a Cancel on the bridge ends the drain loop quickly via +// PollEnded; long enough not to busy-wait when no events are produced. +const pollTimeout = 250 * time.Millisecond + +var errUnknownRdpEventType = errors.New("rdp: unknown event type") + +// drainBridgeEvents polls bridge events and forwards each as a +// session.TerminalEvent. Returns when ctx is cancelled, the bridge ends, or +// PollEvent returns a hard error. Designed to run in its own goroutine. +// +// Errors from individual logger calls are logged but do not stop the drain: +// dropping a single recording event is preferable to letting the bridge byte +// stream be back-pressured by a transient logger failure. +func drainBridgeEvents(ctx context.Context, b *Bridge, logger session.SessionLogger, sessionID string) { + if logger == nil { + return + } + for { + if ctx.Err() != nil { + return + } + result, ev, err := b.PollEvent(pollTimeout) + if err != nil { + log.Debug().Err(err).Str("sessionID", sessionID).Msg("rdp event drain stopped") + return + } + switch result { + case PollEnded: + return + case PollTimeout: + continue + case PollOK: + data, encErr := encodeRdpEvent(ev) + if encErr != nil { + log.Warn().Err(encErr).Str("sessionID", sessionID).Uint8("type", uint8(ev.Type)).Msg("encode RDP event") + continue + } + te := session.TerminalEvent{ + Timestamp: time.Now(), + EventType: session.TerminalEventRDP, + ChannelType: session.TerminalChannelRDP, + Data: data, + ElapsedTime: float64(ev.ElapsedNs) / 1e9, + } + if logErr := logger.LogTerminalEvent(te); logErr != nil { + log.Warn().Err(logErr).Str("sessionID", sessionID).Msg("log RDP event") + } + } + } +} + +func encodeRdpEvent(ev Event) ([]byte, error) { + switch ev.Type { + case EventTypeTargetFrame: + action := "x224" + if ev.Action == ActionFastPath { + action = "fastpath" + } + return json.Marshal(rdpTargetFrameEnvelope{ + Type: "target_frame", + Action: action, + Payload: ev.Payload, + ElapsedNs: ev.ElapsedNs, + }) + case EventTypeKeyboard: + return json.Marshal(rdpKeyboardEnvelope{ + Type: "keyboard", + Scancode: ev.Scancode, + Flags: ev.Flags, + ElapsedNs: ev.ElapsedNs, + }) + case EventTypeUnicode: + return json.Marshal(rdpUnicodeEnvelope{ + Type: "unicode", + CodePoint: ev.CodePoint, + Flags: ev.Flags, + ElapsedNs: ev.ElapsedNs, + }) + case EventTypeMouse: + return json.Marshal(rdpMouseEnvelope{ + Type: "mouse", + X: ev.X, + Y: ev.Y, + Flags: ev.Flags, + WheelDelta: ev.WheelDelta, + ElapsedNs: ev.ElapsedNs, + }) + } + return nil, errUnknownRdpEventType +} diff --git a/packages/pam/session/logger.go b/packages/pam/session/logger.go index 77c3c3e3..cfddd621 100644 --- a/packages/pam/session/logger.go +++ b/packages/pam/session/logger.go @@ -31,6 +31,7 @@ type TerminalEventType string const ( TerminalEventInput TerminalEventType = "input" // Data from user to server TerminalEventOutput TerminalEventType = "output" // Data from server to user + TerminalEventRDP TerminalEventType = "rdp" // RDP tap event (see TerminalChannelRDP) ) // TerminalChannelType represents the type of SSH channel @@ -40,6 +41,7 @@ const ( TerminalChannelShell TerminalChannelType = "terminal" // Interactive shell session TerminalChannelExec TerminalChannelType = "exec" // Single command execution TerminalChannelSFTP TerminalChannelType = "sftp" // SFTP file transfer + TerminalChannelRDP TerminalChannelType = "rdp" // RDP frame/input tap; Data carries an RDP-specific JSON envelope ) // TerminalEvent represents a single event in a terminal session @@ -305,7 +307,14 @@ func (sl *EncryptedSessionLogger) LogTerminalEvent(event TerminalEvent) error { if event.ElapsedTime == 0 { event.ElapsedTime = time.Since(sl.sessionStart).Seconds() } - event.Data = sl.applyMasking(event.Data) + // RDP carries a structured JSON envelope (with base64-encoded PDU + // bytes, scancodes, etc.) in Data, not free-form terminal text. + // Masking patterns are SSH-shaped regexes; running them over the + // envelope would corrupt valid recordings whenever a pattern + // happened to match a substring of the JSON or base64. + if event.ChannelType != TerminalChannelRDP { + event.Data = sl.applyMasking(event.Data) + } return json.Marshal(event) }) } From d9e912fc9e1ab69563e895f88c279afa332ba1a5 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Wed, 6 May 2026 13:40:52 -0400 Subject: [PATCH 2/8] fix(pam-rdp): patch capabilities + anchor timestamps for replay Three fixes that together make RDP recording playback render correctly: - Filter Order, BitmapCodecs, and INFO_COMPRESSION on the wire so the server only emits Bitmap update PDUs IronRDP-session can decompress. Implemented as byte surgery on Confirm Active and Client Info PDUs; IronRDP's typed decode->encode loses unrelated fields. New cap_filter module + walk_caps + 14 unit tests pin the byte-preservation contract. - Override ev.ElapsedNs with time.Since(SessionStartedAt) in the Go drain so reconnects within the same PAM session don't restart the bridge's local clock from zero. SessionUploader exposes GetSessionStartedAt (reconstructed from the persisted lastEndElapsedMs). - Stamp chunk endElapsedMs from the last entry's elapsedTime instead of time.Since(state.startedAt) at flush moment, so the playback total doesn't reach past the last actual frame. readFromOffset returns the trailing entry's elapsed time; falls back to wallclock for non-terminal sessions whose entries lack the field. Comment cleanup pass across the touched RDP files. --- packages/pam/handlers/rdp/bridge.go | 8 +- .../pam/handlers/rdp/bridge_cgo_shared.go | 7 +- packages/pam/handlers/rdp/bridge_cgo_unix.go | 15 +- .../handlers/rdp/native/include/rdp_bridge.h | 24 +- .../pam/handlers/rdp/native/src/bridge.rs | 296 ++++++++++++++++-- .../pam/handlers/rdp/native/src/cap_filter.rs | 278 ++++++++++++++++ .../pam/handlers/rdp/native/src/config.rs | 6 +- .../pam/handlers/rdp/native/src/events.rs | 8 +- packages/pam/handlers/rdp/native/src/ffi.rs | 46 +-- packages/pam/handlers/rdp/native/src/lib.rs | 1 + packages/pam/handlers/rdp/proxy.go | 27 +- packages/pam/local/rdp-proxy.go | 30 +- packages/pam/pam-proxy.go | 16 +- packages/pam/session/uploader.go | 76 +++-- 14 files changed, 651 insertions(+), 187 deletions(-) create mode 100644 packages/pam/handlers/rdp/native/src/cap_filter.rs diff --git a/packages/pam/handlers/rdp/bridge.go b/packages/pam/handlers/rdp/bridge.go index 8e99ebc2..17970ccc 100644 --- a/packages/pam/handlers/rdp/bridge.go +++ b/packages/pam/handlers/rdp/bridge.go @@ -34,13 +34,7 @@ const ( ActionFastPath Action = 1 ) -// Event is a structured tap event drained from the bridge. -// -// Fields are reused across variants. Switch on Type: -// - Keyboard: Scancode + Flags. -// - Unicode: CodePoint + Flags. -// - Mouse: X, Y, Flags, WheelDelta. -// - TargetFrame: Action + Payload (raw PDU bytes; owned Go slice). +// Fields are reused across variants; switch on Type. type Event struct { Type EventType ElapsedNs uint64 diff --git a/packages/pam/handlers/rdp/bridge_cgo_shared.go b/packages/pam/handlers/rdp/bridge_cgo_shared.go index e825aaa3..f5181057 100644 --- a/packages/pam/handlers/rdp/bridge_cgo_shared.go +++ b/packages/pam/handlers/rdp/bridge_cgo_shared.go @@ -46,7 +46,7 @@ func (p *RDPProxy) HandleConnection(ctx context.Context, clientConn net.Conn) er drainDone := make(chan struct{}) go func() { defer close(drainDone) - drainBridgeEvents(drainCtx, bridge, p.config.SessionLogger, p.config.SessionID) + drainBridgeEvents(drainCtx, bridge, p.config.SessionLogger, p.config.SessionID, p.config.SessionStartedAt) }() defer func() { cancelDrain() @@ -112,10 +112,7 @@ func (b *Bridge) Close() error { return nil } -// IsSupported reports whether this build has a real RDP bridge. Used -// by the gateway to decide whether to advertise RDP in the capabilities -// response: a stub-build gateway that advertises support would route -// RDP sessions only to fail them at connect time. +// True when the real bridge is compiled in (vs the stub). func IsSupported() bool { return true } // PollEvent drains one tap event with the given timeout. The returned Event diff --git a/packages/pam/handlers/rdp/bridge_cgo_unix.go b/packages/pam/handlers/rdp/bridge_cgo_unix.go index 91b24d38..f940eb6d 100644 --- a/packages/pam/handlers/rdp/bridge_cgo_unix.go +++ b/packages/pam/handlers/rdp/bridge_cgo_unix.go @@ -62,19 +62,8 @@ func startWithDupedFD(dupFd int, targetHost string, targetPort uint16, username, return &Bridge{handle: uint64(handle)}, nil } -// StartWithReadWriter adapts an fd-less Go byte stream (e.g. *tls.Conn -// from the gateway's mTLS-wrapped virtual connection) to the bridge, -// which needs a real file descriptor because the Rust side uses tokio's -// TcpStream::from_raw_fd and does direct async I/O on the socket. -// -// Trick: open a loopback TCP pair. Hand one end's fd to the bridge (it -// thinks it has a real client). Keep the other end in Go and shuttle -// bytes between it and rw with two io.Copy goroutines. -// -// rw (e.g. *tls.Conn) <-io.Copy-> peer <-kernel loopback-> accepted (fd -> Rust bridge) -// -// Cost: two extra in-process copies and a loopback round-trip per byte. -// Negligible vs. the TLS + CredSSP work on either side. +// Adapts an fd-less Go byte stream to the Rust bridge (which needs a real fd +// for tokio's TcpStream::from_raw_fd) by routing through a loopback TCP pair. func StartWithReadWriter(rw io.ReadWriter, targetHost string, targetPort uint16, username, password string) (*Bridge, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { diff --git a/packages/pam/handlers/rdp/native/include/rdp_bridge.h b/packages/pam/handlers/rdp/native/include/rdp_bridge.h index 753dd351..888afbbd 100644 --- a/packages/pam/handlers/rdp/native/include/rdp_bridge.h +++ b/packages/pam/handlers/rdp/native/include/rdp_bridge.h @@ -1,8 +1,5 @@ -/* - * infisical-rdp-bridge C ABI. See ffi.rs for details. Lifecycle: - * start_* -> wait -> free; cancel may be called from any thread. - * start_* transfers ownership of the client fd/socket to the bridge. - */ +/* C ABI; see ffi.rs. Lifecycle: start_* -> wait -> free. start_* takes + * ownership of the client fd/socket. cancel is thread-safe. */ #ifndef INFISICAL_RDP_BRIDGE_H #define INFISICAL_RDP_BRIDGE_H @@ -59,21 +56,8 @@ int32_t rdp_bridge_free(uint64_t handle); #define RDP_EVENT_MOUSE 3 #define RDP_EVENT_TARGET_FRAME 4 -/* - * Bridge tap event surfaced to Go. - * - * Fields are reused across variants -- check `event_type` to decide which - * fields are meaningful: - * - Keyboard: value_a = scancode, flags = KeyboardFlags bits. - * - Unicode: value_a = code point, flags = KeyboardFlags bits. - * - Mouse: value_a = x, value_b = y, flags = PointerFlags bits, - * wheel_delta is signed. - * - TargetFrame: action = 0 (X.224) or 1 (FastPath); payload_ptr points - * at a heap buffer of size payload_len with the raw PDU - * bytes. The buffer was allocated with libc malloc; the Go - * caller MUST free it via C.free after copying the bytes. - * Other variants leave payload_ptr = NULL, payload_len = 0. - */ +/* Fields reused across variants; check event_type. For TargetFrame, + * payload_ptr is libc-malloc'd and the Go caller must C.free it. */ struct RdpEvent { uint8_t event_type; uint64_t elapsed_ns; diff --git a/packages/pam/handlers/rdp/native/src/bridge.rs b/packages/pam/handlers/rdp/native/src/bridge.rs index 01a41d70..c7bccf70 100644 --- a/packages/pam/handlers/rdp/native/src/bridge.rs +++ b/packages/pam/handlers/rdp/native/src/bridge.rs @@ -1,10 +1,10 @@ -//! MITM bridge. Runs acceptor + connector only through CredSSP (to inject -//! credentials), then byte-forwards between the two TLS streams. Letting -//! client and target negotiate MCS/capabilities/share-state directly -//! avoids drift that breaks strict clients (Windows App, mstsc). +//! MITM bridge. Runs acceptor + connector through CredSSP only, then byte- +//! forwards. Letting client/target negotiate MCS directly avoids drift +//! that breaks strict clients (Windows App, mstsc). +use std::borrow::Cow; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::{Context, Result}; use ironrdp_acceptor::{Acceptor, BeginResult}; @@ -16,9 +16,10 @@ use ironrdp_core::ReadCursor; use ironrdp_pdu::gcc::ConferenceCreateRequest; use ironrdp_pdu::input::fast_path::{FastPathInput, FastPathInputEvent}; use ironrdp_pdu::ironrdp_core::{decode, WriteBuf}; -use ironrdp_pdu::mcs::ConnectInitial; +use ironrdp_pdu::mcs::{ConnectInitial, SendDataRequest}; use ironrdp_pdu::nego::SecurityProtocol; use ironrdp_pdu::rdp::client_info::Credentials as AcceptorCredentials; +use ironrdp_pdu::rdp::headers::{ShareControlHeader, ShareControlPdu}; use ironrdp_pdu::x224::{X224Data, X224}; use ironrdp_pdu::Action; use ironrdp_tokio::reqwest::ReqwestNetworkClient; @@ -26,11 +27,17 @@ use ironrdp_tokio::{FramedWrite, NetworkClient}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; +use crate::cap_filter; use crate::config::{connector_config, DEFAULT_HEIGHT, DEFAULT_WIDTH}; use crate::events::{elapsed_ns_since, EventSender, SessionEvent}; +/// Cap on c2t PDUs to inspect before giving up on the cap filter. +const CONFIRM_ACTIVE_SCAN_MAX_PDUS: usize = 32; +/// Wall-clock cap on the cap-filter scan window. +const CONFIRM_ACTIVE_SCAN_MAX_DURATION: Duration = Duration::from_secs(5); + // The acceptor side of the bridge expects the user to type the target // username with an empty password. The real password is injected by the // connector side from the PAM vault. @@ -76,10 +83,8 @@ async fn run_mitm_inner( let (mut client_stream, client_leftover) = acceptor_output; let (mut target_stream, target_leftover) = connector_output; - // Strip virtual channels (clipboard, drives, audio, USB, etc.) from the - // client's MCS Connect Initial before forwarding. Mouse/keyboard/screen - // ride the implicit MCS I/O channel, not virtual channels, so they're - // unaffected. + // Strip virtual channels (clipboard, drives, audio, USB) from MCS Connect Initial. + // Mouse/keyboard/screen ride the implicit I/O channel and are unaffected. filter_client_mcs_connect_initial(&mut client_stream, &mut target_stream, client_leftover) .await .context("filter client MCS Connect Initial")?; @@ -102,11 +107,9 @@ async fn run_mitm_inner( .await .context("flush target stream before passthrough")?; - // Bridge PDUs end-to-end with an event tap. read_pdu is pure TPKT/FastPath - // framing -- it does not run any RDP state machine -- so this preserves - // the "no MCS/capability/share-state drift" property of the byte-level - // copy_bidirectional path it replaces. Each PDU is forwarded byte-for-byte - // before/after the tap. + // PDU-framed bridge with an event tap. read_pdu is pure TPKT/FastPath + // framing (no state machine) so this preserves the "no MCS drift" + // property of the byte-level copy_bidirectional it replaces. let client_framed = ironrdp_tokio::TokioFramed::new(client_stream); let target_framed = ironrdp_tokio::TokioFramed::new(target_stream); bridge_pdus(client_framed, target_framed, tx).await @@ -129,6 +132,12 @@ where let tx_t2c = tx; let c2t = async move { + let mut cap_filter = CapFilterState::Scanning { + started_at: Instant::now(), + pdus_seen: 0, + info_done: false, + confirm_done: false, + }; loop { let (action, frame) = match client_read.read_pdu().await { Ok(v) => v, @@ -136,8 +145,13 @@ where Err(e) => return Err::<_, anyhow::Error>(e.into()), }; tap_client_to_target(action, &frame, started_at, &tx_c2t); + + let bytes_to_forward: Vec = match cap_filter.consider(action, &frame) { + CapFilterDecision::Forward => frame.to_vec(), + CapFilterDecision::Replace(modified) => modified, + }; target_write - .write_all(&frame) + .write_all(&bytes_to_forward) .await .context("write client PDU to target")?; } @@ -169,6 +183,202 @@ where } } +/// One-shot c2t scan that patches Client Info + Client Confirm Active. +enum CapFilterState { + Scanning { + started_at: Instant, + pdus_seen: usize, + info_done: bool, + confirm_done: bool, + }, + Done, +} + +enum CapFilterDecision { + Forward, + Replace(Vec), +} + +impl CapFilterState { + fn consider(&mut self, action: Action, frame: &[u8]) -> CapFilterDecision { + let CapFilterState::Scanning { + started_at, + pdus_seen, + info_done, + confirm_done, + } = self + else { + return CapFilterDecision::Forward; + }; + + if action != Action::X224 { + return CapFilterDecision::Forward; + } + + *pdus_seen += 1; + if *pdus_seen > CONFIRM_ACTIVE_SCAN_MAX_PDUS + || started_at.elapsed() > CONFIRM_ACTIVE_SCAN_MAX_DURATION + { + warn!( + pdus_seen, + info_done = *info_done, + confirm_done = *confirm_done, + "scan window exhausted before both filters fired" + ); + *self = CapFilterState::Done; + return CapFilterDecision::Forward; + } + + // The two filters are disjoint, so a match short-circuits. + if !*info_done { + if let Some(modified) = try_filter_client_info(frame) { + *info_done = true; + let both_done = *info_done && *confirm_done; + if both_done { + *self = CapFilterState::Done; + } + return CapFilterDecision::Replace(modified); + } + } + if !*confirm_done { + if let Some(modified) = try_filter_confirm_active(frame) { + *confirm_done = true; + let both_done = *info_done && *confirm_done; + if both_done { + *self = CapFilterState::Done; + } + return CapFilterDecision::Replace(modified); + } + } + CapFilterDecision::Forward + } +} + +#[derive(Debug, Clone, Copy)] +struct ByteRange { + offset: usize, + len: usize, +} + +impl ByteRange { + fn slice<'a>(&self, frame: &'a [u8]) -> &'a [u8] { + &frame[self.offset..self.offset + self.len] + } + + fn slice_mut<'a>(&self, frame: &'a mut [u8]) -> &'a mut [u8] { + &mut frame[self.offset..self.offset + self.len] + } +} + +/// Locate `send_data.user_data` inside `frame`. Bails on Cow::Owned. +fn user_data_range_within(frame: &[u8], send_data: &SendDataRequest<'_>) -> Option { + let slice: &[u8] = match &send_data.user_data { + Cow::Borrowed(s) => *s, + Cow::Owned(_) => return None, + }; + let frame_start = frame.as_ptr() as usize; + let slice_start = slice.as_ptr() as usize; + if slice_start < frame_start || slice_start + slice.len() > frame_start + frame.len() { + return None; + } + Some(ByteRange { + offset: slice_start - frame_start, + len: slice.len(), + }) +} + +fn locate_client_info(frame: &[u8]) -> Option { + const SEC_INFO_PKT: u16 = 0x0040; + let send_data = decode::>>(frame).ok()?.0; + let user_data = user_data_range_within(frame, &send_data)?; + if user_data.len < 4 { + return None; + } + let bytes = user_data.slice(frame); + let sec_flags = u16::from_le_bytes([bytes[0], bytes[1]]); + (sec_flags & SEC_INFO_PKT != 0).then_some(user_data) +} + +struct ConfirmActiveLayout { + user_data: ByteRange, + caps_start_in_user_data: usize, +} + +fn locate_confirm_active(frame: &[u8]) -> Option { + let send_data = decode::>>(frame).ok()?.0; + let share_control = decode::(send_data.user_data.as_ref()).ok()?; + if !matches!( + share_control.share_control_pdu, + ShareControlPdu::ClientConfirmActive(_), + ) { + return None; + } + let user_data = user_data_range_within(frame, &send_data)?; + let caps_start_in_user_data = parse_confirm_active_caps_start(user_data.slice(frame))?; + Some(ConfirmActiveLayout { + user_data, + caps_start_in_user_data, + }) +} + +/// MS-RDPBCGR 2.2.1.13.2.1: ShareControlHeader(10) + originatorId(2) + +/// sourceDescLen(2) + combinedLen(2) + sourceDescriptor(var) + numCaps(2) + pad(2) +fn parse_confirm_active_caps_start(user_data: &[u8]) -> Option { + let mut p = 10 + 2; + if user_data.len() < p + 4 { + return None; + } + let source_desc_len = u16::from_le_bytes([user_data[p], user_data[p + 1]]) as usize; + p += 4 + source_desc_len + 4; + (p <= user_data.len()).then_some(p) +} + +fn try_filter_client_info(frame: &[u8]) -> Option> { + let user_data = locate_client_info(frame)?; + let mut out = frame.to_vec(); + if !cap_filter::client_info::clear_compression(user_data.slice_mut(&mut out)) { + return None; + } + debug!("Client Info PDU: cleared INFO_COMPRESSION + CompressionTypeMask"); + Some(out) +} + +fn try_filter_confirm_active(frame: &[u8]) -> Option> { + let layout = locate_confirm_active(frame)?; + let user_data_bytes = layout.user_data.slice(frame); + + let mut order_body_offset_in_frame: Option = None; + let mut codecs_body_offset_in_frame: Option = None; + for cap in cap_filter::walk_caps(user_data_bytes, layout.caps_start_in_user_data) { + let body_offset_in_frame = layout.user_data.offset + cap.body_offset_in_user_data; + match cap.cap_type { + cap_filter::cap_types::ORDER + if cap.cap_len >= cap_filter::order_cap::BODY_LEN + 4 => + { + order_body_offset_in_frame = Some(body_offset_in_frame); + } + cap_filter::cap_types::BITMAP_CODECS + if cap.cap_len >= cap_filter::bitmap_codecs_cap::MIN_BODY_LEN + 4 => + { + codecs_body_offset_in_frame = Some(body_offset_in_frame); + } + _ => {} + } + } + + // Without Order patched, server emits unrenderable Orders. + let order_offset = order_body_offset_in_frame?; + let mut out = frame.to_vec(); + cap_filter::order_cap::clear_order_support( + &mut out[order_offset..order_offset + cap_filter::order_cap::BODY_LEN], + ); + if let Some(codecs_offset) = codecs_body_offset_in_frame { + cap_filter::bitmap_codecs_cap::clear_codec_count(&mut out[codecs_offset..]); + } + debug!("Confirm Active: cleared Order support + BitmapCodecs count"); + Some(out) +} + fn tap_client_to_target(action: Action, frame: &[u8], started_at: Instant, tx: &EventSender) { if action != Action::FastPath { return; @@ -226,10 +436,7 @@ fn decode_fast_path_input(frame: &[u8]) -> anyhow::Result { FastPathInput::decode(&mut cursor).map_err(|e| anyhow::anyhow!("decode FastPathInput: {e}")) } -// Reads the client's MCS Connect Initial PDU, removes any virtual channels -// declared in its Client Network Data block, and forwards the rewritten PDU -// to the target. Any bytes after the PDU (rare; PDUs typically arrive one at -// a time at this stage) are forwarded unchanged. +// Strips virtual channels from the Client Network Data block of MCS Connect Initial. async fn filter_client_mcs_connect_initial( client_stream: &mut ErasedStream, target_stream: &mut ErasedStream, @@ -529,3 +736,50 @@ pub trait AsyncReadWrite: AsyncRead + AsyncWrite {} impl AsyncReadWrite for T where T: AsyncRead + AsyncWrite {} pub type ErasedStream = Box; + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a synthetic ConfirmActive user_data prefix: + /// ShareControlHeader(10) + originatorId(2) + sourceDescLen(2) + + /// combinedLen(2) + sourceDescriptor(source_desc_len) + numCaps(2) + pad(2) + fn confirm_active_prefix(source_desc_len: usize) -> Vec { + let mut buf = vec![0xAA_u8; 10 + 2]; + buf.extend_from_slice(&(source_desc_len as u16).to_le_bytes()); + buf.extend_from_slice(&0xBBBB_u16.to_le_bytes()); + buf.extend_from_slice(&vec![0xCC; source_desc_len]); + buf.extend_from_slice(&0xDDDD_u16.to_le_bytes()); + buf.extend_from_slice(&0xEEEE_u16.to_le_bytes()); + buf + } + + #[test] + fn caps_start_after_variable_source_descriptor() { + let user_data = confirm_active_prefix(6); + let p = parse_confirm_active_caps_start(&user_data).expect("caps start"); + assert_eq!(p, 12 + 4 + 6 + 4); + assert_eq!(p, user_data.len()); + } + + #[test] + fn caps_start_works_when_source_descriptor_is_empty() { + let user_data = confirm_active_prefix(0); + let p = parse_confirm_active_caps_start(&user_data).expect("caps start"); + assert_eq!(p, 12 + 4 + 0 + 4); + } + + #[test] + fn caps_start_returns_none_when_header_truncated() { + let user_data = vec![0u8; 15]; + assert!(parse_confirm_active_caps_start(&user_data).is_none()); + } + + #[test] + fn caps_start_returns_none_when_source_desc_len_overflows() { + let mut user_data = vec![0u8; 12]; + user_data.extend_from_slice(&9999_u16.to_le_bytes()); + user_data.extend_from_slice(&[0u8; 2]); + assert!(parse_confirm_active_caps_start(&user_data).is_none()); + } +} diff --git a/packages/pam/handlers/rdp/native/src/cap_filter.rs b/packages/pam/handlers/rdp/native/src/cap_filter.rs new file mode 100644 index 00000000..04af6a2b --- /dev/null +++ b/packages/pam/handlers/rdp/native/src/cap_filter.rs @@ -0,0 +1,278 @@ +//! Byte-level patches for Confirm Active / Client Info PDUs. +//! IronRDP's typed decode->encode loses unrelated fields, so we mutate in place. + +/// MS-RDPBCGR 2.2.7 +pub mod cap_types { + pub const ORDER: u16 = 0x0003; + pub const BITMAP_CODECS: u16 = 0x001d; +} + +/// MS-RDPBCGR 2.2.7.1.3 +pub mod order_cap { + use std::ops::Range; + + pub const BODY_LEN: usize = 84; + pub const ORDER_SUPPORT: Range = 32..64; + + /// Forces server to fall back to Bitmap updates. + /// orderFlags untouched so NEGOTIATEORDERSUPPORT (mandatory) stays set. + pub fn clear_order_support(body: &mut [u8]) { + body[ORDER_SUPPORT].fill(0); + } +} + +/// MS-RDPBCGR 2.2.7.2.10 +pub mod bitmap_codecs_cap { + pub const CODEC_COUNT_OFFSET: usize = 0; + pub const MIN_BODY_LEN: usize = 1; + + /// Prevents server from picking RFX/NSCodec/AVC. + pub fn clear_codec_count(body: &mut [u8]) { + body[CODEC_COUNT_OFFSET] = 0; + } +} + +/// MS-RDPBCGR 2.2.1.11.1.1, given user_data of an MCS Send Data Request +/// whose security header has SEC_INFO_PKT set. +pub mod client_info { + use std::ops::Range; + + /// 4 bytes security header + 4 bytes CodePage. + pub const FLAGS: Range = 8..12; + pub const INFO_COMPRESSION: u32 = 0x0000_0080; + pub const COMPRESSION_TYPE_MASK: u32 = 0x0000_1E00; + + /// Disables MPPC bulk compression (IronRDP-session can't decompress it). + pub fn clear_compression(user_data: &mut [u8]) -> bool { + if user_data.len() < FLAGS.end { + return false; + } + let bytes: [u8; 4] = match user_data[FLAGS.clone()].try_into() { + Ok(b) => b, + Err(_) => return false, + }; + let flags = u32::from_le_bytes(bytes); + let new_flags = flags & !(INFO_COMPRESSION | COMPRESSION_TYPE_MASK); + if flags == new_flags { + return false; + } + user_data[FLAGS.clone()].copy_from_slice(&new_flags.to_le_bytes()); + true + } +} + +#[derive(Debug, Clone, Copy)] +pub struct WalkedCap { + pub cap_type: u16, + pub cap_len: usize, + pub body_offset_in_user_data: usize, +} + +/// Stops on a malformed cap header. +pub fn walk_caps(user_data: &[u8], caps_start: usize) -> CapIter<'_> { + CapIter { + user_data, + cursor: caps_start, + } +} + +pub struct CapIter<'a> { + user_data: &'a [u8], + cursor: usize, +} + +impl<'a> Iterator for CapIter<'a> { + type Item = WalkedCap; + + fn next(&mut self) -> Option { + if self.cursor + 4 > self.user_data.len() { + return None; + } + let cap_type = u16::from_le_bytes([ + self.user_data[self.cursor], + self.user_data[self.cursor + 1], + ]); + let cap_len = u16::from_le_bytes([ + self.user_data[self.cursor + 2], + self.user_data[self.cursor + 3], + ]) as usize; + if cap_len < 4 || self.cursor + cap_len > self.user_data.len() { + return None; + } + let item = WalkedCap { + cap_type, + cap_len, + body_offset_in_user_data: self.cursor + 4, + }; + self.cursor += cap_len; + Some(item) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn order_clear_zeros_only_the_support_array() { + let mut body = vec![0xff_u8; order_cap::BODY_LEN]; + order_cap::clear_order_support(&mut body); + assert_eq!(&body[order_cap::ORDER_SUPPORT], &[0; 32]); + assert_eq!(&body[28..32], &[0xff; 4]); + assert_eq!(&body[64..68], &[0xff; 4]); + } + + #[test] + fn bitmap_codecs_clears_only_first_byte() { + let mut body = vec![0xff_u8; 16]; + bitmap_codecs_cap::clear_codec_count(&mut body); + assert_eq!(body[0], 0); + assert_eq!(&body[1..], &[0xff; 15]); + } + + #[test] + fn client_info_clears_compression_bits() { + let mut user_data = vec![0u8; 12]; + user_data[8..12].copy_from_slice(&0x0000_1E80_u32.to_le_bytes()); + assert!(client_info::clear_compression(&mut user_data)); + let new_flags = u32::from_le_bytes(user_data[8..12].try_into().unwrap()); + assert_eq!(new_flags, 0); + } + + #[test] + fn client_info_noop_when_compression_already_off() { + let mut user_data = vec![0u8; 12]; + user_data[8..12].copy_from_slice(&0x0000_0040_u32.to_le_bytes()); + assert!(!client_info::clear_compression(&mut user_data)); + } + + #[test] + fn client_info_returns_false_when_user_data_too_short() { + let mut user_data = vec![0u8; 11]; + assert!(!client_info::clear_compression(&mut user_data)); + } + + #[test] + fn client_info_preserves_unrelated_flag_bits() { + let mut user_data = vec![0xAB_u8; 12]; + // INFO_COMPRESSION + CompressionTypeMask + INFO_AUTOLOGON(0x0008) + INFO_UNICODE(0x0010) + let original = 0x0000_1E80_u32 | 0x0000_0008 | 0x0000_0010; + user_data[8..12].copy_from_slice(&original.to_le_bytes()); + assert!(client_info::clear_compression(&mut user_data)); + let new_flags = u32::from_le_bytes(user_data[8..12].try_into().unwrap()); + assert_eq!(new_flags, 0x0000_0008 | 0x0000_0010); + assert_eq!(&user_data[..8], &[0xAB; 8]); + } + + #[test] + fn walk_caps_iterates_each_cap() { + let mut user_data = vec![0u8; 8]; + user_data.extend_from_slice(&[0x01, 0x00, 0x08, 0x00, 0xaa, 0xbb, 0xcc, 0xdd]); + user_data.extend_from_slice(&[ + 0x03, 0x00, 0x0c, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, + ]); + let caps: Vec<_> = walk_caps(&user_data, 8).collect(); + assert_eq!(caps.len(), 2); + assert_eq!(caps[0].cap_type, 0x0001); + assert_eq!(caps[0].cap_len, 8); + assert_eq!(caps[0].body_offset_in_user_data, 12); + assert_eq!(caps[1].cap_type, 0x0003); + assert_eq!(caps[1].cap_len, 12); + assert_eq!(caps[1].body_offset_in_user_data, 20); + } + + #[test] + fn walk_caps_stops_on_malformed_header() { + let mut user_data = vec![0u8; 4]; + user_data.extend_from_slice(&[0x01, 0x00, 0x64, 0x00]); + let caps: Vec<_> = walk_caps(&user_data, 4).collect(); + assert_eq!(caps.len(), 0); + } + + #[test] + fn walk_caps_stops_on_cap_len_below_header_size() { + let user_data = vec![0x01, 0x00, 0x02, 0x00]; + let caps: Vec<_> = walk_caps(&user_data, 0).collect(); + assert_eq!(caps.len(), 0); + } + + /// End-to-end byte-preservation contract: walk a synthetic caps array + /// containing Order, BitmapCodecs, and an unrelated cap; patch only + /// the targeted fields; assert every other byte is identical. + #[test] + fn walk_and_patch_preserves_unrelated_bytes() { + let mut buf: Vec = Vec::new(); + + // Cap 1: unrelated cap_type=0x0001, len=8, body filled with 0x77 + buf.extend_from_slice(&[0x01, 0x00, 0x08, 0x00]); + buf.extend_from_slice(&[0x77; 4]); + let unrelated_range = 0..buf.len(); + + // Cap 2: Order (0x0003), full body of 0xFF + 4-byte header + let order_header_offset = buf.len(); + let order_total_len = (order_cap::BODY_LEN + 4) as u16; + buf.extend_from_slice(&[0x03, 0x00]); + buf.extend_from_slice(&order_total_len.to_le_bytes()); + let order_body_offset = buf.len(); + buf.extend_from_slice(&vec![0xFF; order_cap::BODY_LEN]); + + // Cap 3: BitmapCodecs (0x001d), 4-byte header + body of 0xEE + let codecs_header_offset = buf.len(); + let codecs_body_len = 16usize; + buf.extend_from_slice(&[0x1D, 0x00]); + buf.extend_from_slice(&((codecs_body_len + 4) as u16).to_le_bytes()); + let codecs_body_offset = buf.len(); + buf.extend_from_slice(&vec![0xEE; codecs_body_len]); + + // Cap 4: trailing unrelated cap (filter must not stop early or read past it) + let trailing_offset = buf.len(); + buf.extend_from_slice(&[0x02, 0x00, 0x06, 0x00, 0x55, 0x55]); + + let original = buf.clone(); + + let caps: Vec<_> = walk_caps(&buf, 0).collect(); + assert_eq!(caps.len(), 4); + assert_eq!(caps[0].body_offset_in_user_data, order_header_offset - 4); + assert_eq!(caps[1].cap_type, cap_types::ORDER); + assert_eq!(caps[1].body_offset_in_user_data, order_body_offset); + assert_eq!(caps[2].cap_type, cap_types::BITMAP_CODECS); + assert_eq!(caps[2].body_offset_in_user_data, codecs_body_offset); + assert_eq!(caps[3].body_offset_in_user_data, trailing_offset + 4); + + order_cap::clear_order_support( + &mut buf[order_body_offset..order_body_offset + order_cap::BODY_LEN], + ); + bitmap_codecs_cap::clear_codec_count(&mut buf[codecs_body_offset..]); + + // Unrelated cap: byte-identical + assert_eq!(&buf[unrelated_range.clone()], &original[unrelated_range]); + // Order cap: header preserved, only ORDER_SUPPORT range zeroed + assert_eq!( + &buf[order_header_offset..order_body_offset], + &original[order_header_offset..order_body_offset] + ); + let zeroed_start = order_body_offset + order_cap::ORDER_SUPPORT.start; + let zeroed_end = order_body_offset + order_cap::ORDER_SUPPORT.end; + assert_eq!( + &buf[order_body_offset..zeroed_start], + &original[order_body_offset..zeroed_start] + ); + assert_eq!(&buf[zeroed_start..zeroed_end], &[0u8; 32]); + assert_eq!( + &buf[zeroed_end..codecs_header_offset], + &original[zeroed_end..codecs_header_offset] + ); + // BitmapCodecs cap: header preserved, only first body byte zeroed + assert_eq!( + &buf[codecs_header_offset..codecs_body_offset], + &original[codecs_header_offset..codecs_body_offset] + ); + assert_eq!(buf[codecs_body_offset], 0); + assert_eq!( + &buf[codecs_body_offset + 1..trailing_offset], + &original[codecs_body_offset + 1..trailing_offset] + ); + // Trailing cap: byte-identical + assert_eq!(&buf[trailing_offset..], &original[trailing_offset..]); + } +} diff --git a/packages/pam/handlers/rdp/native/src/config.rs b/packages/pam/handlers/rdp/native/src/config.rs index b1f9a77a..f7588e4b 100644 --- a/packages/pam/handlers/rdp/native/src/config.rs +++ b/packages/pam/handlers/rdp/native/src/config.rs @@ -17,10 +17,8 @@ pub fn connector_config(username: String, password: String) -> Config { }, desktop_scale_factor: 0, - // Advertise HYBRID_EX|HYBRID|SSL to match what native clients send. - // Windows App validates the target's echoed clientRequestedProtocols - // against what it sent on the acceptor side; if the sets diverge it - // disconnects right after Connect Response. + // Match native client's HYBRID_EX|HYBRID|SSL set; Windows App validates the + // target echo against what it sent and disconnects on divergence. enable_tls: true, enable_credssp: true, diff --git a/packages/pam/handlers/rdp/native/src/events.rs b/packages/pam/handlers/rdp/native/src/events.rs index e1298b07..ffb10fd3 100644 --- a/packages/pam/handlers/rdp/native/src/events.rs +++ b/packages/pam/handlers/rdp/native/src/events.rs @@ -1,9 +1,5 @@ -//! Structured session events emitted by the bridge's PDU tap. -//! -//! Keyboard / unicode / mouse events are decoded from FastPath input PDUs on -//! the client->target path. TargetFrame events carry the full raw PDU bytes -//! exactly as they came off the wire on the target->client path; decoding -//! (RLE, 16bpp->RGBA, etc.) happens at replay time in the browser. +//! Bridge tap events. Input is FastPath-decoded c2t; TargetFrame is raw t2c +//! PDU bytes (decoded at replay time in the browser). use std::time::Instant; diff --git a/packages/pam/handlers/rdp/native/src/ffi.rs b/packages/pam/handlers/rdp/native/src/ffi.rs index dd0c6bd3..d178bfaa 100644 --- a/packages/pam/handlers/rdp/native/src/ffi.rs +++ b/packages/pam/handlers/rdp/native/src/ffi.rs @@ -1,12 +1,5 @@ -//! C ABI for the bridge. Called from Go via CGo. -//! -//! Each session runs on its own OS thread with a current-thread tokio -//! runtime. `start_*` transfers ownership of the client fd/socket to -//! Rust (Go hands in a dup). Contract: wait, then free. -//! -//! Events: the bridge taps each forwarded PDU and emits structured events -//! (keyboard / unicode / mouse / target frame) that Go drains via -//! `rdp_bridge_poll_event`. +//! C ABI for the bridge. Each session runs on its own thread with a +//! current-thread tokio runtime. Caller contract: wait, then free. use std::collections::HashMap; use std::ffi::{c_char, CStr}; @@ -31,9 +24,8 @@ pub const RDP_BRIDGE_INVALID_HANDLE: i32 = -1; pub const RDP_BRIDGE_BAD_ARG: i32 = -2; pub const RDP_BRIDGE_RUNTIME_ERROR: i32 = -3; -// Poll return codes -- distinct number space from the bridge status codes -// above, even though their numeric values overlap, because they're consumed -// by a different Go function. +// Distinct number space from the bridge status codes above; consumed by +// a different Go function. pub const RDP_POLL_OK: i32 = 0; pub const RDP_POLL_TIMEOUT: i32 = 1; pub const RDP_POLL_ENDED: i32 = 2; @@ -47,13 +39,8 @@ pub enum RdpEventType { TargetFrame = 4, } -/// C-ABI friendly event. Fields are reused across variants; check -/// `event_type` to decide which fields are meaningful. -/// -/// For TargetFrame, `payload_ptr` points at a `libc::malloc`-allocated buffer -/// of size `payload_len`. The Go caller takes ownership and must -/// `libc::free(payload_ptr)` after copying the bytes out. Other variants set -/// `payload_ptr = NULL` and `payload_len = 0`. +/// Fields are reused across variants; check `event_type` first. +/// For TargetFrame, `payload_ptr` is libc::malloc'd; Go must libc::free it. #[repr(C)] pub struct RdpEvent { pub event_type: u8, @@ -381,18 +368,8 @@ pub extern "C" fn rdp_bridge_free(handle: u64) -> i32 { } } -/// Poll the next event, blocking up to `timeout_ms` milliseconds. -/// -/// Returns: -/// * `RDP_POLL_OK` -- event written to *out (caller owns *payload_ptr* if -/// non-null and must `libc::free` it). -/// * `RDP_POLL_TIMEOUT` -- no event in time; *out not modified. -/// * `RDP_POLL_ENDED` -- bridge finished; no more events. -/// * `RDP_POLL_INVALID_HANDLE` -- unknown or already-closed handle. -/// -/// # Safety -/// -/// `out` must be a non-null, writable `*mut RdpEvent`. +/// Poll the next event, blocking up to `timeout_ms` ms. On RDP_POLL_OK, +/// caller owns *payload_ptr (must libc::free). #[no_mangle] pub unsafe extern "C" fn rdp_bridge_poll_event( handle: u64, @@ -403,9 +380,7 @@ pub unsafe extern "C" fn rdp_bridge_poll_event( return RDP_POLL_INVALID_HANDLE; } - // Take the receiver out of the entry so we don't hold the HANDLES lock - // across the await. Put it back at the end (or leave None and mark - // ended). + // Avoid holding the HANDLES lock across the await. let take_result: Result>, i32> = { let handles = HANDLES.lock().expect("HANDLES poisoned"); match handles.get(&handle) { @@ -421,8 +396,7 @@ pub unsafe extern "C" fn rdp_bridge_poll_event( }; let mut rx = match take_result { Ok(Some(rx)) => rx, - // Another poll is already in flight on this handle. Treat as - // invalid: callers should serialize their poll calls. + // Concurrent poll on the same handle; callers must serialize. Ok(None) => return RDP_POLL_INVALID_HANDLE, Err(code) => return code, }; diff --git a/packages/pam/handlers/rdp/native/src/lib.rs b/packages/pam/handlers/rdp/native/src/lib.rs index 3f37292d..abb6f0bd 100644 --- a/packages/pam/handlers/rdp/native/src/lib.rs +++ b/packages/pam/handlers/rdp/native/src/lib.rs @@ -3,6 +3,7 @@ //! passes bytes through. pub mod bridge; +pub mod cap_filter; pub mod config; pub mod events; pub mod ffi; diff --git a/packages/pam/handlers/rdp/proxy.go b/packages/pam/handlers/rdp/proxy.go index a57d4583..b5220f60 100644 --- a/packages/pam/handlers/rdp/proxy.go +++ b/packages/pam/handlers/rdp/proxy.go @@ -18,6 +18,10 @@ type RDPProxyConfig struct { InjectPassword string SessionID string SessionLogger session.SessionLogger + // Session-anchored origin for elapsedNs. The Rust bridge restarts its + // own clock per RDP client connection; we rewrite each event's elapsedNs + // against this anchor so timestamps stay monotonic across reconnects. + SessionStartedAt time.Time } type RDPProxy struct { @@ -28,10 +32,7 @@ func NewRDPProxy(config RDPProxyConfig) *RDPProxy { return &RDPProxy{config: config} } -// Wire-format JSON envelopes carried inside session.TerminalEvent.Data when -// ChannelType == TerminalChannelRDP. The frontend RDP player decodes these -// after AAD-bound chunk decryption and feeds target_frame payloads into the -// IronRDP WASM decoder. +// Wire envelopes carried inside TerminalEvent.Data for ChannelType=RDP. type rdpTargetFrameEnvelope struct { Type string `json:"type"` // "target_frame" Action string `json:"action"` // "x224" | "fastpath" @@ -62,21 +63,14 @@ type rdpMouseEnvelope struct { ElapsedNs uint64 `json:"elapsedNs"` } -// pollTimeout bounds how long a single rdp_bridge_poll_event call blocks. -// Short enough that a Cancel on the bridge ends the drain loop quickly via -// PollEnded; long enough not to busy-wait when no events are produced. +// Bounds bridge poll latency so Cancel ends the drain loop promptly. const pollTimeout = 250 * time.Millisecond var errUnknownRdpEventType = errors.New("rdp: unknown event type") -// drainBridgeEvents polls bridge events and forwards each as a -// session.TerminalEvent. Returns when ctx is cancelled, the bridge ends, or -// PollEvent returns a hard error. Designed to run in its own goroutine. -// -// Errors from individual logger calls are logged but do not stop the drain: -// dropping a single recording event is preferable to letting the bridge byte -// stream be back-pressured by a transient logger failure. -func drainBridgeEvents(ctx context.Context, b *Bridge, logger session.SessionLogger, sessionID string) { +// Logger errors are warned but don't stop the drain; dropping one event is +// better than back-pressuring the bridge byte stream. +func drainBridgeEvents(ctx context.Context, b *Bridge, logger session.SessionLogger, sessionID string, sessionStartedAt time.Time) { if logger == nil { return } @@ -95,6 +89,9 @@ func drainBridgeEvents(ctx context.Context, b *Bridge, logger session.SessionLog case PollTimeout: continue case PollOK: + if !sessionStartedAt.IsZero() { + ev.ElapsedNs = uint64(time.Since(sessionStartedAt).Nanoseconds()) + } data, encErr := encodeRdpEvent(ev) if encErr != nil { log.Warn().Err(encErr).Str("sessionID", sessionID).Uint8("type", uint8(ev.Type)).Msg("encode RDP event") diff --git a/packages/pam/local/rdp-proxy.go b/packages/pam/local/rdp-proxy.go index af3b43ef..68760d25 100644 --- a/packages/pam/local/rdp-proxy.go +++ b/packages/pam/local/rdp-proxy.go @@ -18,22 +18,15 @@ import ( "github.com/rs/zerolog/log" ) -// RDPProxyServer exposes a local loopback TCP listener that tunnels bytes -// to the gateway's RDP MITM bridge via the existing mTLS + SSH relay. The -// user's RDP client connects to the loopback port; the gateway takes care -// of credential injection and forwarding to the Windows target. +// Loopback listener that tunnels RDP client traffic to the gateway's MITM bridge. type RDPProxyServer struct { BaseProxyServer server net.Listener port int - rdpFilePath string // path to the generated .rdp file, if any + rdpFilePath string } -// StartRDPLocalProxy is the CLI entry point for `infisical pam rdp access`. -// It creates a PAM session with the backend, binds a loopback listener, -// writes a .rdp file pointing at that loopback, optionally launches the -// user's default RDP client, and forwards accepted connections to the -// gateway. +// CLI entry point for `infisical pam rdp access`. func StartRDPLocalProxy(accessToken string, accessParams PAMAccessParams, projectID string, durationStr string, port int, noLaunch bool) { log.Info().Msgf("Starting RDP proxy for account: %s", accessParams.GetDisplayName()) log.Info().Msgf("Session duration: %s", durationStr) @@ -164,10 +157,8 @@ func (p *RDPProxyServer) gracefulShutdown() { p.shutdownOnce.Do(func() { log.Info().Msg("Starting graceful shutdown of RDP proxy...") - // Remove the .rdp file first: p.cancel() below unblocks Run(), - // which returns to main, which may exit before the rest of this - // goroutine completes. Do the cleanup that has to happen before - // anything that could let main race ahead. + // p.cancel() below can return main before this goroutine finishes; + // remove the .rdp file before risking that race. if p.rdpFilePath != "" { if err := os.Remove(p.rdpFilePath); err != nil && !os.IsNotExist(err) { log.Debug().Err(err).Str("path", p.rdpFilePath).Msg("Failed to remove .rdp file on exit") @@ -308,15 +299,8 @@ func (p *RDPProxyServer) handleConnection(clientConn net.Conn) { log.Info().Msgf("RDP connection closed for client: %s", clientConn.RemoteAddr().String()) } -// writeRDPFile creates a .rdp file pointing at the local loopback -// listener. Files live under `~/.infisical/rdp/` to match the CLI's -// existing convention for per-user state (alongside the login config -// and update-check cache). Filename includes the session ID so -// concurrent sessions don't collide. The file is removed on graceful -// shutdown (see gracefulShutdown) since the embedded loopback port -// becomes invalid as soon as the CLI exits; reopening the file later -// would just dial a dead port. -// Falls back to the OS temp dir if the home directory can't be resolved. +// Generates a per-session .rdp file under ~/.infisical/rdp/ pointing at +// the loopback listener. Removed on graceful shutdown. func writeRDPFile(listenPort int, sessionID, username string) (string, error) { filename := fmt.Sprintf("infisical-rdp-%s.rdp", sessionID) diff --git a/packages/pam/pam-proxy.go b/packages/pam/pam-proxy.go index 0cd6c29e..567e08e2 100644 --- a/packages/pam/pam-proxy.go +++ b/packages/pam/pam-proxy.go @@ -417,13 +417,17 @@ func HandlePAMProxy(ctx context.Context, conn *tls.Conn, pamConfig *GatewayPAMCo if credentials.Port <= 0 || credentials.Port > 65535 { return fmt.Errorf("rdp: target port %d out of range", credentials.Port) } + // Anchor event timestamps to the session-level start so reconnects + // within the same PAM session don't restart elapsedNs from zero. + sessionStartedAt, _ := pamConfig.SessionUploader.GetSessionStartedAt(pamConfig.SessionId) rdpConfig := rdp.RDPProxyConfig{ - TargetHost: credentials.Host, - TargetPort: uint16(credentials.Port), - InjectUsername: credentials.Username, - InjectPassword: credentials.Password, - SessionID: pamConfig.SessionId, - SessionLogger: sessionLogger, + TargetHost: credentials.Host, + TargetPort: uint16(credentials.Port), + InjectUsername: credentials.Username, + InjectPassword: credentials.Password, + SessionID: pamConfig.SessionId, + SessionLogger: sessionLogger, + SessionStartedAt: sessionStartedAt, } proxy := rdp.NewRDPProxy(rdpConfig) log.Info(). diff --git a/packages/pam/session/uploader.go b/packages/pam/session/uploader.go index 6f43781c..5d016f72 100644 --- a/packages/pam/session/uploader.go +++ b/packages/pam/session/uploader.go @@ -273,27 +273,27 @@ func deletePersistedOffset(filename string) { _ = os.Remove(offsetFilePath(filename)) } -// readFromOffset reads length-prefixed encrypted records from filename starting at offset, -// decrypts each, and returns them as a JSON array payload plus the new file offset. -// When maxPayloadBytes > 0, stops accumulating once the next entry would push the serialized JSON array past that limit -// Returns nil payload (and the unchanged offset) if there are no new records. -func readFromOffset(filename, encryptionKey string, offset int64, maxPayloadBytes int) ([]byte, int64, error) { +// Returns (payload JSON array, new offset, last entry's elapsedMs, err). +// lastEntryElapsedMs is 0 if entries lack the field. maxPayloadBytes>0 +// caps the JSON array size; caller loops for the rest. +func readFromOffset(filename, encryptionKey string, offset int64, maxPayloadBytes int) ([]byte, int64, int64, error) { recordingDir := GetSessionRecordingDir() fullPath := filepath.Join(recordingDir, filename) file, err := os.Open(fullPath) if err != nil { - return nil, offset, fmt.Errorf("failed to open session file: %w", err) + return nil, offset, 0, fmt.Errorf("failed to open session file: %w", err) } defer file.Close() if _, err := file.Seek(offset, io.SeekStart); err != nil { - return nil, offset, fmt.Errorf("failed to seek to offset %d: %w", offset, err) + return nil, offset, 0, fmt.Errorf("failed to seek to offset %d: %w", offset, err) } var entries []json.RawMessage newOffset := offset runningSize := 2 // account for JSON array brackets [] + var lastEntryElapsedMs int64 for { lengthBytes := make([]byte, 4) @@ -301,7 +301,7 @@ func readFromOffset(filename, encryptionKey string, offset int64, maxPayloadByte if err == io.EOF || err == io.ErrUnexpectedEOF { break // No more complete records } - return nil, newOffset, fmt.Errorf("failed to read length prefix: %w", err) + return nil, newOffset, 0, fmt.Errorf("failed to read length prefix: %w", err) } length := binary.BigEndian.Uint32(lengthBytes) @@ -312,7 +312,7 @@ func readFromOffset(filename, encryptionKey string, offset int64, maxPayloadByte decryptedData, err := DecryptData(encryptedData, encryptionKey) if err != nil { - return nil, newOffset, fmt.Errorf("failed to decrypt record at offset %d: %w", newOffset, err) + return nil, newOffset, 0, fmt.Errorf("failed to decrypt record at offset %d: %w", newOffset, err) } entrySize := len(decryptedData) @@ -323,21 +323,40 @@ func readFromOffset(filename, encryptionKey string, offset int64, maxPayloadByte break // would exceed budget; caller will loop for the rest } + // Probe the entry's elapsedTime field. Absent on non-terminal events. + var probe struct { + ElapsedTime float64 `json:"elapsedTime"` + } + if jsonErr := json.Unmarshal(decryptedData, &probe); jsonErr == nil && probe.ElapsedTime > 0 { + lastEntryElapsedMs = int64(probe.ElapsedTime * 1000) + } + entries = append(entries, json.RawMessage(decryptedData)) newOffset += int64(4 + length) runningSize += entrySize } if len(entries) == 0 { - return nil, newOffset, nil + return nil, newOffset, 0, nil } payload, err := json.Marshal(entries) if err != nil { - return nil, newOffset, fmt.Errorf("failed to marshal event batch: %w", err) + return nil, newOffset, 0, fmt.Errorf("failed to marshal event batch: %w", err) } - return payload, newOffset, nil + return payload, newOffset, lastEntryElapsedMs, nil +} + +// Stable across gateway restarts and per-connection bridge restarts. +func (su *SessionUploader) GetSessionStartedAt(sessionID string) (time.Time, bool) { + su.activeSessionsMu.RLock() + defer su.activeSessionsMu.RUnlock() + state, ok := su.activeSessions[sessionID] + if !ok { + return time.Time{}, false + } + return state.startedAt, true } // RegisterSession registers a session for incremental batch uploads, resuming from @@ -415,12 +434,8 @@ func (su *SessionUploader) startUploadRoutine() { }() } -// resumeInProgressSessions re-registers non-expired recording files into the upload loop at startup. -// A gateway restart kills all proxy connections, so any file on disk is from a session that is -// already over from the customer's perspective. Re-registering restores offset tracking so the -// ticker-based flush and chunk reconciliation can drive uploads to completion over subsequent ticks. -// Already-expired files are skipped here and handled exclusively by uploadExpiredSessionFiles -// to avoid duplicate back-to-back cleanup attempts on the same file at startup. +// Re-registers non-expired recording files at startup so the flush ticker +// can drain them. Expired files are handled by uploadExpiredSessionFiles. func (su *SessionUploader) resumeInProgressSessions() { allFiles, err := ListSessionFiles() if err != nil { @@ -494,10 +509,7 @@ func (su *SessionUploader) flushActiveSessions() { } } -// flushSession reads new events from the session recording file since the last uploaded offset, -// uploads them as a batch, and advances the offset on success. Returns nil when there is nothing -// to do (session not registered, already in legacy mode, no new events) or when a 404 cleanly -// transitions the session to legacy mode; the caller treats those as success. +// Uploads new events as a batch and advances the offset on success. func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error { su.activeSessionsMu.RLock() state, ok := su.activeSessions[sessionID] @@ -518,7 +530,7 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error { currentOffset := state.fileOffset for { - payload, newOffset, err := readFromOffset(state.filename, encryptionKey, currentOffset, pamRecordingMaxPlaintextBytes) + payload, newOffset, lastEntryElapsedMs, err := readFromOffset(state.filename, encryptionKey, currentOffset, pamRecordingMaxPlaintextBytes) if err != nil { log.Error().Err(err).Str("sessionId", sessionID).Msg("Failed to read session events for chunk upload") break @@ -527,7 +539,12 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error { break } - endElapsedMs := time.Since(state.startedAt).Milliseconds() + // Prefer the last event's actual elapsedTime; fall back to wallclock for + // non-terminal sessions whose entries lack the field (HTTP, Kubernetes). + endElapsedMs := lastEntryElapsedMs + if endElapsedMs <= startElapsedMs { + endElapsedMs = time.Since(state.startedAt).Milliseconds() + } pc, encErr := su.chunkUploader.EncryptAndQueueChunk(sessionID, payload, startElapsedMs, endElapsedMs) if encErr != nil { @@ -551,7 +568,7 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error { return nil } - payload, newOffset, err := readFromOffset(state.filename, encryptionKey, state.fileOffset, 0) + payload, newOffset, _, err := readFromOffset(state.filename, encryptionKey, state.fileOffset, 0) if err != nil { log.Error().Err(err).Str("sessionId", sessionID).Msg("Failed to read session events for batch upload") return err @@ -700,10 +717,8 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er su.RegisterSession(sessionID) } - // Final flush: upload any remaining events before we delete the file. Any failure on this path - // (key fetch, batch flush, or legacy bulk upload) returns early with the recording file, registry - // entry, and persisted offset intact so uploadExpiredSessionFiles can retry once the file crosses - // ExpiresAt. Deleting on failure would lose unuploaded events unrecoverably. + // On any failure here, return early so uploadExpiredSessionFiles can retry + // past ExpiresAt; deleting the file on failure would lose events. encryptionKey, err := su.credentialsManager.GetPAMSessionEncryptionKey() if err != nil { log.Error().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush, keeping recording file for retry") @@ -714,8 +729,7 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er return flushErr } - // If the batch endpoint was not supported (or this session was already in legacy mode), - // fall back to a single bulk upload of the whole file. + // Legacy fallback: single bulk upload of the whole file. su.activeSessionsMu.RLock() state, stateExists := su.activeSessions[sessionID] su.activeSessionsMu.RUnlock() From f9164946b67d0625c2f8d68f06eaedd7a4ec522f Mon Sep 17 00:00:00 2001 From: bernie-g Date: Wed, 6 May 2026 13:47:08 -0400 Subject: [PATCH 3/8] style(pam-rdp): cargo fmt --- packages/pam/handlers/rdp/native/src/bridge.rs | 4 +--- packages/pam/handlers/rdp/native/src/cap_filter.rs | 6 ++---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/pam/handlers/rdp/native/src/bridge.rs b/packages/pam/handlers/rdp/native/src/bridge.rs index c7bccf70..da866ac2 100644 --- a/packages/pam/handlers/rdp/native/src/bridge.rs +++ b/packages/pam/handlers/rdp/native/src/bridge.rs @@ -352,9 +352,7 @@ fn try_filter_confirm_active(frame: &[u8]) -> Option> { for cap in cap_filter::walk_caps(user_data_bytes, layout.caps_start_in_user_data) { let body_offset_in_frame = layout.user_data.offset + cap.body_offset_in_user_data; match cap.cap_type { - cap_filter::cap_types::ORDER - if cap.cap_len >= cap_filter::order_cap::BODY_LEN + 4 => - { + cap_filter::cap_types::ORDER if cap.cap_len >= cap_filter::order_cap::BODY_LEN + 4 => { order_body_offset_in_frame = Some(body_offset_in_frame); } cap_filter::cap_types::BITMAP_CODECS diff --git a/packages/pam/handlers/rdp/native/src/cap_filter.rs b/packages/pam/handlers/rdp/native/src/cap_filter.rs index 04af6a2b..f584e260 100644 --- a/packages/pam/handlers/rdp/native/src/cap_filter.rs +++ b/packages/pam/handlers/rdp/native/src/cap_filter.rs @@ -88,10 +88,8 @@ impl<'a> Iterator for CapIter<'a> { if self.cursor + 4 > self.user_data.len() { return None; } - let cap_type = u16::from_le_bytes([ - self.user_data[self.cursor], - self.user_data[self.cursor + 1], - ]); + let cap_type = + u16::from_le_bytes([self.user_data[self.cursor], self.user_data[self.cursor + 1]]); let cap_len = u16::from_le_bytes([ self.user_data[self.cursor + 2], self.user_data[self.cursor + 3], From e2dd9c0d5ec7a231acf57dbe204989591027ff38 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Wed, 6 May 2026 13:54:45 -0400 Subject: [PATCH 4/8] style(pam-rdp): clippy fixes --- packages/pam/handlers/rdp/native/src/bridge.rs | 4 ++-- packages/pam/handlers/rdp/native/src/cap_filter.rs | 2 +- packages/pam/handlers/rdp/native/src/ffi.rs | 4 ++++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/pam/handlers/rdp/native/src/bridge.rs b/packages/pam/handlers/rdp/native/src/bridge.rs index da866ac2..0a56ec10 100644 --- a/packages/pam/handlers/rdp/native/src/bridge.rs +++ b/packages/pam/handlers/rdp/native/src/bridge.rs @@ -273,7 +273,7 @@ impl ByteRange { /// Locate `send_data.user_data` inside `frame`. Bails on Cow::Owned. fn user_data_range_within(frame: &[u8], send_data: &SendDataRequest<'_>) -> Option { let slice: &[u8] = match &send_data.user_data { - Cow::Borrowed(s) => *s, + Cow::Borrowed(s) => s, Cow::Owned(_) => return None, }; let frame_start = frame.as_ptr() as usize; @@ -764,7 +764,7 @@ mod tests { fn caps_start_works_when_source_descriptor_is_empty() { let user_data = confirm_active_prefix(0); let p = parse_confirm_active_caps_start(&user_data).expect("caps start"); - assert_eq!(p, 12 + 4 + 0 + 4); + assert_eq!(p, 12 + 4 + 4); } #[test] diff --git a/packages/pam/handlers/rdp/native/src/cap_filter.rs b/packages/pam/handlers/rdp/native/src/cap_filter.rs index f584e260..d2076669 100644 --- a/packages/pam/handlers/rdp/native/src/cap_filter.rs +++ b/packages/pam/handlers/rdp/native/src/cap_filter.rs @@ -212,7 +212,7 @@ mod tests { buf.extend_from_slice(&[0x03, 0x00]); buf.extend_from_slice(&order_total_len.to_le_bytes()); let order_body_offset = buf.len(); - buf.extend_from_slice(&vec![0xFF; order_cap::BODY_LEN]); + buf.extend_from_slice(&[0xFF; order_cap::BODY_LEN]); // Cap 3: BitmapCodecs (0x001d), 4-byte header + body of 0xEE let codecs_header_offset = buf.len(); diff --git a/packages/pam/handlers/rdp/native/src/ffi.rs b/packages/pam/handlers/rdp/native/src/ffi.rs index d178bfaa..162a83c8 100644 --- a/packages/pam/handlers/rdp/native/src/ffi.rs +++ b/packages/pam/handlers/rdp/native/src/ffi.rs @@ -370,6 +370,10 @@ pub extern "C" fn rdp_bridge_free(handle: u64) -> i32 { /// Poll the next event, blocking up to `timeout_ms` ms. On RDP_POLL_OK, /// caller owns *payload_ptr (must libc::free). +/// +/// # Safety +/// +/// `out` must be a non-null, writable `*mut RdpEvent`. #[no_mangle] pub unsafe extern "C" fn rdp_bridge_poll_event( handle: u64, From 9357a92383e018ead2c94a166a52abad0816b99e Mon Sep 17 00:00:00 2001 From: bernie-g Date: Wed, 6 May 2026 17:53:19 -0400 Subject: [PATCH 5/8] fix(pam-rdp): support FreeRDP + match native clients' protocol advertisement Adds two MITM bridge fixes so both Windows App/mstsc and FreeRDP work through the gateway: - Connector now advertises HYBRID_EX|HYBRID|SSL (matching native clients) instead of IronRDP's hardcoded HYBRID|HYBRID_EX. Native clients validate the MCS Connect Response echo of clientRequestedProtocols against what they sent on their own X.224 step and disconnect on mismatch. Done via a small connector_x224_with_protocol helper that replaces ironrdp_tokio::connect_begin (which exposes no knob for the protocol set). - filter_client_mcs_connect_initial now mutates CS_CORE.serverSelectedProtocol to HYBRID_EX before forwarding (FreeRDP echoes the wrong value, which target Windows servers reject) in addition to clearing CS_NET channels to stop the target from opening virtual channels the bridge can't service. Bridge errors and panics also surface to the gateway stderr via eprintln so silent Rust failures aren't lost. --- .../pam/handlers/rdp/native/src/bridge.rs | 108 ++++++++++++++---- packages/pam/handlers/rdp/native/src/ffi.rs | 4 +- 2 files changed, 88 insertions(+), 24 deletions(-) diff --git a/packages/pam/handlers/rdp/native/src/bridge.rs b/packages/pam/handlers/rdp/native/src/bridge.rs index 0a56ec10..59612f27 100644 --- a/packages/pam/handlers/rdp/native/src/bridge.rs +++ b/packages/pam/handlers/rdp/native/src/bridge.rs @@ -11,13 +11,15 @@ use ironrdp_acceptor::{Acceptor, BeginResult}; use ironrdp_connector::credssp::{CredsspSequence, KerberosConfig}; use ironrdp_connector::sspi::credssp::ClientState; use ironrdp_connector::sspi::generator::GeneratorState; -use ironrdp_connector::{encode_x224_packet, ClientConnector, ClientConnectorState}; +use ironrdp_connector::{encode_x224_packet, ClientConnector, ClientConnectorState, Credentials}; use ironrdp_core::ReadCursor; use ironrdp_pdu::gcc::ConferenceCreateRequest; use ironrdp_pdu::input::fast_path::{FastPathInput, FastPathInputEvent}; -use ironrdp_pdu::ironrdp_core::{decode, WriteBuf}; +use ironrdp_pdu::ironrdp_core::{decode, encode_buf, DecodeOwned as _, WriteBuf}; use ironrdp_pdu::mcs::{ConnectInitial, SendDataRequest}; -use ironrdp_pdu::nego::SecurityProtocol; +use ironrdp_pdu::nego::{ + ConnectionConfirm, ConnectionRequest, NegoRequestData, RequestFlags, SecurityProtocol, +}; use ironrdp_pdu::rdp::client_info::Credentials as AcceptorCredentials; use ironrdp_pdu::rdp::headers::{ShareControlHeader, ShareControlPdu}; use ironrdp_pdu::x224::{X224Data, X224}; @@ -77,14 +79,12 @@ async fn run_mitm_inner( let acceptor_username = target.username.clone(); let (acceptor_output, connector_output) = tokio::try_join!( run_acceptor_half(client_tcp, acceptor_username), - run_connector_half(target) + run_connector_half(target), )?; let (mut client_stream, client_leftover) = acceptor_output; let (mut target_stream, target_leftover) = connector_output; - // Strip virtual channels (clipboard, drives, audio, USB) from MCS Connect Initial. - // Mouse/keyboard/screen ride the implicit I/O channel and are unaffected. filter_client_mcs_connect_initial(&mut client_stream, &mut target_stream, client_leftover) .await .context("filter client MCS Connect Initial")?; @@ -434,7 +434,11 @@ fn decode_fast_path_input(frame: &[u8]) -> anyhow::Result { FastPathInput::decode(&mut cursor).map_err(|e| anyhow::anyhow!("decode FastPathInput: {e}")) } -// Strips virtual channels from the Client Network Data block of MCS Connect Initial. +// Decode + mutate + re-encode the client's MCS Connect Initial: +// - set CS_CORE.serverSelectedProtocol to HYBRID_EX (FreeRDP echoes the +// wrong value, and target servers reject mismatched echoes) +// - clear CS_NET.channels so the target doesn't try to open virtual +// channels (clipboard, drives, audio, USB) the bridge can't service async fn filter_client_mcs_connect_initial( client_stream: &mut ErasedStream, target_stream: &mut ErasedStream, @@ -480,19 +484,9 @@ async fn filter_client_mcs_connect_initial( .map_err(|e| anyhow::anyhow!("decode MCS Connect Initial: {e:?}"))?; let mut gcc_blocks = connect_initial.conference_create_request.into_gcc_blocks(); + gcc_blocks.core.optional_data.server_selected_protocol = Some(SecurityProtocol::HYBRID_EX); if let Some(network) = gcc_blocks.network.as_mut() { - let stripped: Vec = network - .channels - .iter() - .map(|c| c.name.as_str().unwrap_or("?").to_owned()) - .collect(); - if !stripped.is_empty() { - info!( - ?stripped, - "stripped virtual channels from MCS Connect Initial" - ); - network.channels.clear(); - } + network.channels.clear(); } connect_initial.conference_create_request = ConferenceCreateRequest::new(gcc_blocks) .map_err(|e| anyhow::anyhow!("rebuild ConferenceCreateRequest: {e:?}"))?; @@ -528,7 +522,6 @@ async fn run_acceptor_half( password: ACCEPTOR_PASSWORD.to_owned(), domain: None, }; - // Capabilities/desktop-size are shape-fillers; we never call accept_finalize. let mut acceptor = Acceptor::new( SecurityProtocol::HYBRID_EX | SecurityProtocol::HYBRID | SecurityProtocol::SSL, ironrdp_acceptor::DesktopSize { @@ -588,16 +581,20 @@ async fn run_connector_half(target: TargetEndpoint) -> Result<(ErasedStream, byt let config = connector_config(target.username.clone(), target.password.clone()); let mut connector = ClientConnector::new(config, client_addr); - let should_upgrade = ironrdp_tokio::connect_begin(&mut target_framed, &mut connector) + // Request the same protocol set native clients send so the target's + // ServerCoreData.clientRequestedProtocols echo matches what they expect. + let request_set = + SecurityProtocol::HYBRID_EX | SecurityProtocol::HYBRID | SecurityProtocol::SSL; + connector_x224_with_protocol(&mut target_framed, &mut connector, request_set) .await - .context("connector: connect_begin")?; + .context("connector: X.224 init")?; let (initial_stream, leftover) = target_framed.into_inner(); let (upgraded_stream, tls_cert) = ironrdp_tls::upgrade(initial_stream, &target.host) .await .context("connector: TLS upgrade")?; - let _upgraded = ironrdp_tokio::mark_as_upgraded(should_upgrade, &mut connector); + connector.mark_security_upgrade_as_done(); let erased: ErasedStream = Box::new(upgraded_stream); let mut target_framed = ironrdp_tokio::TokioFramed::new_with_leftover(erased, leftover); @@ -622,6 +619,71 @@ async fn run_connector_half(target: TargetEndpoint) -> Result<(ErasedStream, byt Ok(target_framed.into_inner()) } +// Drive the X.224 negotiation with the caller-chosen protocol set, then +// transition the connector into EnhancedSecurityUpgrade so the rest of +// the pipeline (TLS upgrade + CredSSP) proceeds normally. ironrdp's +// connect_begin hardcodes HYBRID|HYBRID_EX, which doesn't match the set +// native clients (Windows App, mstsc) advertise. +async fn connector_x224_with_protocol( + framed: &mut ironrdp_tokio::TokioFramed, + connector: &mut ClientConnector, + requested: SecurityProtocol, +) -> Result<()> +where + S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, +{ + // Mirror what ironrdp's connect_begin includes: routing cookie with the + // username, which some Windows targets / load balancers expect. + let nego_data = + connector + .config + .request_data + .clone() + .or_else(|| match &connector.config.credentials { + Credentials::UsernamePassword { username, .. } if !username.is_empty() => { + Some(NegoRequestData::cookie(username.clone())) + } + _ => None, + }); + let request = ConnectionRequest { + nego_data, + flags: RequestFlags::empty(), + protocol: requested, + }; + + let mut buf = WriteBuf::new(); + encode_buf(&X224(request), &mut buf) + .map_err(|e| anyhow::anyhow!("encode X.224 connection request: {e:?}"))?; + framed + .write_all(buf.filled()) + .await + .context("write X.224 connection request")?; + + let pdu = framed + .read_pdu() + .await + .context("read X.224 connection confirm")?; + let confirm = ConnectionConfirm::decode_owned(&mut ReadCursor::new(&pdu.1)) + .map_err(|e| anyhow::anyhow!("decode X.224 connection confirm: {e:?}"))?; + + let selected_protocol = match confirm { + ConnectionConfirm::Response { protocol, .. } => protocol, + ConnectionConfirm::Failure { code } => { + anyhow::bail!("X.224 negotiation failure: {:?}", code); + } + }; + if !requested.contains(selected_protocol) { + anyhow::bail!( + "target selected protocol {:?} not in requested set {:?}", + selected_protocol, + requested + ); + } + + connector.state = ClientConnectorState::EnhancedSecurityUpgrade { selected_protocol }; + Ok(()) +} + // Replicated from ironrdp-async's private perform_credssp_step so we can // stop before connect_finalize (which would start MCS/capability exchange). async fn perform_connector_credssp( diff --git a/packages/pam/handlers/rdp/native/src/ffi.rs b/packages/pam/handlers/rdp/native/src/ffi.rs index 162a83c8..5f0b7ccf 100644 --- a/packages/pam/handlers/rdp/native/src/ffi.rs +++ b/packages/pam/handlers/rdp/native/src/ffi.rs @@ -335,10 +335,12 @@ pub extern "C" fn rdp_bridge_wait(handle: u64) -> i32 { } Ok(Err(e)) => { error!(handle, error = ?e, "rdp_bridge_wait: session failed"); + eprintln!("rdp bridge session failed (handle={handle}): {e:?}"); RDP_BRIDGE_SESSION_ERROR } - Err(_) => { + Err(panic) => { error!(handle, "rdp_bridge_wait: session thread panicked"); + eprintln!("rdp bridge session thread panicked (handle={handle}): {panic:?}"); RDP_BRIDGE_THREAD_PANIC } }, From 3f00ca5e0e0caee3081428f4b86b404e085c8db5 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Thu, 7 May 2026 16:22:13 -0400 Subject: [PATCH 6/8] fix(pam-rdp): bypass mstsc TLS cert validation in generated .rdp Generated .rdp file now sets `authentication level:i:0`. mstsc validates the server's TLS cert by default and rejects the bridge's self-signed cert with "unexpected server authentication certificate", terminating the connection before the X.224 handshake. FreeRDP and Windows App don't enforce the same check, so this only manifests for mstsc users. Verified through mstsc on a Windows EC2 connecting via gateway+relay. --- packages/pam/local/rdp-proxy.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/pam/local/rdp-proxy.go b/packages/pam/local/rdp-proxy.go index 68760d25..3363bbfe 100644 --- a/packages/pam/local/rdp-proxy.go +++ b/packages/pam/local/rdp-proxy.go @@ -313,9 +313,14 @@ func writeRDPFile(listenPort int, sessionID, username string) (string, error) { } path := filepath.Join(dir, filename) + // authentication level:i:0 -> mstsc connects even if it can't verify the + // server's TLS cert. The bridge presents a self-signed cert, so without + // this mstsc terminates with "unexpected server authentication certificate". + // FreeRDP/Windows App ignore the cert by default; mstsc is the strict one. content := fmt.Sprintf( "full address:s:127.0.0.1:%d\r\n"+ - "username:s:%s\r\n", + "username:s:%s\r\n"+ + "authentication level:i:0\r\n", listenPort, username, ) From 5b44f1a53a844817b22ff240600dc025582fd2c5 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Thu, 7 May 2026 11:02:44 -0400 Subject: [PATCH 7/8] fix(release): ship linux RDP binaries as fully static (musl) PR #191's release pipeline flipped the linux builds from CGO_ENABLED=0 to CGO_ENABLED=1 to link the Rust IronRDP bridge. With CGO on, the Go linker hands off to gcc, which dynamically links against the build host's glibc. v0.43.80 ended up with a GLIBC_2.39 floor from the ubuntu-24.04 GitHub runner, breaking ~80% of customer environments (Ubuntu 22.04, RHEL 8/9, Amazon Linux, Alpine, distroless/static). Switch the linux RDP builds to musl-static so the binary is fully self-contained again, matching pre-PAM portability: - build-rdp-bridge.yml: linux Rust matrix swapped from *-linux-gnu* to *-linux-musl* (windows-gnu kept). - goreleaser.yaml: each linux-*-rdp build entry uses CC=-unknown-linux-musl-gcc, points CGO_LDFLAGS at the musl target dir, adds -extldflags '-static' to ldflags, and adds osusergo,netgo to build tags to keep Go's pure-Go user/DNS resolvers (matching pre-RDP behaviour and sidestepping musl's NSS-less getaddrinfo). - release_build_infisical_cli.yml: install musl cross-toolchains from cross-tools/musl-cross GitHub releases (CDN-backed, replaces the unreliable musl.cc single-host mirror); pinned to release 20260430. curl retries kept for any network blips. - README.md (rust bridge): updated example triples. Adds a release-time gate: every linux RDP binary in dist/ must be 'statically linked', and the amd64 binary must --version cleanly across a matrix of older / minimal distros (Ubuntu 20.04+, RHEL 8+, Amazon Linux 2+, Alpine, distroless/static). A regression of the v0.43.80 shape now blocks publish. The Alpine Docker images and the .apk package are fixed for free since copying a musl-static binary into Alpine works cleanly. No Go or Rust source code changed beyond restoring the RDP feature. --- .github/workflows/build-rdp-bridge.yml | 12 +- .../workflows/release_build_infisical_cli.yml | 106 ++++++++++++++---- .goreleaser-darwin.yaml | 2 + .goreleaser-windows.yaml | 1 + .goreleaser.yaml | 36 +++--- packages/pam/handlers/rdp/native/README.md | 6 +- packages/pam/local/rdp-proxy.go | 7 ++ 7 files changed, 123 insertions(+), 47 deletions(-) diff --git a/.github/workflows/build-rdp-bridge.yml b/.github/workflows/build-rdp-bridge.yml index 2dfd7f79..0346d8af 100644 --- a/.github/workflows/build-rdp-bridge.yml +++ b/.github/workflows/build-rdp-bridge.yml @@ -16,11 +16,11 @@ jobs: fail-fast: false matrix: include: - - target: x86_64-unknown-linux-gnu - - target: aarch64-unknown-linux-gnu - - target: i686-unknown-linux-gnu - - target: arm-unknown-linux-gnueabi - - target: armv7-unknown-linux-gnueabihf + - target: x86_64-unknown-linux-musl + - target: aarch64-unknown-linux-musl + - target: i686-unknown-linux-musl + - target: arm-unknown-linux-musleabi + - target: armv7-unknown-linux-musleabihf - target: x86_64-pc-windows-gnu steps: - uses: actions/checkout@v4 @@ -56,6 +56,8 @@ jobs: rust-darwin: name: macos-latest (${{ matrix.target }}) runs-on: macos-latest + env: + MACOSX_DEPLOYMENT_TARGET: "11.0" strategy: fail-fast: false matrix: diff --git a/.github/workflows/release_build_infisical_cli.yml b/.github/workflows/release_build_infisical_cli.yml index cea56363..2abb7832 100644 --- a/.github/workflows/release_build_infisical_cli.yml +++ b/.github/workflows/release_build_infisical_cli.yml @@ -36,7 +36,11 @@ jobs: # parallel instead of serializing on ubuntu creating the draft. # Skipped on dry-run since --snapshot doesn't touch GitHub at all. create-release-draft: - if: github.event_name == 'push' || (github.event_name == 'workflow_dispatch' && !inputs.dry_run) + if: | + always() && + (needs.validate-tag-branch.result == 'success' || needs.validate-tag-branch.result == 'skipped') && + needs.cli-tests.result == 'success' && + (github.event_name == 'push' || (github.event_name == 'workflow_dispatch' && !inputs.dry_run)) needs: - validate-tag-branch - cli-tests @@ -168,12 +172,22 @@ jobs: sudo apt-get install -y libssl1.0-dev - name: Install cross-compile toolchains for RDP tier run: | - sudo apt-get install -y \ - gcc-aarch64-linux-gnu \ - gcc-i686-linux-gnu \ - gcc-arm-linux-gnueabi \ - gcc-arm-linux-gnueabihf \ - gcc-mingw-w64-x86-64 + set -euo pipefail + sudo apt-get install -y gcc-mingw-w64-x86-64 + MUSL_CROSS_TAG=20260430 + sudo mkdir -p /opt/musl-cross + for triple in \ + x86_64-unknown-linux-musl \ + aarch64-unknown-linux-musl \ + i686-unknown-linux-musl \ + arm-unknown-linux-musleabi \ + armv7-unknown-linux-musleabihf; do + curl --retry 5 --retry-delay 10 --retry-all-errors \ + --connect-timeout 30 --max-time 240 \ + -fsSL "https://github.com/cross-tools/musl-cross/releases/download/${MUSL_CROSS_TAG}/${triple}.tar.xz" \ + | sudo tar -xJ -C /opt/musl-cross + echo "/opt/musl-cross/${triple}/bin" >> "$GITHUB_PATH" + done - name: Download RDP bridge static libs uses: actions/download-artifact@v4 with: @@ -183,49 +197,93 @@ jobs: run: | set -euo pipefail for triple in \ - x86_64-unknown-linux-gnu \ - aarch64-unknown-linux-gnu \ - i686-unknown-linux-gnu \ - arm-unknown-linux-gnueabi \ - armv7-unknown-linux-gnueabihf \ + x86_64-unknown-linux-musl \ + aarch64-unknown-linux-musl \ + i686-unknown-linux-musl \ + arm-unknown-linux-musleabi \ + armv7-unknown-linux-musleabihf \ x86_64-pc-windows-gnu; do target_dir="packages/pam/handlers/rdp/native/target/$triple/release" mkdir -p "$target_dir" cp "/tmp/rdp-bridge-artifacts/rdp-bridge-$triple/libinfisical_rdp_bridge.a" "$target_dir/" done - - name: GoReleaser (dry-run snapshot) - if: github.event_name == 'workflow_dispatch' && inputs.dry_run + - name: GoReleaser (build, no publish) uses: goreleaser/goreleaser-action@v4 with: distribution: goreleaser-pro version: v1.26.2-pro - args: release --clean --snapshot --skip=publish + args: >- + release --clean --skip=publish,announce + ${{ (github.event_name == 'workflow_dispatch' && inputs.dry_run) && '--snapshot' || '' }} env: GITHUB_TOKEN: ${{ secrets.GO_RELEASER_GITHUB_TOKEN }} POSTHOG_API_KEY_FOR_CLI: ${{ secrets.POSTHOG_API_KEY_FOR_CLI }} FURY_TOKEN: ${{ secrets.FURYPUSHTOKEN }} AUR_KEY: ${{ secrets.AUR_KEY }} GORELEASER_KEY: ${{ secrets.GORELEASER_KEY }} - - name: GoReleaser (release) + - name: Upload dry-run dist as workflow artifact + if: github.event_name == 'workflow_dispatch' && inputs.dry_run + uses: actions/upload-artifact@v4 + with: + name: goreleaser-dist-linux + path: dist/ + retention-days: 7 + - name: Smoke test linux binary across supported distros + run: | + set -uo pipefail + fail=0 + echo "::group::Static-link assertion (file)" + for d in dist/linux-*-rdp_linux_*; do + bin="$d/infisical" + [ -f "$bin" ] || continue + info=$(file "$bin") + echo "$info" + if ! echo "$info" | grep -q "statically linked"; then + echo "::error file=$bin::not statically linked" + fail=1 + fi + done + echo "::endgroup::" + + BIN=dist/linux-amd64-rdp_linux_amd64_v1/infisical + if [ ! -f "$BIN" ]; then + echo "::error::expected $BIN missing from dist/" + exit 1 + fi + echo "::group::Smoke test linux-amd64 binary across distro floor" + for img in \ + ubuntu:22.04 \ + ubuntu:20.04 \ + rockylinux:9 \ + rockylinux:8 \ + amazonlinux:2023 \ + amazonlinux:2 \ + alpine:3.19 \ + gcr.io/distroless/static-debian12; do + echo "--- $img ---" + if ! docker run --rm --platform linux/amd64 \ + -v "$PWD/$BIN":/infisical:ro \ + "$img" /infisical --version; then + echo "::error::binary failed to run on $img" + fail=1 + fi + done + echo "::endgroup::" + + [ "$fail" -eq 0 ] || exit 1 + - name: GoReleaser (publish) if: github.event_name == 'push' || (github.event_name == 'workflow_dispatch' && !inputs.dry_run) uses: goreleaser/goreleaser-action@v4 with: distribution: goreleaser-pro version: v1.26.2-pro - args: release --clean + args: release --skip=build,validate,before env: GITHUB_TOKEN: ${{ secrets.GO_RELEASER_GITHUB_TOKEN }} POSTHOG_API_KEY_FOR_CLI: ${{ secrets.POSTHOG_API_KEY_FOR_CLI }} FURY_TOKEN: ${{ secrets.FURYPUSHTOKEN }} AUR_KEY: ${{ secrets.AUR_KEY }} GORELEASER_KEY: ${{ secrets.GORELEASER_KEY }} - - name: Upload dry-run dist as workflow artifact - if: github.event_name == 'workflow_dispatch' && inputs.dry_run - uses: actions/upload-artifact@v4 - with: - name: goreleaser-dist-linux - path: dist/ - retention-days: 7 - uses: actions/setup-python@v4 with: python-version: "3.12" diff --git a/.goreleaser-darwin.yaml b/.goreleaser-darwin.yaml index 9871e640..adfe00b1 100644 --- a/.goreleaser-darwin.yaml +++ b/.goreleaser-darwin.yaml @@ -14,6 +14,7 @@ builds: - -tags=rdp env: - CGO_ENABLED=1 + - MACOSX_DEPLOYMENT_TARGET=11.0 - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/x86_64-apple-darwin/release' goos: - darwin @@ -30,6 +31,7 @@ builds: - -tags=rdp env: - CGO_ENABLED=1 + - MACOSX_DEPLOYMENT_TARGET=11.0 - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/aarch64-apple-darwin/release' goos: - darwin diff --git a/.goreleaser-windows.yaml b/.goreleaser-windows.yaml index d73884d2..73752b9d 100644 --- a/.goreleaser-windows.yaml +++ b/.goreleaser-windows.yaml @@ -13,6 +13,7 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - -tags=rdp diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 654258ef..b39dfde9 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -9,13 +9,14 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - - -tags=rdp + - -tags=rdp,osusergo,netgo env: - CGO_ENABLED=1 - - CC=gcc - - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/x86_64-unknown-linux-gnu/release' + - CC=x86_64-unknown-linux-musl-gcc + - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/x86_64-unknown-linux-musl/release' goos: - linux goarch: @@ -26,13 +27,14 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - - -tags=rdp + - -tags=rdp,osusergo,netgo env: - CGO_ENABLED=1 - - CC=aarch64-linux-gnu-gcc - - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/aarch64-unknown-linux-gnu/release' + - CC=aarch64-unknown-linux-musl-gcc + - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/aarch64-unknown-linux-musl/release' goos: - linux goarch: @@ -43,6 +45,7 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - -tags=rdp @@ -61,13 +64,14 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - - -tags=rdp + - -tags=rdp,osusergo,netgo env: - CGO_ENABLED=1 - - CC=i686-linux-gnu-gcc - - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/i686-unknown-linux-gnu/release' + - CC=i686-unknown-linux-musl-gcc + - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/i686-unknown-linux-musl/release' goos: - linux goarch: @@ -78,13 +82,14 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - - -tags=rdp + - -tags=rdp,osusergo,netgo env: - CGO_ENABLED=1 - - CC=arm-linux-gnueabi-gcc - - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/arm-unknown-linux-gnueabi/release' + - CC=arm-unknown-linux-musleabi-gcc + - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/arm-unknown-linux-musleabi/release' goos: - linux goarch: @@ -97,13 +102,14 @@ builds: ldflags: - -X github.com/Infisical/infisical-merge/packages/util.CLI_VERSION={{ .Version }} - -X github.com/Infisical/infisical-merge/packages/telemetry.POSTHOG_API_KEY_FOR_CLI={{ .Env.POSTHOG_API_KEY_FOR_CLI }} + - -extldflags "-static" flags: - -trimpath - - -tags=rdp + - -tags=rdp,osusergo,netgo env: - CGO_ENABLED=1 - - CC=arm-linux-gnueabihf-gcc - - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/armv7-unknown-linux-gnueabihf/release' + - CC=armv7-unknown-linux-musleabihf-gcc + - 'CGO_LDFLAGS=-L packages/pam/handlers/rdp/native/target/armv7-unknown-linux-musleabihf/release' goos: - linux goarch: diff --git a/packages/pam/handlers/rdp/native/README.md b/packages/pam/handlers/rdp/native/README.md index 83228343..c640ff31 100644 --- a/packages/pam/handlers/rdp/native/README.md +++ b/packages/pam/handlers/rdp/native/README.md @@ -33,12 +33,12 @@ For Linux targets from any host: ```bash cargo install cross --locked --version 0.2.5 -cross build --release --target x86_64-unknown-linux-gnu +cross build --release --target x86_64-unknown-linux-musl ``` Supported targets: -- `x86_64-unknown-linux-gnu` -- `aarch64-unknown-linux-gnu` +- `x86_64-unknown-linux-musl` +- `aarch64-unknown-linux-musl` - `x86_64-apple-darwin` - `aarch64-apple-darwin` - `x86_64-pc-windows-gnu` diff --git a/packages/pam/local/rdp-proxy.go b/packages/pam/local/rdp-proxy.go index af3b43ef..eee9d8a3 100644 --- a/packages/pam/local/rdp-proxy.go +++ b/packages/pam/local/rdp-proxy.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "github.com/Infisical/infisical-merge/packages/pam/session" "github.com/Infisical/infisical-merge/packages/util" "github.com/go-resty/resty/v2" "github.com/rs/zerolog/log" @@ -53,6 +54,12 @@ func StartRDPLocalProxy(accessToken string, accessParams PAMAccessParams, projec return } + // Verify this is a Windows resource + if pamResponse.ResourceType != session.ResourceTypeWindows { + util.HandleError(fmt.Errorf("account is not a Windows resource, got: %s", pamResponse.ResourceType), "Invalid resource type") + return + } + log.Info().Msgf("RDP session created with ID: %s", pamResponse.SessionId) duration, err := time.ParseDuration(durationStr) From f81c8cbf1c1b8e04122a48d40f3a7feab5fd3293 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Thu, 7 May 2026 17:25:56 -0400 Subject: [PATCH 8/8] fix(pam-rdp): address PR review issues - Bridge tap channel switched from unbounded to bounded(1024) with try_send; drops on full instead of risking gateway OOM under heavy graphics. - bridge_pdus uses tokio::select! instead of try_join! so a normal client disconnect doesn't hang on the t2c branch waiting for a quiet target. - HandleConnection no longer cancels the drain on normal session end; the drain runs to PollEnded so the recording tail is preserved. Cancellation paths still cancel explicitly. - SessionUploader.RegisterSession preserves the existing in-memory anchor when called multiple times for the same session (RDP reconnects), so elapsedNs stays monotonic across reconnects within a single PAM session. - uploadSessionFile bulk-upload fallback handles ResourceTypeWindows the same way as SSH (TerminalEvent records); previously fell through to the database-row decoder which silently zero-filled input/output. --- .../pam/handlers/rdp/bridge_cgo_shared.go | 12 ++++--- .../pam/handlers/rdp/native/src/bridge.rs | 36 +++++++++++++++---- .../pam/handlers/rdp/native/src/events.rs | 12 +++++-- packages/pam/handlers/rdp/native/src/ffi.rs | 4 +-- packages/pam/session/uploader.go | 25 +++++++++---- 5 files changed, 67 insertions(+), 22 deletions(-) diff --git a/packages/pam/handlers/rdp/bridge_cgo_shared.go b/packages/pam/handlers/rdp/bridge_cgo_shared.go index b8735bad..e6d8c7d1 100644 --- a/packages/pam/handlers/rdp/bridge_cgo_shared.go +++ b/packages/pam/handlers/rdp/bridge_cgo_shared.go @@ -49,15 +49,17 @@ func (p *RDPProxy) HandleConnection(ctx context.Context, clientConn net.Conn) er defer close(drainDone) drainBridgeEvents(drainCtx, bridge, p.config.SessionLogger, p.config.SessionID, p.config.SessionStartedAt) }() + // Wait for the drain to finish naturally on the normal-end path so the + // tail of the recording isn't dropped: PollEnded fires after the Rust + // side closes the events channel (post bridge.Wait return). Cancellation + // paths trigger cancelDrain() explicitly below to bail early. defer func() { - cancelDrain() - // Wait briefly for the drain loop to exit so a cancelled session - // can't race the Bridge.Close below. PollEvent's timeout caps how - // long this can take. select { case <-drainDone: case <-time.After(2 * pollTimeout): } + // Always release the drain context (no-op if already cancelled). + cancelDrain() }() waitErr := make(chan error, 1) @@ -66,10 +68,12 @@ func (p *RDPProxy) HandleConnection(ctx context.Context, clientConn net.Conn) er select { case err := <-waitErr: if err != nil && !errors.Is(err, ErrInvalidHandle) { + cancelDrain() return fmt.Errorf("rdp proxy: session: %w", err) } return nil case <-ctx.Done(): + cancelDrain() _ = bridge.Cancel() <-waitErr return ctx.Err() diff --git a/packages/pam/handlers/rdp/native/src/bridge.rs b/packages/pam/handlers/rdp/native/src/bridge.rs index 9fd2ce54..98503597 100644 --- a/packages/pam/handlers/rdp/native/src/bridge.rs +++ b/packages/pam/handlers/rdp/native/src/bridge.rs @@ -28,6 +28,7 @@ use ironrdp_tokio::reqwest::ReqwestNetworkClient; use ironrdp_tokio::{FramedWrite, NetworkClient}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; @@ -176,7 +177,17 @@ where Ok(()) }; - match tokio::try_join!(c2t, t2c) { + // select! (not try_join!) so the first branch to EOF cancels the other: + // try_join! waits for both to complete on Ok, but on a normal client + // disconnect the t2c read_pdu blocks indefinitely on a quiet target. + // Dropping the cancelled future releases its read+write halves; with + // the opposite branch already done, the underlying stream's Drop closes + // the socket and the peer observes the half-close. + let result = tokio::select! { + r = c2t => r, + r = t2c => r, + }; + match result { Ok(_) => { info!("session ended cleanly"); Ok(()) @@ -416,18 +427,31 @@ fn tap_client_to_target(action: Action, frame: &[u8], started_at: Instant, tx: & // uncommon in normal sessions and not needed for replay V1. _ => continue, }; - // send error means the receiver was dropped (poll loop exited). - // The bridge keeps forwarding bytes regardless. - let _ = tx.send(session_event); + // try_send: never block the bridge byte stream on a slow consumer. + // Errors mean either Full (drop the input event; rare under typical + // sub-1k events/sec input rates) or Closed (poll loop exited; bridge + // keeps forwarding bytes regardless). + if let Err(e) = tx.try_send(session_event) { + if matches!(e, mpsc::error::TrySendError::Full(_)) { + warn!("session event channel full, dropping input event"); + } + } } } fn tap_target_to_client(action: Action, frame: &[u8], started_at: Instant, tx: &EventSender) { - let _ = tx.send(SessionEvent::TargetFrame { + // try_send: see tap_client_to_target. Heavy-graphics RDP can produce + // hundreds of TargetFrames/sec; if the consumer (Go fsync-bound logger) + // can't keep up, drop frames rather than queueing unbounded. + if let Err(e) = tx.try_send(SessionEvent::TargetFrame { action, payload: frame.to_vec(), elapsed_ns: elapsed_ns_since(started_at), - }); + }) { + if matches!(e, mpsc::error::TrySendError::Full(_)) { + warn!("session event channel full, dropping target frame"); + } + } } fn decode_fast_path_input(frame: &[u8]) -> anyhow::Result { diff --git a/packages/pam/handlers/rdp/native/src/events.rs b/packages/pam/handlers/rdp/native/src/events.rs index ffb10fd3..346bc8f6 100644 --- a/packages/pam/handlers/rdp/native/src/events.rs +++ b/packages/pam/handlers/rdp/native/src/events.rs @@ -38,9 +38,15 @@ pub fn elapsed_ns_since(started_at: Instant) -> u64 { started_at.elapsed().as_nanos() as u64 } -pub type EventSender = mpsc::UnboundedSender; -pub type EventReceiver = mpsc::UnboundedReceiver; +pub type EventSender = mpsc::Sender; +pub type EventReceiver = mpsc::Receiver; + +// Bounded so a busy-disk gateway can't OOM under heavy graphics: producer +// (tap_*) uses try_send and drops on full rather than back-pressuring the +// bridge byte stream. Sized to ~few seconds of 60 fps RDP frames at typical +// PDU rates; lossy recording > unbounded memory. +pub const EVENT_CHANNEL_CAPACITY: usize = 1024; pub fn channel() -> (EventSender, EventReceiver) { - mpsc::unbounded_channel() + mpsc::channel(EVENT_CHANNEL_CAPACITY) } diff --git a/packages/pam/handlers/rdp/native/src/ffi.rs b/packages/pam/handlers/rdp/native/src/ffi.rs index d7c534b3..fb637e99 100644 --- a/packages/pam/handlers/rdp/native/src/ffi.rs +++ b/packages/pam/handlers/rdp/native/src/ffi.rs @@ -160,7 +160,7 @@ struct BridgeEntry { // Receiver side of the bridge's event channel. Polled by Go via // rdp_bridge_poll_event. Wrapped in Option so the poll loop can take it // out for the duration of the await without holding the HANDLES lock. - events_rx: Mutex>>, + events_rx: Mutex>>, // Set once the events channel has reported closed; subsequent polls // short-circuit to RDP_POLL_ENDED. events_ended: Mutex, @@ -395,7 +395,7 @@ pub unsafe extern "C" fn rdp_bridge_poll_event( } // Avoid holding the HANDLES lock across the await. - let take_result: Result>, i32> = { + let take_result: Result>, i32> = { let handles = HANDLES.lock().expect("HANDLES poisoned"); match handles.get(&handle) { None => Err(RDP_POLL_INVALID_HANDLE), diff --git a/packages/pam/session/uploader.go b/packages/pam/session/uploader.go index 5d016f72..7fee58dc 100644 --- a/packages/pam/session/uploader.go +++ b/packages/pam/session/uploader.go @@ -378,11 +378,18 @@ func (su *SessionUploader) RegisterSession(sessionID string) { } su.activeSessionsMu.Lock() - su.activeSessions[sessionID] = &sessionUploadState{ - fileOffset: startOffset, - filename: fileInfo.Filename, - startedAt: time.Now().Add(-time.Duration(lastEndElapsedMs) * time.Millisecond), - lastEndElapsedMs: lastEndElapsedMs, + // Preserve the original anchor across RDP reconnects within the same PAM + // session: HandlePAMProxy calls RegisterSession on every gateway connection, + // and overwriting the entry would reset startedAt to ~now, making elapsedNs + // rewind on reconnect. The persisted .offset only catches up after a flush, + // so it can't be the source of truth here. + if _, exists := su.activeSessions[sessionID]; !exists { + su.activeSessions[sessionID] = &sessionUploadState{ + fileOffset: startOffset, + filename: fileInfo.Filename, + startedAt: time.Now().Add(-time.Duration(lastEndElapsedMs) * time.Millisecond), + lastEndElapsedMs: lastEndElapsedMs, + } } su.activeSessionsMu.Unlock() @@ -604,10 +611,14 @@ func (su *SessionUploader) uploadSessionFile(fileInfo *SessionFileInfo) error { return fmt.Errorf("failed to get encryption key: %w", err) } - if fileInfo.ResourceType == ResourceTypeSSH { + // SSH and Windows both write TerminalEvent records (SSH uses input/output/ + // resize/error; Windows uses ChannelType=rdp). Bulk-uploading either via + // the Database fallback would silently zero-fill input/output, dropping + // the entire recording. + if fileInfo.ResourceType == ResourceTypeSSH || fileInfo.ResourceType == ResourceTypeWindows { terminalEvents, err := ReadEncryptedTerminalEventsFromFile(fileInfo.Filename, encryptionKey) if err != nil { - return fmt.Errorf("failed to read SSH session file: %w", err) + return fmt.Errorf("failed to read terminal session file: %w", err) } log.Debug().