diff --git a/Cargo.toml b/Cargo.toml index 74ef8836f..9366a37a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ tempfile = "3.24.0" termcolor = "1.4.1" thiserror = "2.0.17" tiktoken-rs = "0.6.0" -tokio = { version = "1.48.0", features = ["full"] } +tokio = { version = "1.48.0", features = ["full", "test-util"] } tokio-util = { version = "0.7.17", features = ["codec", "io"] } toml = "0.8" tower-http = { version = "0.6.8", features = ["cors", "fs", "trace"] } diff --git a/src/api.rs b/src/api.rs index 356d15907..4b6dfa544 100644 --- a/src/api.rs +++ b/src/api.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::db::Database; -use crate::events::Event; +use crate::events::{Event, MessageSource}; use crate::fetcher::FetchRequest; use crate::settings::ServerSettings; use axum::{ @@ -305,7 +305,7 @@ fn generate_synthetic_id(prefix: &str) -> String { .expect("Time went backwards"); // e.g. sashiko-local-1715890000-12345 format!( - "sashiko-{}-{}-{}", + "sashiko-{}-{}-{}@sashiko.local", prefix, since_the_epoch.as_secs(), fastrand::u32(..) @@ -346,6 +346,8 @@ async fn submit_patch( let event = Event::RawMboxSubmitted { raw, + submission_id: id.clone(), + source: MessageSource::ApiInject, group: "api-submit".to_string(), baseline: base_commit, skip_subjects, @@ -381,11 +383,30 @@ async fn submit_patch( sha, repo_display ); + // Optimistic check: If we already have this patchset in the DB, + // skip creating placeholder and skip fetch queue entirely. + match state.db.has_patchset_by_msgid(&id).await { + Ok(true) => { + info!( + "Remote fetch request for already ingested SHA {}, skipping placeholder and fetch", + id + ); + return Ok(Json(SubmitResponse { + status: "accepted".to_string(), + id, + })); + } + Err(e) => { + error!("Failed to check if patchset exists: {}", e); + } + _ => {} + } + // Create a placeholder record in the DB so the user can track status if let Err(e) = state .db .create_fetching_patchset( - &id, + &format!("{}@sashiko.local", id), &format!("Fetching {} from {}...", &sha, repo_display), skip_subjects.as_ref(), only_subjects.as_ref(), @@ -444,6 +465,7 @@ async fn submit_patch( .send(Event::IngestionFailed { article_id: msgid_clone.clone(), error: format!("Failed to fetch thread: {}", e), + source: MessageSource::ApiFetchThread, }) .await; } @@ -487,6 +509,8 @@ async fn fetch_and_inject_thread( let event = Event::RawMboxSubmitted { raw, + submission_id: msgid.to_string(), + source: MessageSource::ApiFetchThread, group: "api-submit".to_string(), baseline: None, skip_subjects: None, @@ -1008,3 +1032,15 @@ async fn rerun_patch( Ok(Json(serde_json::json!({ "status": "accepted" }))) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_synthetic_id_format() { + let id = generate_synthetic_id("test"); + assert!(id.starts_with("sashiko-test-")); + assert!(id.ends_with("@sashiko.local")); + } +} diff --git a/src/db.rs b/src/db.rs index a5ef50aa7..d9b2a7bd6 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3294,9 +3294,20 @@ impl Database { self.rerun_patchset(patchset_id).await } + pub async fn has_patchset_by_msgid(&self, msgid: &str) -> Result { + let mut rows = self + .conn + .query( + "SELECT 1 FROM patchsets WHERE cover_letter_message_id = ? OR cover_letter_message_id = ?", + libsql::params![msgid, format!("<{}>", msgid)], + ) + .await?; + Ok(rows.next().await.ok().flatten().is_some()) + } + pub async fn create_fetching_patchset( &self, - article_id: &str, + root_msg_id: &str, subject: &str, skip_filters: Option<&Vec>, only_filters: Option<&Vec>, @@ -3305,13 +3316,10 @@ impl Database { .duration_since(std::time::UNIX_EPOCH)? .as_secs() as i64; - let root_msg_id = if article_id.contains('@') { - article_id.to_string() - } else { - format!("{}@sashiko.local", article_id) - }; - - let clid_candidates = vec![article_id.to_string(), root_msg_id.clone()]; + let mut clid_candidates = vec![root_msg_id.to_string()]; + if let Some(sha) = root_msg_id.strip_suffix("@sashiko.local") { + clid_candidates.push(sha.to_string()); + } let skip_filters_json = skip_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); let only_filters_json = only_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); @@ -3332,7 +3340,7 @@ impl Database { // Only reset to Fetching if it failed or is currently fetching. // We don't want to reset if it is already Incomplete, Pending, or Reviewed. - if status == "Failed" || status == "Fetching" { + if status == "Failed" || status == "Fetching" || status == "Cancelled" { self.conn.execute( "UPDATE patchsets SET status = 'Fetching', failed_reason = NULL, skip_filters = ?, only_filters = ? WHERE id = ?", libsql::params![skip_filters_json.clone(), only_filters_json.clone(), id] @@ -3343,7 +3351,7 @@ impl Database { } // 2. Ensure a placeholder thread and message exist to satisfy Foreign Key constraints - let thread_id = self.ensure_thread_for_message(&root_msg_id, now).await?; + let thread_id = self.ensure_thread_for_message(root_msg_id, now).await?; // 3. Create the fetching patchset let mut rows = self.conn @@ -3360,12 +3368,7 @@ impl Database { Err(anyhow::anyhow!("Failed to get patchset ID")) } } - pub async fn update_patchset_error(&self, article_id: &str, error: &str) -> Result<()> { - let root_msg_id = if article_id.contains('@') { - article_id.to_string() - } else { - format!("{}@sashiko.local", article_id) - }; + pub async fn update_patchset_error(&self, root_msg_id: &str, error: &str) -> Result<()> { self.conn .execute( "UPDATE patchsets SET status = 'Failed', failed_reason = ? WHERE cover_letter_message_id = ?", @@ -3438,6 +3441,37 @@ impl Database { Ok(count_ps + count_rev) } + pub async fn get_stuck_fetches(&self) -> Result)>> { + let mut rows = self + .conn + .query( + "SELECT p.cover_letter_message_id, b.repo_url + FROM patchsets p + LEFT JOIN baselines b ON p.baseline_id = b.id + WHERE p.status = 'Fetching'", + (), + ) + .await?; + + let mut stuck = Vec::new(); + while let Ok(Some(row)) = rows.next().await { + let msgid: String = row.get(0)?; + let repo: Option = row.get(1).ok().flatten(); + stuck.push((msgid, repo)); + } + Ok(stuck) + } + + pub async fn reset_stuck_fetches(&self) -> Result<()> { + self.conn + .execute( + "UPDATE patchsets SET status = 'Failed', failed_reason = 'Stuck after reboot' WHERE status = 'Fetching'", + (), + ) + .await?; + Ok(()) + } + pub async fn get_patchset_counts_by_status( &self, ) -> Result> { diff --git a/src/events.rs b/src/events.rs index 2b7f1c71f..3a96ce677 100644 --- a/src/events.rs +++ b/src/events.rs @@ -14,6 +14,16 @@ use crate::patch::{Patch, PatchsetMetadata}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MessageSource { + Nntp, + ApiInject, + ApiFetchThread, + GitFetch, + GitImport, + GitArchive, +} + #[derive(Debug)] #[allow(dead_code)] pub enum Event { @@ -39,6 +49,8 @@ pub enum Event { }, RawMboxSubmitted { raw: String, + submission_id: String, + source: MessageSource, group: String, baseline: Option, skip_subjects: Option>, @@ -47,6 +59,7 @@ pub enum Event { IngestionFailed { article_id: String, error: String, + source: MessageSource, }, } @@ -54,6 +67,7 @@ pub enum Event { pub struct ParsedArticle { pub group: String, pub article_id: String, + pub source: MessageSource, pub metadata: Option, pub patch: Option, pub baseline: Option, diff --git a/src/fetcher.rs b/src/fetcher.rs index c24b8d29b..a362b9852 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::events::Event; +use crate::events::{Event, MessageSource}; use crate::utils::redact_secret; use anyhow::{Result, anyhow}; use std::collections::{HashMap, HashSet}; @@ -29,10 +29,19 @@ pub struct FetchRequest { pub commit_hash: String, } +#[derive(Clone)] +struct DelayedRequest { + repo_url: Option, + commit_hash: String, + run_at: std::time::Instant, +} + pub struct FetchAgent { repo_path: PathBuf, rx: mpsc::Receiver, main_tx: mpsc::Sender, + tick_interval: Duration, + backoff_base: Duration, } impl FetchAgent { @@ -46,15 +55,25 @@ impl FetchAgent { repo_path, rx, main_tx, + tick_interval: Duration::from_secs(10), + backoff_base: Duration::from_secs(10), }, tx, ) } + pub fn with_intervals(mut self, tick_interval: Duration, backoff_base: Duration) -> Self { + self.tick_interval = tick_interval; + self.backoff_base = backoff_base; + self + } + pub async fn run(mut self) { info!("FetchAgent started"); let mut queue: HashMap, HashSet> = HashMap::new(); - let mut ticker = interval(Duration::from_secs(10)); + let mut delayed_retries: Vec = Vec::new(); // In-memory delay vector + let mut attempts_tracker: HashMap = HashMap::new(); // Local attempt tracker + let mut ticker = interval(self.tick_interval); // Configurable batch timer loop { tokio::select! { @@ -64,15 +83,41 @@ impl FetchAgent { .insert(req.commit_hash); } _ = ticker.tick() => { + let now = std::time::Instant::now(); + let mut expired_retries = Vec::new(); + + // Filter out expired retries + delayed_retries.retain(|item| { + if now >= item.run_at { + expired_retries.push(item.clone()); + false // Remove from delayed list (promoted) + } else { + true // Keep in delayed list + } + }); + + // Promote expired retries into the active batch queue + for retry in expired_retries { + queue.entry(retry.repo_url) + .or_default() + .insert(retry.commit_hash); + } + + // Process active batch queue if !queue.is_empty() { - self.process_queue(&mut queue).await; + self.process_queue(&mut queue, &mut delayed_retries, &mut attempts_tracker).await; } } } } } - async fn process_queue(&self, queue: &mut HashMap, HashSet>) { + async fn process_queue( + &self, + queue: &mut HashMap, HashSet>, + delayed_retries: &mut Vec, + attempts_tracker: &mut HashMap, + ) { info!("Processing fetch queue with {} repos", queue.len()); for (url_opt, commits) in queue.drain() { @@ -102,9 +147,9 @@ impl FetchAgent { "All commits present locally, skipping fetch for {}", url_display ); - } else if let Some(url) = url_opt { + } else if let Some(ref url) = url_opt { // Remote fetch logic - let remote_name = self.get_remote_name(&url); + let remote_name = self.get_remote_name(url); // Check if repo is local (same as self.repo_path) let is_local = { @@ -126,16 +171,17 @@ impl FetchAgent { ); // Do not continue here; let it fall through to Step 3 where it will fail individually } else { - if let Err(e) = self.ensure_remote(&remote_name, &url).await { + if let Err(e) = self.ensure_remote(&remote_name, url).await { error!("Failed to ensure remote {}: {}", url, e); for commit in &missing_commits { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit.clone(), - error: format!("Failed to set up remote {}: {}", url, e), - }) - .await; + self.handle_fetch_failure( + &url_opt, + commit, + &format!("Failed to set up remote {}: {}", url, e), + delayed_retries, + attempts_tracker, + ) + .await; } continue; } @@ -150,13 +196,14 @@ impl FetchAgent { if let Err(e) = self.fetch_all(&remote_name).await { error!("Full fetch failed for {}: {}", url, e); for commit in &missing_commits { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit.clone(), - error: format!("Failed to fetch from {}: {}", url, e), - }) - .await; + self.handle_fetch_failure( + &url_opt, + commit, + &format!("Failed to fetch from {}: {}", url, e), + delayed_retries, + attempts_tracker, + ) + .await; } continue; } @@ -180,13 +227,14 @@ impl FetchAgent { { Ok(shas) => shas, Err(e) => { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: range.clone(), - error: format!("Failed to resolve git range: {}", e), - }) - .await; + self.handle_fetch_failure( + &url_opt, + &commit_or_range, + &format!("Failed to resolve git range: {}", e), + delayed_retries, + attempts_tracker, + ) + .await; continue; } }; @@ -214,25 +262,33 @@ impl FetchAgent { } } } + // Range Success! Clean up attempts tracker and delay queue for the range key + attempts_tracker.remove(&commit_or_range); + delayed_retries.retain(|item| item.commit_hash != commit_or_range); info!("Successfully submitted remote range {}", range); } else { // Single commit let full_sha = match self.resolve_sha(&commit_or_range).await { Ok(sha) => sha, Err(e) => { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit_or_range.clone(), - error: format!("Failed to resolve SHA: {}", e), - }) - .await; + self.handle_fetch_failure( + &url_opt, + &commit_or_range, + &format!("Failed to resolve SHA: {}", e), + delayed_retries, + attempts_tracker, + ) + .await; continue; } }; match self.extract_patch(&full_sha, &commit_or_range, 1, 1).await { Ok(mut event) => { + // Success! Clean up attempts tracker and the parked delay queue + attempts_tracker.remove(&commit_or_range); + delayed_retries.retain(|item| item.commit_hash != commit_or_range); + if let Event::PatchSubmitted { ref mut message_id, .. } = event @@ -247,13 +303,14 @@ impl FetchAgent { } Err(e) => { error!("Failed to extract patch {}: {}", commit_or_range, e); - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit_or_range, - error: format!("Failed to extract patch: {}", e), - }) - .await; + self.handle_fetch_failure( + &url_opt, + &commit_or_range, + &format!("Failed to extract patch: {}", e), + delayed_retries, + attempts_tracker, + ) + .await; } } } @@ -434,6 +491,61 @@ impl FetchAgent { total, }) } + + async fn handle_fetch_failure( + &self, + url_opt: &Option, + commit: &str, + error_msg: &str, + delayed_retries: &mut Vec, + attempts_tracker: &mut HashMap, + ) { + let attempts = attempts_tracker.get(commit).cloned().unwrap_or(0); + let max_retries = 3; + + if attempts < max_retries { + let next_attempt = attempts + 1; + attempts_tracker.insert(commit.to_string(), next_attempt); + + // Exponential backoff using configurable base: base * (2 ^ attempts) + let delay = self.backoff_base * (2u32.pow(next_attempt)); + let run_at = std::time::Instant::now() + delay; + + warn!( + "Fetch/extract failed for {} (attempts: {}/{}). Parking in delay queue for {}s.", + commit, + next_attempt, + max_retries, + delay.as_secs() + ); + + // Deduplicate inside delayed_retries first to prevent duplicate parked entries! + delayed_retries.retain(|item| item.commit_hash != commit); + + delayed_retries.push(DelayedRequest { + repo_url: url_opt.clone(), + commit_hash: commit.to_string(), + run_at, + }); + } else { + // Retries exhausted, clean up attempts tracker and the parked delay queue + attempts_tracker.remove(commit); + delayed_retries.retain(|item| item.commit_hash != commit); + error!( + "Fetch/extract failed for {} after {} retries. Marking as Failed. Error: {}", + commit, max_retries, error_msg + ); + + let _ = self + .main_tx + .send(Event::IngestionFailed { + article_id: commit.to_string(), + error: error_msg.to_string(), + source: MessageSource::GitFetch, + }) + .await; + } + } } #[cfg(test)] @@ -585,4 +697,79 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_fetch_agent_delay_queue_retry() -> Result<()> { + let temp_dir = tempfile::tempdir()?; + let repo_path = temp_dir.path().to_path_buf(); + + // Setup empty dummy repo + Command::new("git") + .current_dir(&repo_path) + .arg("init") + .output() + .await?; + + let (event_tx, mut event_rx) = mpsc::channel(10); + + // Create agent with tiny sub-second intervals for fast real-time testing! + let (agent, fetch_tx) = { + let (a, tx) = FetchAgent::new(repo_path.clone(), event_tx); + ( + a.with_intervals(Duration::from_millis(50), Duration::from_millis(50)), + tx, + ) + }; + + // Spawn the agent in the background + let agent_handle = tokio::spawn(agent.run()); + + // Enqueue a FetchRequest for a missing commit (which will fail resolve_sha!) + let req = FetchRequest { + repo_url: None, + commit_hash: "0123456789abcdef0123456789abcdef01234567".to_string(), + }; + fetch_tx.send(req).await?; + + // Let the first tick (50ms) process. Wait 100ms to be safe. + tokio::time::sleep(Duration::from_millis(100)).await; + + // The first tick ran process_queue, failed, and parked it in delayed_retries (Attempts = 1, delay = 100ms) + // Verify no event is received yet. + tokio::select! { + Some(event) = event_rx.recv() => { + panic!("Received unexpected event: {:?}", event); + } + _ = tokio::time::sleep(Duration::from_millis(50)) => { + // Expected: no event sent! + } + } + + // Wait for retries to exhaust: + // Attempt 1: delay 100ms. + // Attempt 2: delay 200ms. + // Attempt 3: delay 400ms. + // Total delay is 700ms. Plus ticks processing time, we sleep 1.2 seconds to be completely safe. + tokio::time::sleep(Duration::from_millis(1200)).await; + + // Verify that Event::IngestionFailed is finally received! + tokio::select! { + Some(Event::IngestionFailed { article_id, error, .. }) = event_rx.recv() => { + assert_eq!(article_id, "0123456789abcdef0123456789abcdef01234567"); + assert!( + error.contains("Failed to resolve SHA") || error.contains("Failed to extract patch"), + "Actual error was: {}", + error + ); + } + _ = tokio::time::sleep(Duration::from_millis(500)) => { + panic!("Timed out waiting for IngestionFailed event after retries!"); + } + } + + // Clean up the agent + agent_handle.abort(); + + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index 0bc425b55..93e1f3504 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,8 @@ use clap::{Parser, Subcommand}; use sashiko::db::Database; -use sashiko::events::{Event, ParsedArticle}; +use sashiko::events::{Event, MessageSource, ParsedArticle}; +use sashiko::fetcher::FetchRequest; use sashiko::ingestor::Ingestor; use sashiko::reviewer::Reviewer; use sashiko::settings::Settings; @@ -179,10 +180,52 @@ async fn main() -> Result<(), Box> { let (fetch_agent, fetch_tx) = sashiko::fetcher::FetchAgent::new(repo_path, raw_tx.clone()); // Spawn FetchAgent - tokio::spawn(async move { + let fetch_handle = tokio::spawn(async move { fetch_agent.run().await; }); + // Resume stuck fetches in the background + let db_resumption = db.clone(); + let tx_resumption = fetch_tx.clone(); + tokio::spawn(async move { + match db_resumption.get_stuck_fetches().await { + Ok(stuck) => { + if !stuck.is_empty() { + info!("Resuming {} stuck fetches", stuck.len()); + for (msgid, repo_url) in stuck { + // Extract actual SHA from synthetic message ID + // Format: sashiko--@sashiko.local + // Or just + let mut commit_hash = msgid.clone(); + if commit_hash.starts_with("sashiko-") { + commit_hash = commit_hash + .strip_prefix("sashiko-") + .and_then(|s| s.split('-').next()) + .map(|s| s.to_string()) + .unwrap_or(commit_hash); + } + + // Remove domain suffix if present (e.g. @sashiko.local) + if let Some(pos) = commit_hash.find('@') { + commit_hash.truncate(pos); + } + + if let Err(e) = tx_resumption + .send(FetchRequest { + repo_url, + commit_hash, + }) + .await + { + error!("Failed to re-queue stuck fetch: {}", e); + } + } + } + } + Err(e) => error!("Failed to query stuck fetches: {}", e), + } + }); + // Parser Dispatcher let semaphore = Arc::new(Semaphore::new(50)); @@ -235,11 +278,16 @@ async fn main() -> Result<(), Box> { let _permit = permit; // Hold permit until task completion match event { - Event::IngestionFailed { article_id, error } => { + Event::IngestionFailed { + article_id, + error, + source, + } => { if let Err(e) = tx .send(ParsedArticle { group: "error".to_string(), article_id, + source, metadata: None, patch: None, baseline: None, @@ -298,10 +346,17 @@ async fn main() -> Result<(), Box> { part_index: index, }); + let source = if group.starts_with("git-import") { + MessageSource::GitImport + } else { + MessageSource::GitFetch + }; + if let Err(e) = tx .send(ParsedArticle { group, article_id, + source, metadata: Some(metadata), patch, baseline: base_commit, @@ -316,6 +371,8 @@ async fn main() -> Result<(), Box> { } Event::RawMboxSubmitted { raw, + submission_id, + source, group, baseline, skip_subjects, @@ -350,17 +407,14 @@ async fn main() -> Result<(), Box> { match parse_result { Ok(Ok((metadata, patch_opt))) => { - // Override group "api-submit" -> "manual" to avoid synthetic ID logic - let effective_group = if group_clone == "api-submit" { - "manual".to_string() - } else { - group_clone - }; + // Do not override group "api-submit" to allow grouping logic to trigger + let effective_group = group_clone; if let Err(e) = tx_clone .send(ParsedArticle { group: effective_group, - article_id: msg_id, + article_id: submission_id.clone(), + source, metadata: Some(metadata), patch: patch_opt, baseline: baseline_clone, @@ -415,6 +469,7 @@ async fn main() -> Result<(), Box> { .send(ParsedArticle { group, article_id, + source: MessageSource::Nntp, metadata: Some(metadata), patch: patch_opt, baseline, @@ -576,6 +631,7 @@ async fn main() -> Result<(), Box> { // Abort handles ingestor_handle.abort(); parser_handle.abort(); + fetch_handle.abort(); Ok(()) } @@ -593,6 +649,7 @@ async fn process_parsed_article( let ParsedArticle { group, article_id, + source, metadata, patch, baseline, @@ -601,10 +658,12 @@ async fn process_parsed_article( only_filters, } = article; + let root_msg_id = resolve_root_msg_id(source, &article_id); + // Handle ingestion failure if let Some(err) = failed_error { info!("Handling ingestion failure for {}: {}", article_id, err); - if let Err(e) = worker_db.update_patchset_error(&article_id, &err).await { + if let Err(e) = worker_db.update_patchset_error(&root_msg_id, &err).await { error!("Failed to update patchset error in DB: {}", e); } return ProcessStatus::Ingested; // Successfully handled the failure event @@ -675,12 +734,6 @@ async fn process_parsed_article( } else if group == "git-fetch" || group == "api-submit" { // Group these by article_id (which is the range or single SHA/local_id) // For singletons, the message itself is the root. - let root_msg_id = if metadata.total == 1 { - metadata.message_id.clone() - } else { - format!("{}@sashiko.local", article_id) - }; - match worker_db .ensure_thread_for_message(&root_msg_id, metadata.date) .await @@ -820,7 +873,6 @@ async fn process_parsed_article( ); */ - let root_msg_id = format!("{}@sashiko.local", article_id); let cover_letter_id = if group == "git-fetch" || group == "api-submit" { if metadata.total == 1 { Some(metadata.message_id.as_str()) @@ -854,7 +906,7 @@ async fn process_parsed_article( metadata.subject.clone(), metadata.author.clone(), metadata.total, - !group.starts_with("git-import"), + is_strict_author(source, metadata.total), ) }; @@ -1105,6 +1157,27 @@ fn calculate_embargo_hours( } } +fn resolve_root_msg_id(source: MessageSource, article_id: &str) -> String { + match source { + MessageSource::Nntp + | MessageSource::ApiFetchThread + | MessageSource::GitArchive + | MessageSource::ApiInject => article_id.to_string(), + MessageSource::GitFetch | MessageSource::GitImport => { + format!("{}@sashiko.local", article_id) + } + } +} + +fn is_strict_author(source: MessageSource, total_parts: u32) -> bool { + match source { + MessageSource::GitImport | MessageSource::GitArchive => false, + MessageSource::ApiInject if total_parts > 1 => false, + MessageSource::ApiFetchThread if total_parts > 1 => false, + _ => true, + } +} + fn identify_subsystems(to: &str, cc: &str) -> Vec<(String, String)> { let mut subsystems = Vec::new(); let mut all_recipients = String::new(); @@ -1312,4 +1385,46 @@ mod tests { 0 ); } + + #[test] + fn test_resolve_root_msg_id() { + assert_eq!( + resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), + "sashiko-123" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), + "abc123_sha@sashiko.local" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), + "range_a_b@sashiko.local" + ); + } + + #[test] + fn test_is_strict_author() { + assert!(is_strict_author(MessageSource::Nntp, 1)); + assert!(is_strict_author(MessageSource::Nntp, 6)); + assert!(!is_strict_author(MessageSource::ApiFetchThread, 6)); // Lenient for series + assert!(is_strict_author(MessageSource::GitFetch, 6)); + + assert!(!is_strict_author(MessageSource::GitImport, 6)); + assert!(!is_strict_author(MessageSource::GitArchive, 6)); + + assert!(is_strict_author(MessageSource::ApiInject, 1)); // Strict for singleton + assert!(!is_strict_author(MessageSource::ApiInject, 6)); // Lenient for series + } }