diff --git a/Cargo.lock b/Cargo.lock index 589add2..c3c1858 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,6 +489,7 @@ dependencies = [ "base64", "bepository-bep", "bepository-lock", + "bepository-storage", "bepository-tls", "bytes", "chrono", @@ -500,6 +501,7 @@ dependencies = [ "humantime", "humantime-serde", "linear-map", + "lockable", "object_store", "parking_lot", "prost", @@ -765,6 +767,15 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -946,6 +957,29 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_more" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn", + "unicode-xid", +] + [[package]] name = "difflib" version = "0.4.0" @@ -1544,6 +1578,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash 0.1.5", ] @@ -2077,12 +2113,34 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockable" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0f35193d57711b7b4730a3b888a5033347fb7be1ee9a64a755fa4ee013ef80" +dependencies = [ + "derive_more", + "futures", + "itertools", + "lru 0.14.0", + "tokio", +] + [[package]] name = "log" version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f8cc7106155f10bdf99a6f379688f543ad6596a415375b36a59a054ceda1198" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru" version = "0.16.4" @@ -4261,6 +4319,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-segmentation" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index 28a2807..231c6f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ figment = { version = "0.10", features = ["parking_lot"] } futures = "0.3" humantime = "2" linear-map = "1.2.0" +lockable = "0.2" object_store = { version = "0.12", features = ["aws", "gcp", "http"] } once_cell = { version = "1", features = ["parking_lot"] } opendal = { version = "0.55" } diff --git a/bepository-bep/src/connection.rs b/bepository-bep/src/connection.rs index f0b8c42..f182dad 100644 --- a/bepository-bep/src/connection.rs +++ b/bepository-bep/src/connection.rs @@ -6,13 +6,16 @@ use std::collections::HashMap; use std::sync::Arc; use bytes::Bytes; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use parking_lot::Mutex; use prost::Message; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::sync::oneshot; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot}; +use tokio::task::JoinSet; use tokio::time::{self, Duration}; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use crate::conflict::ConflictResolver; use crate::device_id::DeviceId; @@ -120,22 +123,283 @@ impl ConnectionInner { } } -struct ConnectionContext<'a> { +#[derive(Clone)] +struct ConnectionContext { remote_device: DeviceId, local_device: DeviceId, - resolver: &'a dyn ConflictResolver, - policy: &'a dyn RetryPolicy, - shutdown: &'a CancellationToken, + resolver: Arc, + policy: Arc, + shutdown: CancellationToken, } -impl<'a> ConnectionContext<'a> { +impl ConnectionContext { /// Helper to run storage operations with the context's retry policy and shutdown token. async fn retry(&self, op: &str, f: F) -> crate::error::Result where F: Fn() -> Fut, Fut: std::future::Future> + Send, { - crate::retry::retry_storage_op(self.policy, self.shutdown, op, f).await + crate::retry::retry_storage_op(self.policy.as_ref(), &self.shutdown, op, f).await + } +} + +struct OutgoingMessage { + msg_type: MessageType, + payload: Vec, +} + +/// Byte-bounded mpsc sender. +/// +/// BEP message payloads vary by ~6 orders of magnitude (empty Ping vs multi-MiB Index). +/// +/// **Oversized messages**: A single message that exceeds the total budget is allowed. +struct ByteBudgetSender { + inner: mpsc::Sender<(OwnedSemaphorePermit, T)>, + budget: Arc, + max_budget: u32, +} + +impl Clone for ByteBudgetSender { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + budget: Arc::clone(&self.budget), + max_budget: self.max_budget, + } + } +} + +impl ByteBudgetSender { + async fn send(&self, msg: T, bytes: usize) -> Result<()> { + // Clamp to [1, max_budget]. + let clamped = u32::try_from(bytes) + .unwrap_or(u32::MAX) + .clamp(1, self.max_budget); + let permit = Arc::clone(&self.budget) + .acquire_many_owned(clamped) + .await + .map_err(|e| BepError::Internal(format!("nothing should close semaphore: {e}")))?; + self.inner + .send((permit, msg)) + .await + .map_err(|_| BepError::WriterClosed) + } +} + +fn byte_budget_channel( + count: usize, + max_budget: u32, +) -> ( + ByteBudgetSender, + mpsc::Receiver<(OwnedSemaphorePermit, T)>, +) { + let (tx, rx) = mpsc::channel(count); + let budget = Arc::new(Semaphore::new(max_budget as usize)); + ( + ByteBudgetSender { + inner: tx, + budget, + max_budget, + }, + rx, + ) +} + +/// Per-connection byte budgets. See `ByteBudgetSender` doc. +/// +/// `WRITER_BUDGET_BYTES` = 4 MiB: ~32 full Response payloads at the default +/// 128 KiB block size — well above latency-bound need, well below OOM territory +/// on small devices. +/// +/// `INDEX_BUDGET_BYTES` = 32 MiB: a full Index for a large folder can be +/// tens of MiB; this keeps multiple such batches queueable before backpressure +/// reaches the TCP reader. +/// +/// NOTE: When this budget is full, `run_message_loop` blocks on `index_tx.send()`. +/// This stops Pings from being sent. 32 MiB is large enough that we should +/// only hit this if the index task is severely stalled (e.g. disk IO) or +/// the peer is flooding us. +const WRITER_BUDGET_BYTES: u32 = 4 * 1024 * 1024; +const INDEX_BUDGET_BYTES: u32 = 32 * 1024 * 1024; + +/// Roughly accounts for the framing header and protobuf overhead on top of +/// the payload bytes. Constant — we don't care about per-byte accuracy. +const WRITER_PER_MESSAGE_OVERHEAD: usize = 16; + +/// The single owner of the outbound socket from the supervisor's point of +/// view: every BEP message we send goes through here, into the writer mpsc, +/// where `run_writer_loop` drains it serially. This serialization is the +/// invariant that makes concurrent sending safe — without it, the message loop +/// and the index loop could write to the socket concurrently and interleave +/// bytes mid-frame, corrupting the wire protocol. +/// +/// **Cloning is cheap and explicitly supported** (each clone is just another +/// `mpsc::Sender` reference) — `MessageWriter` is handed to every task that +/// needs to send. But every additional clone site changes the *message-level* +/// ordering observable by the peer: while bytes within a single message are +/// always contiguous, the relative order of messages from different tasks +/// is non-deterministic (e.g., an `IndexUpdate` from the index loop can +/// land between two `Response`s from the message loop). This is +/// protocol-legal but worth understanding before introducing a new sender. +#[derive(Clone)] +struct MessageWriter { + tx: ByteBudgetSender, +} + +impl MessageWriter { + async fn send(&self, msg_type: MessageType, msg: &M) -> Result<()> { + let payload = msg.encode_to_vec(); + let bytes = payload.len() + WRITER_PER_MESSAGE_OVERHEAD; + self.tx + .send(OutgoingMessage { msg_type, payload }, bytes) + .await + } +} + +enum IndexTaskMessage { + Index(Index), + IndexUpdate(IndexUpdate), +} + +/// Wraps a BepError with a priority used by the connection supervisor to +/// pick the most informative error when multiple worker tasks fail. Ranking: +/// storage/corruption > protocol > peer-level > network/IO > writer-closed-proxy +/// > peer-clean-close. +#[derive(Debug)] +struct WorkerError(BepError); + +impl WorkerError { + fn priority(&self) -> u8 { + match &self.0 { + BepError::Corruption(_) => 100, + BepError::Internal(_) => 90, + BepError::PeerBadHello(_) | BepError::PeerBadMessage(_) => 70, + BepError::PeerError { .. } | BepError::DeviceRejected => 60, + BepError::Standby(_) => 50, + BepError::NetworkError(_) => 20, + BepError::TransientIo(_) => 15, + BepError::WriterClosed => 5, + BepError::PeerClosed(_) => 1, + } + } +} + +impl PartialEq for WorkerError { + fn eq(&self, other: &Self) -> bool { + self.priority() == other.priority() + } +} + +impl Eq for WorkerError {} + +impl PartialOrd for WorkerError { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for WorkerError { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.priority().cmp(&other.priority()) + } +} + +#[cfg(test)] +mod worker_error_tests { + use super::*; + + #[test] + fn corruption_beats_peer_bad_message() { + let a = WorkerError(BepError::Corruption("x".into())); + let b = WorkerError(BepError::PeerBadMessage("y".into())); + assert!(a > b); + } + + #[test] + fn peer_bad_message_beats_network_error() { + let a = WorkerError(BepError::PeerBadMessage("x".into())); + let b = WorkerError(BepError::NetworkError("y".into())); + assert!(a > b); + } + + #[test] + fn network_error_beats_writer_closed() { + let a = WorkerError(BepError::NetworkError("x".into())); + let b = WorkerError(BepError::WriterClosed); + assert!(a > b); + } + + #[test] + fn internal_beats_network_error() { + let a = WorkerError(BepError::Internal("x".into())); + let b = WorkerError(BepError::NetworkError("y".into())); + assert!(a > b); + } +} + +async fn run_writer_loop( + mut writer: W, + mut rx: mpsc::Receiver<(OwnedSemaphorePermit, OutgoingMessage)>, +) -> Result<()> { + while let Some((_permit, msg)) = rx.recv().await { + let header = Header { + r#type: msg.msg_type as i32, + compression: MessageCompression::None as i32, + }; + if let Err(e) = framing::write_message(&mut writer, &header, &msg.payload, false).await { + return Err(BepError::NetworkError(e.to_string())); + } + // _permit drops here, releasing budget back to senders. + } + writer + .shutdown() + .await + .map_err(|e| BepError::NetworkError(e.to_string())) +} + +async fn run_index_loop( + inner: Arc>>, + ctx: ConnectionContext, + writer: MessageWriter, + mut rx: mpsc::Receiver<(OwnedSemaphorePermit, IndexTaskMessage)>, +) -> Result<()> { + loop { + // Shutdown / error semantics: + // - The shutdown branch abandons queued and in-flight Index work + // without draining. This is safe because `apply_remote_index` + // writes `set_remote_state` only at the end of a batch: the + // peer's last acked `max_sequence` is still authoritative, and on + // reconnect the peer resends from there. Mid-batch leftovers in + // the inbox are overwritten or completed idempotently next time. + // - On a fatal error we also call `ctx.shutdown.cancel()`. This is + // redundant in the steady state — the `JoinSet` supervisor in + // `run_connection_inner` cancels the token as soon as any worker + // returns — but it makes the index loop's behaviour + // supervisor-independent and self-documenting. + tokio::select! { + _ = ctx.shutdown.cancelled() => { + return Ok(()); + } + msg = rx.recv() => { + let (_permit, msg) = match msg { + Some(m) => m, + None => return Ok(()), + }; + let res = match msg { + IndexTaskMessage::Index(index) => { + handle_index(&inner, &writer, index, &ctx).await + } + IndexTaskMessage::IndexUpdate(update) => { + handle_index_update(&inner, &writer, update, &ctx).await + } + }; + // _permit drops here, releasing budget back to senders. + if let Err(e) = res { + ctx.shutdown.cancel(); + return Err(e); + } + } + } } } @@ -169,11 +433,11 @@ where /// loop. /// /// Returns `(mutual_folders, our_cc_folders)`. -async fn exchange_initial_cluster_config( +async fn exchange_initial_cluster_config( storage: &Arc, reader: &mut R, - writer: &mut W, - ctx: &ConnectionContext<'_>, + writer: &MessageWriter, + ctx: &ConnectionContext, shared_folders: &[FolderId], ) -> Result<( std::collections::HashMap, @@ -182,7 +446,6 @@ async fn exchange_initial_cluster_config( where S: Storage, R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, { let mut initial_folders: Vec = shared_folders.to_vec(); let folders = ctx.retry("list_folders", || storage.list_folders()).await?; @@ -201,7 +464,9 @@ where ) .await?; tracing::debug!(folders = ?initial_folders, "sending ClusterConfig"); - send_typed_message(writer, MessageType::ClusterConfig, &cluster_config).await?; + writer + .send(MessageType::ClusterConfig, &cluster_config) + .await?; let peer_cc_msg = framing::read_message(reader).await?; if peer_cc_msg.header.r#type != MessageType::ClusterConfig as i32 { @@ -260,7 +525,7 @@ pub(crate) async fn run_connection( shared_folders, options, stream, - &shutdown, + shutdown.clone(), ) .await; @@ -283,26 +548,50 @@ async fn run_connection_inner( shared_folders: Vec, options: ConnectionOptions, stream: T, - shutdown: &CancellationToken, + shutdown: CancellationToken, ) -> Result where S: Storage, T: AsyncRead + AsyncWrite + Send + 'static, { - let (mut reader, mut writer) = tokio::io::split(stream); + let (mut reader, mut raw_writer) = tokio::io::split(stream); - perform_hello(&mut reader, &mut writer, &device_name).await?; + perform_hello(&mut reader, &mut raw_writer, &device_name).await?; + + // Set up the writer channel. From here on, no one writes to raw_writer + // directly — all sends go through MessageWriter -> mpsc -> run_writer_loop. + // The writer loop itself is spawned below in the JoinSet alongside the + // other worker tasks. The count cap (4096) is generous; the real bound is + // `WRITER_BUDGET_BYTES` enforced by `ByteBudgetSender`. + let (writer_tx, writer_rx) = byte_budget_channel::(4096, WRITER_BUDGET_BYTES); + let writer = MessageWriter { tx: writer_tx }; let ctx = ConnectionContext { remote_device, local_device, - resolver: resolver.as_ref(), - policy: options.retry_policy.as_ref(), - shutdown, + resolver: Arc::clone(&resolver), + policy: Arc::clone(&options.retry_policy), + shutdown: shutdown.clone(), }; + // Spawn the writer task FIRST so the cluster-config exchange below has + // something draining the writer channel. Without this, our outbound CC + // sits in the mpsc forever while we wait to read the peer's CC, and the + // peer is in the same state — startup deadlock. + let mut join_set: JoinSet, WorkerError>> = + JoinSet::new(); + join_set.spawn( + async move { + run_writer_loop(raw_writer, writer_rx) + .await + .map(|()| None) + .map_err(WorkerError) + } + .in_current_span(), + ); + let (mutual_folders, our_cc_folders) = - exchange_initial_cluster_config(&storage, &mut reader, &mut writer, &ctx, &shared_folders) + exchange_initial_cluster_config(&storage, &mut reader, &writer, &ctx, &shared_folders) .await?; let inner = Arc::new(Mutex::new(ConnectionInner { @@ -315,18 +604,99 @@ where deferred_blocks: std::collections::VecDeque::new(), })); - run_message_loop( - &inner, - &mut reader, - &mut writer, - &ctx, - options.ping_interval, - ) - .await + // Spawn the remaining two worker tasks into the same JoinSet. Each closure + // adapts the return type to Result, WorkerError>: + // - message loop → Ok(Some(reason)) | Err(WorkerError(e)) + // - index / writer → Ok(None) | Err(WorkerError(e)) + // + // The message loop owns index_tx and a writer clone; the index loop + // gets its own writer clone. No supervisor-side writer or index_tx is + // kept, so channel-close ordering is implicit via task exit: + // message loop exits → index_tx dropped → index loop exits → its + // writer clone dropped → writer channel closed → writer loop drains + // and exits. + + // Index processor — bound by bytes (`INDEX_BUDGET_BYTES`) rather than + // count, because Index payloads vary by orders of magnitude. The count + // cap (256) is just a safety net; the real bound is the byte budget. + // When the budget is full, the message loop's `index_tx.send().await` + // applies backpressure, which travels back to the reader socket + // naturally via TCP windowing. + let (index_tx, index_rx) = byte_budget_channel::(256, INDEX_BUDGET_BYTES); + + // Index loop — owns a writer clone; exits when index_tx is dropped. + join_set.spawn( + run_index_loop(Arc::clone(&inner), ctx.clone(), writer.clone(), index_rx) + .map(|r| r.map(|()| None).map_err(WorkerError)) + .in_current_span(), + ); + + // Message loop — owns reader, index_tx, and a writer clone. + join_set.spawn( + run_message_loop(inner, reader, writer, index_tx, ctx, options.ping_interval) + .map(|r| r.map(Some).map_err(WorkerError)) + .in_current_span(), + ); + + // Supervisor: wait for the first task to finish, then cancel and drain. + let mut first_done = false; + let mut errors: Vec = Vec::new(); + let mut close_reason: Option = None; + + loop { + let outcome = if first_done { + // After the first task finished we give the others 2 s to drain. + match tokio::time::timeout(Duration::from_secs(2), join_set.join_next()).await { + Ok(r) => r, + Err(_) => { + tracing::warn!("worker tasks did not shut down within 2s; abandoning"); + join_set.shutdown().await; + break; + } + } + } else { + join_set.join_next().await + }; + + match outcome { + None => break, // JoinSet exhausted + Some(join_result) => { + if !first_done { + first_done = true; + shutdown.cancel(); + } + match join_result { + Ok(Ok(reason)) => { + if close_reason.is_none() { + close_reason = reason; + } + } + Ok(Err(e)) => errors.push(e), + Err(e) => { + if e.is_cancelled() { + tracing::debug!(error = ?e, "task cancelled during shutdown"); + } else { + tracing::warn!(error = ?e, "task panicked"); + errors.push(WorkerError(BepError::Internal(format!( + "task panicked: {e}" + )))); + } + } + } + } + } + } + + // Return the highest-priority error, or the close reason if all succeeded. + match errors.into_iter().max() { + Some(e) => Err(e.0), + None => Ok(close_reason.unwrap_or(CloseReason::Local)), + } } + async fn build_cluster_config( storage: &Arc, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, local_device: &DeviceId, remote_device: &DeviceId, folders: &[FolderId], @@ -334,7 +704,10 @@ async fn build_cluster_config( let mut cc_folders = Vec::with_capacity(folders.len()); for folder_id in folders { let f = ctx.retry("folder", || storage.folder(*folder_id)).await?; - let seq = f.local_sequence().await.unwrap_or(Sequence::ZERO).get(); + let seq = ctx + .retry("local_sequence", || f.local_sequence()) + .await? + .get(); cc_folders.push(Folder { id: folder_id.to_string(), label: f.label().to_string(), @@ -368,11 +741,11 @@ async fn build_cluster_config( /// Returns `(new_mutual, new_for_our_cc)` where: /// - `new_mutual`: folders the peer proposed that weren't already in `current_mutual` /// - `new_for_our_cc`: subset of `new_mutual` absent from `our_cc` (caller adds to its CC set) -async fn process_peer_cc( +async fn process_peer_cc( peer_cc: &ClusterConfig, storage: &Arc, - writer: &mut W, - ctx: &ConnectionContext<'_>, + writer: &MessageWriter, + ctx: &ConnectionContext, current_mutual: &std::collections::HashMap, our_cc: &std::collections::HashSet, ) -> Result<(Vec<(FolderId, u64)>, Vec)> { @@ -438,7 +811,7 @@ async fn process_peer_cc( ) .await?; tracing::debug!(folders = ?all_our_cc, "sending updated ClusterConfig"); - send_typed_message(writer, MessageType::ClusterConfig, &updated_cc).await?; + writer.send(MessageType::ClusterConfig, &updated_cc).await?; } // Send Index for newly-mutual folders so the peer can see our local sequence. @@ -450,11 +823,11 @@ async fn process_peer_cc( } #[tracing::instrument(level = "info", skip(storage, writer, ctx), fields(folder_id = %folder_id))] -async fn send_index( +async fn send_index( storage: &Arc, - writer: &mut W, + writer: &MessageWriter, folder_id: FolderId, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> { let folder = ctx.retry("folder", || storage.folder(folder_id)).await?; let mut stream = ctx.retry("index", || folder.index(Sequence::ZERO)).await?; @@ -474,20 +847,30 @@ async fn send_index( files, last_sequence: last_seq, }; - send_typed_message(writer, MessageType::Index, &index).await + writer.send(MessageType::Index, &index).await } -async fn apply_remote_index( +/// Apply a peer Index/IndexUpdate batch. +/// +/// **Cancellation invariant:** `set_remote_state` MUST remain the last +/// storage write in this function. `run_index_loop` may be cancelled +/// mid-batch (shutdown, fatal error elsewhere); when that happens the +/// per-file `apply_update` calls already made are persisted, but +/// `max_sequence` is unchanged. On reconnect the peer resends the batch +/// from the old `max_sequence` and `apply_update` no-ops on the +/// already-applied files. Moving `set_remote_state` earlier would break +/// this — partial application would advance the cursor and the unsent +/// files would be skipped forever. +async fn apply_remote_index( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, folder_id_str: &str, files: &[FileInfo], last_sequence: i64, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> where S: Storage, - W: AsyncWrite + Unpin, { let (storage, peer_index_id) = { let conn = inner.lock(); @@ -575,14 +958,14 @@ where } #[tracing::instrument(level = "info", skip(inner, writer, ctx, index), fields(folder_id = %index.folder), err)] -async fn handle_index( +async fn handle_index( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, index: Index, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> { tracing::debug!( - folder = %index.folder, + folder_id = %index.folder, files = index.files.len(), "received Index" ); @@ -597,15 +980,15 @@ async fn handle_index( .await } -#[tracing::instrument(level = "debug", skip(inner, writer, ctx), fields(folder_id = %update.folder), err)] -async fn handle_index_update( +#[tracing::instrument(level = "debug", skip(inner, writer, ctx, update), fields(folder_id = %update.folder), err)] +async fn handle_index_update( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, update: IndexUpdate, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> { tracing::debug!( - folder = %update.folder, + folder_id = %update.folder, files = update.files.len(), "received IndexUpdate" ); @@ -620,17 +1003,17 @@ async fn handle_index_update( .await } -async fn run_message_loop( - inner: &Arc>>, - reader: &mut R, - writer: &mut W, - ctx: &ConnectionContext<'_>, +async fn run_message_loop( + inner: Arc>>, + mut reader: R, + writer: MessageWriter, + index_tx: ByteBudgetSender, + ctx: ConnectionContext, ping_interval_duration: Duration, ) -> Result where S: Storage, R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, { let mut ping_interval = time::interval(ping_interval_duration); ping_interval.reset(); // Don't fire immediately @@ -640,37 +1023,43 @@ where // Graceful shutdown _ = ctx.shutdown.cancelled() => { let close = Close { reason: "shutdown requested".into() }; - let _ = send_typed_message(writer, MessageType::Close, &close).await; + let _ = writer.send(MessageType::Close, &close).await; return Ok(CloseReason::Local); } // Keepalive _ = ping_interval.tick() => { let ping = Ping {}; - send_typed_message(writer, MessageType::Ping, &ping).await?; + writer.send(MessageType::Ping, &ping).await?; } // Inbound message - msg = framing::read_message(reader) => { + msg = framing::read_message(&mut reader) => { let msg = msg?; ping_interval.reset(); match MessageType::try_from(msg.header.r#type) { Ok(MessageType::Index) => { + let body_len = msg.body.len(); let index = Index::decode(msg.body).map_err(|e| BepError::PeerBadMessage(format!("protobuf decode error: {e}")))?; - handle_index(inner, writer, index, ctx).await?; + if index_tx.send(IndexTaskMessage::Index(index), body_len).await.is_err() { + return Err(BepError::Internal("index task closed".into())); + } } Ok(MessageType::IndexUpdate) => { + let body_len = msg.body.len(); let update = IndexUpdate::decode(msg.body).map_err(|e| BepError::PeerBadMessage(format!("protobuf decode error: {e}")))?; - handle_index_update(inner, writer, update, ctx).await?; + if index_tx.send(IndexTaskMessage::IndexUpdate(update), body_len).await.is_err() { + return Err(BepError::Internal("index task closed".into())); + } } Ok(MessageType::Request) => { let request = Request::decode(msg.body).map_err(|e| BepError::PeerBadMessage(format!("protobuf decode error: {e}")))?; - handle_request(inner, writer, request, ctx).await?; + handle_request(&inner, &writer, request, &ctx).await?; } Ok(MessageType::Response) => { let response = Response::decode(msg.body).map_err(|e| BepError::PeerBadMessage(format!("protobuf decode error: {e}")))?; - handle_response(inner, writer, response, ctx).await?; + handle_response(&inner, &writer, response, &ctx).await?; } Ok(MessageType::Ping) => { // No-op; receipt already reset the timer @@ -683,14 +1072,14 @@ where Ok(MessageType::ClusterConfig) => { let cc = ClusterConfig::decode(msg.body) .map_err(|e| BepError::PeerBadMessage(format!("protobuf decode error: {e}")))?; - handle_cluster_config_update(inner, writer, cc, ctx).await?; + handle_cluster_config_update(&inner, &writer, cc, &ctx).await?; } Ok(MessageType::DownloadProgress) => { let progress = DownloadProgress::decode(msg.body) .map_err(|e| BepError::PeerBadMessage(format!("protobuf decode error: {e}")))?; tracing::debug!( - device = %ctx.remote_device, - folder = %progress.folder, + remote_device = %ctx.remote_device, + folder_id = %progress.folder, updates = progress.updates.len(), "received download progress" ); @@ -709,15 +1098,14 @@ where /// BEP allows ClusterConfig to be sent more than once to add or update /// folders. We snapshot the current state without holding the connection lock /// across awaits, run `process_peer_cc`, then merge the new entries back. -async fn handle_cluster_config_update( +async fn handle_cluster_config_update( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, cc: ClusterConfig, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> where S: Storage, - W: AsyncWrite + Unpin, { let (storage, current_mutual, our_cc_snap) = { let conn = inner.lock(); @@ -744,11 +1132,11 @@ where Ok(()) } -async fn handle_request( +async fn handle_request( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, request: Request, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> { let storage = { let conn = inner.lock(); @@ -763,7 +1151,7 @@ async fn handle_request( id = request.id, offset = request.offset, size = request.size, - folder = %request.folder, + folder_id = %request.folder, file = %request.name, "rejecting request with negative offset or size" ); @@ -793,14 +1181,14 @@ async fn handle_request( } }; - send_typed_message(writer, MessageType::Response, &response).await + writer.send(MessageType::Response, &response).await } -async fn handle_response( +async fn handle_response( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, response: Response, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> { let pending = { let mut conn = inner.lock(); @@ -813,7 +1201,7 @@ async fn handle_response( tracing::warn!( id = response.id, code = response.code, - folder = %block.folder.id(), + folder_id = %block.folder.id(), file = %block.name, "peer returned error for block request" ); @@ -832,7 +1220,7 @@ async fn handle_response( .await?; tracing::debug!( - folder = %block.folder.id(), + folder_id = %block.folder.id(), file = %block.name, offset = block.offset, "stored block from peer" @@ -862,24 +1250,24 @@ async fn handle_response( /// Pure messaging — no storage interaction. `fi` must already be committed to the /// index with its sequence number assigned (e.g. returned as `Applied(fi)` from /// `apply_update`, or as `Some(fi)` from `complete_file`). -async fn send_index_update( +async fn send_index_update( fi: &FileInfo, folder_id: FolderId, - writer: &mut W, + writer: &MessageWriter, ) -> Result<()> { let seq = fi.sequence; tracing::debug!(file = %fi.name, sequence = seq, "sending IndexUpdate"); - send_typed_message( - writer, - MessageType::IndexUpdate, - &IndexUpdate { - folder: folder_id.to_string(), - files: vec![fi.clone()], - last_sequence: seq, - prev_sequence: 0, - }, - ) - .await + writer + .send( + MessageType::IndexUpdate, + &IndexUpdate { + folder: folder_id.to_string(), + files: vec![fi.clone()], + last_sequence: seq, + prev_sequence: 0, + }, + ) + .await } /// Call `complete_file` on storage and, if the file was committed, send a single-file @@ -887,27 +1275,27 @@ async fn send_index_update( /// /// Returns without sending if the file still has pending or deferred block requests, /// or if `complete_file` returns `None` (version mismatch / not staged). -async fn complete_and_notify( +async fn complete_and_notify( inner: &Arc>>, folder: &S::Folder, name: &str, version: Option<&crate::proto::bep::Vector>, - writer: &mut W, - ctx: &ConnectionContext<'_>, + writer: &MessageWriter, + ctx: &ConnectionContext, ) -> Result<()> { if let Some(fi) = maybe_complete_file(inner, folder, name, version, ctx).await? { let seq = fi.sequence; - send_typed_message( - writer, - MessageType::IndexUpdate, - &IndexUpdate { - folder: folder.id().to_string(), - files: vec![fi], - last_sequence: seq, - prev_sequence: 0, - }, - ) - .await?; + writer + .send( + MessageType::IndexUpdate, + &IndexUpdate { + folder: folder.id().to_string(), + files: vec![fi], + last_sequence: seq, + prev_sequence: 0, + }, + ) + .await?; } Ok(()) } @@ -917,32 +1305,30 @@ async fn maybe_complete_file( folder: &S::Folder, name: &str, version: Option<&crate::proto::bep::Vector>, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result> { let should_complete = { let conn = inner.lock(); let folder_id = folder.id(); - let has_pending = conn + !conn .pending_requests .values() - .any(|p| p.folder.id() == folder_id && p.name == name); - let has_deferred = conn - .deferred_blocks - .iter() - .any(|d| d.block.folder.id() == folder_id && d.block.name == name); - !has_pending && !has_deferred + .any(|p| p.folder.id() == folder_id && p.name == name) + && !conn + .deferred_blocks + .iter() + .any(|d| d.block.folder.id() == folder_id && d.block.name == name) }; - - if should_complete { - let committed = ctx - .retry("complete_file", || folder.complete_file(name, version)) - .await?; - if committed.is_some() { - tracing::info!(folder = %folder.id(), file = %name, "file transfer complete, promoted to index"); - } - return Ok(committed); + if !should_complete { + return Ok(None); } - Ok(None) + let committed = ctx + .retry("complete_file", || folder.complete_file(name, version)) + .await?; + if committed.is_some() { + tracing::info!(folder_id = %folder.id(), file = %name, "file transfer complete, promoted to index"); + } + Ok(committed) } /// Submit a single block `Request` to the peer, or defer it if the pipeline @@ -954,23 +1340,22 @@ async fn maybe_complete_file( /// If `pending_requests` is at `max_pending_requests`, pushes a /// `DeferredRequest` onto the back of `deferred_blocks` instead. The deferred /// queue is drained from the front by `drain_deferred` as responses come in. -async fn submit_or_defer_block( +async fn submit_or_defer_block( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, block: FileBlock, size: i32, block_no: i32, ) -> Result<()> where S: Storage, - W: AsyncWrite + Unpin, { let request = { let mut conn = inner.lock(); if conn.pending_requests.len() >= conn.max_pending_requests { tracing::debug!( - folder = %block.folder.id(), + folder_id = %block.folder.id(), file = %block.name, block_no, max = conn.max_pending_requests, @@ -999,15 +1384,15 @@ where req }; - send_typed_message(writer, MessageType::Request, &request).await + writer.send(MessageType::Request, &request).await } -async fn drain_deferred( +async fn drain_deferred( inner: &Arc>>, - writer: &mut W, - ctx: &ConnectionContext<'_>, + writer: &MessageWriter, + ctx: &ConnectionContext, ) -> Result<()> { - loop { + while !ctx.shutdown.is_cancelled() { let deferred = { let mut conn = inner.lock(); if conn.pending_requests.len() >= conn.max_pending_requests { @@ -1034,7 +1419,7 @@ async fn drain_deferred( if reused { tracing::debug!( - folder = %deferred.block.folder.id(), + folder_id = %deferred.block.folder.id(), file = %deferred.block.name, block_no = deferred.block_no, "deferred block already present, skipping" @@ -1067,26 +1452,29 @@ async fn drain_deferred( Ok(()) } -#[tracing::instrument(level = "debug", skip(inner, writer, folder, ctx), fields(file = %file.name), err)] -async fn request_blocks( +#[tracing::instrument(level = "debug", skip(inner, writer, folder, ctx, file), fields(file = %file.name), err)] +async fn request_blocks( inner: &Arc>>, - writer: &mut W, + writer: &MessageWriter, folder: &S::Folder, file: &FileInfo, - ctx: &ConnectionContext<'_>, + ctx: &ConnectionContext, ) -> Result<()> { - tracing::debug!(file = %file.name, total_blocks = file.blocks.len(), "requesting blocks"); + tracing::debug!(total_blocks = file.blocks.len(), "requesting blocks"); for (i, block) in file.blocks.iter().enumerate() { + if ctx.shutdown.is_cancelled() { + break; + } // Skip if we already have this block (e.g. from a rename/move). let reused = ctx .retry("reuse_block", || { folder.reuse_block(&file.name, block.offset, &block.hash, block.size) }) .await?; - tracing::debug!(file = %file.name, block_no = i, reused, "reuse_block"); + tracing::debug!(block_no = i, reused, "reuse_block"); if reused { tracing::debug!( - folder = %folder.id(), file = %file.name, block_no = i, + folder_id = %folder.id(), block_no = i, "block already present, skipping request" ); continue; @@ -1109,19 +1497,6 @@ async fn request_blocks( Ok(()) } -async fn send_typed_message( - writer: &mut W, - msg_type: MessageType, - msg: &M, -) -> Result<()> { - let header = Header { - r#type: msg_type as i32, - compression: MessageCompression::None as i32, - }; - let body = msg.encode_to_vec(); - framing::write_message(writer, &header, &body, false).await -} - #[cfg(test)] mod tests { use super::*; @@ -1131,4 +1506,18 @@ mod tests { let r = CloseReason::Local; assert!(format!("{r:?}").contains("Local")); } + + #[tokio::test] + async fn message_writer_send_returns_writer_closed_when_channel_is_closed() { + let (tx, rx) = byte_budget_channel(1, 1024); + let writer = MessageWriter { tx }; + + // Drop the receiver to close the channel + drop(rx); + + let msg = Ping {}; + let res = writer.send(MessageType::Ping, &msg).await; + + assert!(matches!(res, Err(BepError::WriterClosed))); + } } diff --git a/bepository-bep/src/error.rs b/bepository-bep/src/error.rs index 3dd0bb9..27c76f8 100644 --- a/bepository-bep/src/error.rs +++ b/bepository-bep/src/error.rs @@ -103,6 +103,15 @@ pub enum BepError { #[error("network error: {0}")] NetworkError(String), + /// The writer task has exited; sends through MessageWriter no longer succeed. + /// This is a proxy error — the writer's real exit reason (network I/O failure, + /// panic, etc.) is reported separately as that task's own error. WriterClosed + /// has a low priority in the WorkerError ranking — only `PeerClosed` (the + /// clean peer-initiated close) ranks lower — so it is unlikely to mask a + /// more informative error from a sibling task. + #[error("writer task closed")] + WriterClosed, + #[error("device rejected by event handler")] DeviceRejected, } diff --git a/bepository-bep/src/storage.rs b/bepository-bep/src/storage.rs index 2189701..d34f2ca 100644 --- a/bepository-bep/src/storage.rs +++ b/bepository-bep/src/storage.rs @@ -177,6 +177,11 @@ pub trait StorageFolder: Clone + Send + Sync + 'static { /// Returns the committed [`FileInfo`] with its locally-assigned sequence /// number on success, or `None` if this was a no-op (not staged, or /// version mismatch). + /// + /// **Concurrency:** Implementations MUST be safe under concurrent calls + /// for the same `name`. Concurrent callers may both see the staged + /// entry and write identical commit data; the resulting peer-visible + /// state is the same single commit regardless of execution order. async fn complete_file( &self, name: &str, diff --git a/bepository-bep/tests/integration.rs b/bepository-bep/tests/integration.rs index 47d8ab1..9c89754 100644 --- a/bepository-bep/tests/integration.rs +++ b/bepository-bep/tests/integration.rs @@ -29,7 +29,7 @@ use bepository_bep::test_utils::{ }; use bepository_bep::{ BepEngine, BepError, CloseReason, ConnectionOptions, DeviceId, EngineEvent, FolderId, - ImmediateRetry, Storage, StorageError, + ImmediateRetry, Storage, StorageError, StorageFolder, }; use bytes::Bytes; @@ -234,7 +234,10 @@ async fn block_requests_are_served() { .await .expect("should not time out") .expect("channel should deliver"); - assert!(matches!(reason, CloseReason::Remote(_))); + assert!( + matches!(reason, CloseReason::Remote(_)), + "expected CloseReason::Remote, got: {reason:?}" + ); } // --------------------------------------------------------------------------- @@ -1266,3 +1269,106 @@ async fn storage_standby_closes_connection() { "B should have no peers after Standby" ); } + +// --------------------------------------------------------------------------- +// Concurrent index and block processing: no duplicate completions +// --------------------------------------------------------------------------- +// +// Processing an Index batch runs in a separate task from handling Responses. +// Both can call into `complete_and_notify` for files staged by the index loop. +// This test asserts that the completion mechanism is idempotent: even under +// concurrent pressure, a file is committed exactly once, and B's +// `local_sequence` increments exactly once per distinct file. +#[tokio::test] +async fn many_files_complete_exactly_once_under_concurrent_pressure() { + init_tracing(); + + let dev_a = make_device(1); + let dev_b = make_device(2); + + let storage_a = MemoryStorage::new(); + let f_a = storage_a.folder(FolderId::from("shared")).await.unwrap(); + + // 20 single-block files. Each unique payload → unique block hash. + const N: usize = 20; + let mut files: Vec<(String, Bytes, [u8; 32])> = Vec::with_capacity(N); + for i in 0..N { + let name = format!("file-{i:02}.bin"); + let data = Bytes::from(format!("payload-for-{name}")); + let mut hash = [0u8; 32]; + hash[0] = i as u8; + hash[1] = (i >> 8) as u8; + files.push((name, data, hash)); + } + for (name, data, hash) in &files { + f_a.insert_file(make_file_with_blocks(name, &[(1, 1)], &[*hash])) + .await; + f_a.insert_block(name, 0, data.clone()).await; + } + + let storage_b = MemoryStorage::new(); + let f_b = storage_b.folder(FolderId::from("shared")).await.unwrap(); + + let engine_a = BepEngine::new( + storage_a, + dev_a, + "node-a".into(), + vec!["shared".into()], + resolver(), + ); + let engine_b = BepEngine::new( + storage_b, + dev_b, + "node-b".into(), + vec!["shared".into()], + resolver(), + ); + + let (stream_a, stream_b) = tokio::io::duplex(128 * 1024); + let handle_a = engine_a.connect(stream_a, dev_b).await.unwrap(); + let handle_b = engine_b.accept(stream_b, dev_a).await.unwrap(); + + // Wait until B has every file in its committed index. Poll with a short + // sleep until completion. + let deadline = std::time::Instant::now() + Duration::from_secs(10); + loop { + let mut all_present = true; + for (name, _, _) in &files { + if f_b.get_file(name).await.is_none() { + all_present = false; + break; + } + } + if all_present { + break; + } + if std::time::Instant::now() > deadline { + panic!("not all files were committed on B within 10s"); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Every file present with its block stored. + for (name, data, _) in &files { + let got = f_b.get_block(name, 0).await; + assert_eq!( + got.as_deref(), + Some(data.as_ref()), + "block for {name} should be stored on B" + ); + } + + // The key invariant: B's local_sequence advances exactly once per file + // completion. If complete_file ran twice for any file, the counter + // would be > N. (>N is also possible from index re-staging, but A + // sends each file exactly once in a single Index.) + let seq = f_b.local_sequence().await.unwrap().get(); + assert_eq!( + seq, N as i64, + "B's local_sequence must equal N={N} (one increment per completion); \ + a higher value indicates duplicate complete_file calls" + ); + + handle_a.shutdown.cancel(); + let _ = tokio::time::timeout(Duration::from_secs(5), handle_b.closed).await; +} diff --git a/bepository-storage/Cargo.toml b/bepository-storage/Cargo.toml index 40bb1bf..540d566 100644 --- a/bepository-storage/Cargo.toml +++ b/bepository-storage/Cargo.toml @@ -26,6 +26,7 @@ hex = "0.4" humantime.workspace = true humantime-serde = "1" linear-map.workspace = true +lockable.workspace = true object_store.workspace = true parking_lot.workspace = true prost.workspace = true @@ -46,6 +47,9 @@ test-utils = ["bepository-bep/test-utils"] [dev-dependencies] bepository-bep = { path = "../bepository-bep", features = ["test-utils"] } +bepository-storage = { path = ".", features = ["test-utils"] } +futures = { workspace = true } +prost = { workspace = true } tokio = { version = "1", features = ["rt-multi-thread", "macros"] } [build-dependencies] diff --git a/bepository-storage/OVERVIEW.md b/bepository-storage/OVERVIEW.md index 9beeffe..a58e83d 100644 --- a/bepository-storage/OVERVIEW.md +++ b/bepository-storage/OVERVIEW.md @@ -72,6 +72,11 @@ The inbox holds file metadata during block transfer. sequence mapping are deleted, and the new file entry and sequence mapping are written atomically. +**Concurrency:** Per-name locks (`name_locks`) protect staging and promotion; a +global `seq_lock` serializes sequence allocation. This ensures safe, idempotent +promotion (e.g., concurrent Index/Message loop updates) without blocking +operations on different files. + Version comparison uses the committed index only. The inbox is not authoritative. @@ -100,6 +105,11 @@ persistence from the wire format, ensuring stability across bepository versions. ## Invariants +- **Index mutation serialization:** `name_locks` serializes `n/`, `s/`, `in/` + mutations per name; `seq_lock` serializes the `IX_KEY` allocation. + `commit_with_new_seq` must remain the sole writer of `n/` and `s/`; enforced + at the type level by `LockedFileName`. Compaction may drop dead `n/`, `s/`, + `in/` entries outside any lock (see Compaction GC); `ix` is preserved. - **Index commit atomicity:** All mutations for file promotion happen in a single atomic batch. - **Block reference integrity:** Canonical block data is verified to exist @@ -190,4 +200,4 @@ or skipping. negative not possible, but if dropped), it forces a re-request from peers, safely rewriting it. - **Crash safety:** Bloom filters are in-memory. SlateDB handles crash recovery - independently. + independently. s are in-memory. SlateDB handles crash recovery independently. diff --git a/bepository-storage/src/api.rs b/bepository-storage/src/api.rs index beadc77..0751861 100644 --- a/bepository-storage/src/api.rs +++ b/bepository-storage/src/api.rs @@ -262,7 +262,8 @@ impl SlateStorage { let mut builder = DbReaderBuilder::new(folder_sk.to_string(), self.inner.object_store.clone()) - .with_checkpoint_id(id); + .with_checkpoint_id(id) + .with_filter_policies(crate::store_keys::make_filter_policies()); if let Some(cache) = db_cache { builder = builder.with_db_cache(cache); } @@ -925,16 +926,25 @@ impl Activated { let mut compactor_builder = CompactorBuilder::new(path.clone(), self.object_store.clone()) .with_runtime(self.runtime.clone()) - .with_compaction_filter_supplier(supplier); + .with_compaction_filter_supplier(supplier) + .with_filter_policies(crate::store_keys::make_filter_policies()); if full_compaction { compactor_builder = compactor_builder .with_scheduler_supplier(Arc::new(FullCompactionSchedulerSupplier)); } + // Shorten poll interval for one-shot compaction so the scheduler + // triggers within ~1s instead of the 60s default. + let mut settings = make_db_settings(); + if full_compaction && let Some(c) = settings.compactor_options.as_mut() { + c.poll_interval = Duration::from_secs(1); + } + let db = Db::builder(path.clone(), self.object_store.clone()) .with_gc_runtime(self.runtime.clone()) .with_db_cache(db_cache) - .with_settings(make_db_settings()) + .with_settings(settings) + .with_filter_policies(crate::store_keys::make_filter_policies()) .with_compactor_builder(compactor_builder) .build() .await @@ -1034,7 +1044,31 @@ impl Activated { let store = self.open_folder_store(sk.as_str(), true).await?; - // Closing flushes the memtable and waits for the compactor to finish. + // Db::close() cancels background tasks. Wait for L0 to drain before + // closing to ensure the compaction isn't aborted mid-upload. + // + // TODO: Polling L0 emptiness is a proxy for compaction completion. + // Replace with a proper completion signal once SlateDB exposes one. + let admin = AdminBuilder::new(sk.to_string(), self.object_store.clone()).build(); + let timeout = Duration::from_secs(3600); + let start = std::time::Instant::now(); + tokio::time::sleep(Duration::from_secs(2)).await; + loop { + let view = admin + .read_compactor_state_view() + .await + .map_err(|e| StorageError::TransientIo(format!("read compactor state: {e}")))?; + if view.manifest().l0().is_empty() { + break; + } + if start.elapsed() > timeout { + return Err(StorageError::TransientIo( + "compaction timed out after 1h".into(), + )); + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + store.close().await?; Ok(()) } @@ -1178,6 +1212,24 @@ impl StorageInspectorForTests for SlateFolder { } impl SlateFolder { + /// Write raw bytes to a key in the underlying DB. For testing only. + #[cfg(any(test, feature = "test-utils"))] + pub async fn put_raw(&self, key: Vec, value: Vec) { + use slatedb::config::PutOptions; + use slatedb::config::WriteOptions; + self.store + .db + .put_with_options(key, value, &PutOptions::default(), &WriteOptions::default()) + .await + .expect("put_raw"); + } + + /// Return the current epoch (test helper). + #[cfg(any(test, feature = "test-utils"))] + pub fn epoch(&self) -> Option { + self.epoch + } + fn require_epoch(&self) -> Result { self.epoch.ok_or_else(|| { StorageError::Standby("epoch not set: call activate() before this operation".into()) @@ -1391,36 +1443,44 @@ async fn make_block_cache(cache_dir: Option) -> Arc { /// Creates per-folder SlateDB settings for cold/archival workloads. /// -/// Tuned for battery efficiency: fewer wakeups, serialized I/O, and longer -/// intervals between polls and flushes. `max_unflushed_bytes` stays at 4 MiB -/// so individual flush uploads remain bounded (avoids timeouts on large files). +/// Optimized for slow (≥10 Mbps) uplinks with a 60s compactor cadence. +/// +/// Background: +/// - `l0_sst_size_bytes`: Bounds foreground stall duration (stall = size / speed). +/// - `max_unflushed_bytes`: Burst capacity before back-pressure pauses the writer. fn make_db_settings() -> Settings { Settings { - // WAL flush: 60s instead of 100ms default. The 4 MiB unflushed-bytes - // cap still triggers size-based flushes for large file ingestion. + // WAL flush interval. Size-based flush still fires at `l0_sst_size_bytes`. flush_interval: Some(Duration::from_secs(60)), - // Backpressure threshold: keeps individual L0 SST uploads small so - // large file ingestion never blocks for longer than a single ~4 MiB - // upload on a slow connection. - max_unflushed_bytes: 4 * 1024 * 1024, - // Single-writer setup: no need to detect remote manifest changes often. + // Memtable freeze threshold. Sized to 8 MiB. + l0_sst_size_bytes: 8 * 1024 * 1024, + // Memory ceiling for frozen memtables. Allows ~8 memtables in flight (64 MiB). + max_unflushed_bytes: 64 * 1024 * 1024, + // Manifest poll interval. Long to minimize battery/request costs. manifest_poll_interval: Duration::from_secs(120), - // Serialize L0 SST uploads to avoid CPU/radio bursts. Cold storage - // doesn't need flush throughput. + // Serializes L0 flushes to avoid bandwidth contention and timeouts on slow links. l0_flush_parallelism: 1, - // More L0 headroom: with slower compaction polling the L0 backlog can - // grow before compaction catches up, so raise the stall threshold. - l0_max_ssts: 16, + // Back-pressure threshold. Caps L0 at 64 MiB. Higher values increase + // burst capacity but slow down `read_dir` scans on `n/` keys. + l0_max_ssts: 64, compactor_options: Some(CompactorOptions { - // Wake up 12x less often to check whether compaction is needed. + // Battery floor: each tick triggers a GCS manifest GET that keeps + // the radio awake even when there is nothing to compact. poll_interval: Duration::from_secs(60), - // Serial compaction: halves CPU burst, fine for cold/archival use. + // Serial compaction: bounds CPU and network bursts. Fine for + // cold/archival; L0 headroom above absorbs flushes that arrive + // during a compaction cycle. max_concurrent_compactions: 1, - // Fewer parallel fetch tasks reduces network bursts during compaction. + // Bound parallel L0 fetches during compaction. Two is enough to + // overlap download with merge work without saturating the link. max_fetch_tasks: 2, - // 64 MiB SSTs upload in ~51s at 10 Mbps upload. - // Default 256 MiB would take 3+ minutes on the same connection. - max_sst_size: 64 * 1024 * 1024, + // L1+ output chunk size. Compactor uploads are background work, + // so per-SST upload time only affects compaction wall-clock, not + // writer latency — pick the largest size that still completes + // each upload within a single GCS retry window on the slow link + // (~107 s @ 10 Mbps; ~11 s @ 100 Mbps). The 256 MiB default + // exceeds 3 min on a slow link and risks transient timeouts. + max_sst_size: 128 * 1024 * 1024, ..CompactorOptions::default() }), ..Default::default() diff --git a/bepository-storage/src/compaction.rs b/bepository-storage/src/compaction.rs index fa3bbd5..e1f50a3 100644 --- a/bepository-storage/src/compaction.rs +++ b/bepository-storage/src/compaction.rs @@ -42,6 +42,52 @@ const BLOOM_CAPACITY: usize = 5_000_000; /// False-positive rate for the compaction bloom filter. const BLOOM_FP_RATE: f64 = 0.001; +/// Scan `prefix` against `snapshot`, decode each entry, and insert every block +/// hash from the contained `FileInfo` into `bloom`. A wrong-length hash is +/// treated as corruption: the compaction aborts rather than silently produce +/// a bloom that drops live data. +async fn index_blocks_under_prefix( + snapshot: &slatedb::DbSnapshot, + prefix: &[u8], + bloom: &AtomicBloomFilter, + extract: F, +) -> Result<(), CompactionFilterError> +where + F: Fn(bytes::Bytes) -> Result, prost::DecodeError>, +{ + let mut iter = snapshot + .scan_prefix(prefix) + .await + .map_err(|e| CompactionFilterError::CreationError(crate::store::slate_err(e).into()))?; + while let Some(kv) = iter + .next() + .await + .map_err(|e| CompactionFilterError::CreationError(crate::store::slate_err(e).into()))? + { + let fi = match extract(kv.value) + .map_err(|e| CompactionFilterError::CreationError(format!("decode: {e}").into()))? + { + Some(fi) => fi, + None => continue, + }; + for block in &fi.blocks { + let hash: &[u8; store_keys::HASH_LEN] = + block.hash.as_slice().try_into().map_err(|_| { + CompactionFilterError::CreationError( + format!( + "invalid hash length {} in file {}", + block.hash.len(), + fi.name + ) + .into(), + ) + })?; + bloom.insert(hash); + } + } + Ok(()) +} + /// Factory that creates a [`GcFilter`] for each compaction job. /// /// Holds a late-binding reference to the [`FolderStore`] (via `OnceLock`) @@ -89,31 +135,25 @@ impl CompactionFilterSupplier for GcFilterSupplier { // Register with shared compaction state before scanning so writes go into our bloom filter. let job = self.gc.register(bloom.clone()); - // Committed files (n/ entries). - let files = store - .all_files() - .await - .map_err(|e| CompactionFilterError::CreationError(e.into()))?; - for file in &files { - for block in &file.blocks { - if block.hash.len() == store_keys::HASH_LEN { - bloom.insert(&block.hash[..]); - } - } - } - - // In-progress transfers (in// entries). - let inbox_files = store - .inbox_files(self.epoch) - .await - .map_err(|e| CompactionFilterError::CreationError(e.into()))?; - for file in &inbox_files { - for block in &file.blocks { - if block.hash.len() == store_keys::HASH_LEN { - bloom.insert(&block.hash[..]); - } - } - } + // Single snapshot covers both scans: `complete_file` atomically moves + // an entry from `in//` to `n/`, so scanning the two + // prefixes against the same point-in-time view guarantees every live + // file appears in exactly one of them. Independent scans would each + // take their own snapshot and could miss a file mid-transition. + let snapshot = + store.db.snapshot().await.map_err(|e| { + CompactionFilterError::CreationError(crate::store::slate_err(e).into()) + })?; + + let inbox_prefix = store_keys::inbox_key(self.epoch, ""); + index_blocks_under_prefix(&snapshot, &inbox_prefix, &bloom, |bytes| { + crate::proto::storage::Inbox::decode(bytes).map(|i| i.file_info) + }) + .await?; + index_blocks_under_prefix(&snapshot, store_keys::FILE_PREFIX, &bloom, |bytes| { + crate::proto::storage::File::decode(bytes).map(|f| f.file_info) + }) + .await?; // Compute peer floor for sequence/tombstone pruning. let peer_floor = store diff --git a/bepository-storage/src/store.rs b/bepository-storage/src/store.rs index 50a367e..0df4961 100644 --- a/bepository-storage/src/store.rs +++ b/bepository-storage/src/store.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use bytes::Bytes; +use lockable::{LockPool, Lockable}; use parking_lot::RwLock; use prost::Message; use slatedb::config::{PutOptions, WriteOptions}; @@ -106,6 +107,22 @@ impl CompactionState { } } +/// Witness that the caller holds [`FolderStore::name_locks`] for `name`. +/// +/// Constructed only by [`FolderStore::lock_filename`]. Acts as a compile-time +/// proof for functions that mutate per-name state and must run under +/// the lock. +struct LockedFileName<'a> { + name: String, + _guard: as Lockable>::Guard<'a>, +} + +impl LockedFileName<'_> { + fn name(&self) -> &str { + &self.name + } +} + /// Per-folder SlateDB wrapper. /// /// Each shared folder gets its own SlateDB instance with the key layout @@ -113,9 +130,13 @@ impl CompactionState { pub(crate) struct FolderStore { pub(crate) db: Db, pub(crate) gc: Arc, - /// Serialises all operations that allocate a sequence number - /// (`put_file`, `complete_file`) so the read-modify-write on - /// `max_sequence` is never interleaved. + /// Per-name async lock pool. Serializes `n/`, `s/`, and `in/` + /// mutations for a given name. Witnessed by `LockedFileName`. + /// Compaction may drop dead entries outside this lock; see + /// `compaction.rs`. + name_locks: LockPool, + /// Guards the `IX_KEY` RMW and its batch write. Held briefly, + /// in-memory only. Compaction preserves `ix`. seq_lock: Mutex<()>, } @@ -124,10 +145,21 @@ impl FolderStore { Self { db, gc, + name_locks: LockPool::new(), seq_lock: Mutex::new(()), } } + /// Acquire the per-name lock. + async fn lock_filename(&self, name: impl Into) -> LockedFileName<'_> { + let name = name.into(); + let guard = self.name_locks.async_lock(name.clone()).await; + LockedFileName { + name, + _guard: guard, + } + } + async fn put_non_durable(&self, key: K, value: V) -> Result<(), StorageError> where K: AsRef<[u8]>, @@ -207,34 +239,33 @@ impl FolderStore { } } - /// The single code path that allocates a sequence number and commits a - /// file entry. + /// Allocate a sequence number and commit a file entry. /// - /// Holds `seq_lock` for the entire read-modify-write so no two callers - /// can observe the same `max_sequence`. All sequence-related keys - /// (old cleanup, new allocation, metadata bump, file entry, seq mapping) - /// plus any caller-supplied `extra_batch` operations are written in one - /// `WriteBatch`. + /// The prior-entry read runs outside `seq_lock`: the per-name lock + /// excludes other `commit_with_new_seq` calls for this name, and this + /// fn is the sole writer of `file_key`/`seq_key`. Compaction may drop + /// dead entries concurrently; memtable writes win on reads. /// - /// **Every** public method that needs a new sequence number MUST go - /// through this function — never read/increment `max_sequence` directly. - async fn commit_file_with_seq( + /// INVARIANT: must remain the sole writer of `file_key`/`seq_key` in + /// this module. + async fn commit_with_new_seq( &self, - name: &str, + locked_filename: &LockedFileName<'_>, file: FileInfo, - extra_batch: WriteBatch, - ) -> Result { - let _guard = self.seq_lock.lock().await; - - let mut batch = extra_batch; + mut batch: WriteBatch, + ) -> Result<(i64, FileInfo), StorageError> { + let name = locked_filename.name(); // Remove old sequence entry if file already exists. + // Safe outside seq_lock — see function doc. if let Some(old) = self.get_file(name).await? && old.sequence > 0 { batch.delete(store_keys::seq_key(old.sequence)?); } + let _guard = self.seq_lock.lock().await; + // Allocate next sequence. let mut meta = self.get_index_meta().await?; meta.max_sequence += 1; @@ -245,13 +276,13 @@ impl FolderStore { stored.sequence = seq; let file_wrapper = File { - file_info: Some(stored.into()), + file_info: Some(stored.clone().into()), }; batch.put(store_keys::file_key(name), file_wrapper.encode_to_vec()); batch.put(store_keys::seq_key(seq)?, name.as_bytes()); self.write_non_durable(batch).await?; - Ok(seq) + Ok((seq, stored)) } /// Insert or update a file in the index. Handles sequence bookkeeping. @@ -260,8 +291,11 @@ impl FolderStore { /// update, file entry, sequence entry) are written in a single `WriteBatch` /// to ensure atomicity. pub async fn put_file(&self, file: &FileInfo) -> Result { - self.commit_file_with_seq(&file.name, file.clone(), WriteBatch::new()) - .await + let locked_filename = self.lock_filename(&file.name).await; + let (seq, _) = self + .commit_with_new_seq(&locked_filename, file.clone(), WriteBatch::new()) + .await?; + Ok(seq) } // --- Full index scan --- @@ -308,9 +342,17 @@ impl FolderStore { // --- Inbox (two-phase file intake) --- /// Stage a file in the inbox for block transfer. + /// + /// The inbox key is unique per `(epoch, name)`; a later `stage_file` for + /// the same name simply overwrites the entry (last-write-wins). + /// + /// Holds the per-name lock to serialize against `complete_file`'s + /// read-check-commit sequence; this ensures we don't overwrite the inbox + /// just as a concurrent completion is trying to promote it. #[tracing::instrument(level = "debug", skip_all, fields(file = %file.name, epoch = %epoch.as_base32()))] pub async fn stage_file(&self, epoch: Epoch, file: &FileInfo) -> Result<(), StorageError> { - let key = store_keys::inbox_key(epoch, &file.name); + let locked_filename = self.lock_filename(&file.name).await; + let key = store_keys::inbox_key(epoch, locked_filename.name()); let inbox_wrapper = Inbox { file_info: Some(file.clone().into()), }; @@ -321,8 +363,14 @@ impl FolderStore { /// Promote a staged file from inbox to committed index. /// - /// Atomically: delete inbox entry, write to `n/` with sequence, - /// write `s/` mapping. No-op if no inbox entry exists. + /// Holds the per-name lock across the entire read-check-commit so a + /// concurrent `stage_file` (e.g. a newer `IndexUpdate` arriving + /// mid-download) cannot race with the version check: either we observe + /// and commit the staged version, or we observe the newer one and return + /// `Ok(None)`. + /// + /// Returns `Ok(None)` if the inbox is empty (idempotent re-call) or if + /// the staged entry's version differs from `expected_version`. #[tracing::instrument(level = "debug", skip_all, fields(file = %name, epoch = %epoch.as_base32()))] pub async fn complete_file( &self, @@ -330,7 +378,9 @@ impl FolderStore { name: &str, expected_version: Option<&Vector>, ) -> Result, StorageError> { - let inbox_key = store_keys::inbox_key(epoch, name); + let locked_filename = self.lock_filename(name).await; + + let inbox_key = store_keys::inbox_key(epoch, locked_filename.name()); let staged: FileInfo = match self.db.get(&inbox_key).await.map_err(slate_err)? { Some(bytes) => { let inbox = Inbox::decode(bytes) @@ -353,13 +403,12 @@ impl FolderStore { return Ok(None); } - // Inbox deletion is batched atomically with the sequence commit. let mut batch = WriteBatch::new(); batch.delete(inbox_key); - let mut committed = staged.clone(); - let seq = self.commit_file_with_seq(name, staged, batch).await?; - committed.sequence = seq; + let (_seq, committed) = self + .commit_with_new_seq(&locked_filename, staged, batch) + .await?; tracing::debug!("file complete"); @@ -395,24 +444,6 @@ impl FolderStore { Ok(count) } - /// Return all inbox entries for a specific epoch. - pub async fn inbox_files(&self, epoch: Epoch) -> Result, StorageError> { - let prefix = store_keys::inbox_key(epoch, ""); - let mut iter = self.db.scan_prefix(&prefix).await.map_err(slate_err)?; - - let mut files = Vec::new(); - while let Some(kv) = iter.next().await.map_err(slate_err)? { - let inbox_entry = Inbox::decode(kv.value) - .map_err(|e| StorageError::Corruption(format!("decode inbox Inbox: {e}")))?; - let fi = inbox_entry - .file_info - .ok_or_else(|| StorageError::Corruption("missing file_info in Inbox".into()))? - .try_into()?; - files.push(fi); - } - Ok(files) - } - /// Return a specific inbox entry. #[cfg(any(test, feature = "test-utils"))] pub async fn get_inbox_file( @@ -624,7 +655,6 @@ impl FolderStore { while let Some(kv) = iter.next().await.map_err(slate_err)? { if let Some((_hash, name)) = store_keys::parse_block_reverse_key(&kv.key) { let dir = store_keys::dirname(&name).to_string(); - // Verify canonical data actually exists at this directory. let data_key = store_keys::block_data_key(&dir, hash); if self.db.get(&data_key).await.map_err(slate_err)?.is_some() { return Ok(Some(dir)); diff --git a/bepository-storage/src/store_keys.rs b/bepository-storage/src/store_keys.rs index c4ebad0..c4eb021 100644 --- a/bepository-storage/src/store_keys.rs +++ b/bepository-storage/src/store_keys.rs @@ -14,6 +14,11 @@ //! dx/ — remote device index state //! in//// — inbox staging area +use std::sync::Arc; + +use bytes::Bytes; +use slatedb::{BloomFilterPolicy, FilterPolicy, PrefixExtractor, PrefixTarget}; + use bepository_bep::error::StorageError; /// Hash length in bytes (SHA-256). @@ -140,6 +145,50 @@ pub fn block_ref_key(dir: &str, hash: &[u8; HASH_LEN]) -> Vec { key } +/// Length of the bloom-filter prefix extracted from `br/` keys: the literal +/// `br/` plus the full 32-byte hash. Lets `scan_prefix(br//...)` consult +/// the per-SST bloom filter — without it, prefix scans would have to fetch +/// each candidate SST's index block to test membership, which is the hot path +/// during initial sync where most lookups return empty. +const BR_FILTER_PREFIX_LEN: usize = BLOCK_REV_PREFIX.len() + HASH_LEN; + +/// Filter extractor that hashes the `br/` prefix of every reverse-ref +/// key into the bloom filter. Keys outside the `br/` family return `None` and +/// are not added to the prefix filter; they remain covered by point-key +/// filtering via `with_whole_key_filtering(true)`. +#[derive(Debug, Default)] +pub struct BrPrefixExtractor; + +impl PrefixExtractor for BrPrefixExtractor { + fn name(&self) -> &str { + "bep-br-35" + } + + fn prefix_len(&self, target: &PrefixTarget) -> Option { + let bytes: &Bytes = match target { + PrefixTarget::Point(b) | PrefixTarget::Prefix(b) => b, + }; + (bytes.len() >= BR_FILTER_PREFIX_LEN && bytes.starts_with(BLOCK_REV_PREFIX)) + .then_some(BR_FILTER_PREFIX_LEN) + } +} + +/// Filter policies for SSTs. These must match across writer and reader components. +/// +/// Registers both whole-key and prefix-aware bloom filters. SlateDB selects the +/// policy by name; keeping both ensures that SSTs written with either policy +/// remain decodable without falling back to expensive index fetches. +/// +/// 1. `BloomFilterPolicy::new(10)`: Default whole-key bloom (`_bf`). +/// 2. `BloomFilterPolicy` + `BrPrefixExtractor`: Accelerates `scan_prefix("br//")`. +#[must_use] +pub fn make_filter_policies() -> Vec> { + vec![ + Arc::new(BloomFilterPolicy::new(10)), + Arc::new(BloomFilterPolicy::new(10).with_prefix_extractor(Arc::new(BrPrefixExtractor))), + ] +} + // --- Block reverse ref key: br//// --- #[must_use] diff --git a/bepository-storage/tests/storage_tests.rs b/bepository-storage/tests/storage_tests.rs index d1cc3de..12fbef1 100644 --- a/bepository-storage/tests/storage_tests.rs +++ b/bepository-storage/tests/storage_tests.rs @@ -507,3 +507,96 @@ async fn object_store_list_error_propagates_as_transient_io() { "List error should map to TransientIo, got: {result:?}" ); } + +// --------------------------------------------------------------------------- +// New tests: seq-at-stage-time invariants +// --------------------------------------------------------------------------- + +/// Concurrent complete_file calls on the same staged file must produce a +/// consistent committed state: exactly one seq_key, all callers that return +/// Some(_) carry the same sequence number. +#[tokio::test] +async fn concurrent_complete_is_consistent() { + let (_storage, folder) = setup_folder("concurrent").await; + let file = make_file("concur.txt", &[(1, 1)], false); + + folder.apply_update(&file, &REMOTE_DEV).await.unwrap(); + + // Fire 8 concurrent complete_file calls. + let n = 8usize; + let mut handles = Vec::with_capacity(n); + for _ in 0..n { + let folder = folder.clone(); + let ver = file.version.clone(); + handles.push(tokio::spawn(async move { + folder + .complete_file("concur.txt", ver.as_ref()) + .await + .expect("complete_file must not error") + })); + } + let results: Vec> = futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.expect("task panicked")) + .collect(); + + // Collect all sequences returned by Some(_) results. + let seqs: Vec = results + .iter() + .filter_map(|r| r.as_ref()) + .map(|fi| fi.sequence) + .collect(); + + // At least one caller must have committed. + assert!(!seqs.is_empty(), "at least one caller should return Some"); + + // All Some(_) results must carry the same sequence. + let first_seq = seqs[0]; + for &s in &seqs { + assert_eq!(s, first_seq, "all successful callers must return same seq"); + } + + // The committed file must be visible. + let committed = folder.get_file("concur.txt").await.unwrap(); + assert_eq!(committed.sequence, first_seq); +} + +/// Stage v1, then stage v2 (overwriting v1 in inbox) without completing v1. +/// Completing v1 must be a no-op and v2 must commit cleanly at the next seq. +#[tokio::test] +async fn restage_overwrites_inbox_and_v1_complete_is_noop() { + let (_storage, folder) = setup_folder("restage").await; + let v1 = make_file("gap.txt", &[(1, 1)], false); + let v2 = make_file("gap.txt", &[(1, 2)], false); + + folder.apply_update(&v1, &REMOTE_DEV).await.unwrap(); + + let epoch = folder.epoch().unwrap(); + let v1_staged = folder.get_inbox_file(epoch, "gap.txt").await; + assert!(v1_staged.is_some(), "v1 should be staged"); + + // v2 arrives — overwrites the inbox entry. + folder.apply_update(&v2, &REMOTE_DEV).await.unwrap(); + + // Completing v1 must be a no-op because inbox now holds v2. + let v1_result = folder + .complete_file("gap.txt", v1.version.as_ref()) + .await + .unwrap(); + assert!(v1_result.is_none(), "completing stale v1 must be a no-op"); + + // Completing v2 must succeed. + let v2_result = folder + .complete_file("gap.txt", v2.version.as_ref()) + .await + .unwrap() + .expect("v2 must commit"); + + let v2_seq = v2_result.sequence; + assert!(v2_seq > 0, "v2 must have a positive sequence"); + + let committed = folder.get_file("gap.txt").await.unwrap(); + assert_eq!(committed.version, v2.version); + assert_eq!(committed.sequence, v2_seq); +}