From e8fdc861ce923b66d0037914793332b10183aee0 Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Thu, 7 May 2026 22:34:56 +0000 Subject: [PATCH 1/6] fix: Group API submitted patches by submission ID Patches submitted via the API were being split into multiple incomplete patchsets due to timestamp differences and distinct authors. This change adds an article ID to the RawMboxSubmitted event to track the submission ID. The API passes the synthetic ID as article ID, and the main application uses it to group all patches in the batch into a single thread. It also disables strict author checking for API submissions to allow patches from different authors in the same series. Signed-off-by: Elkin Cruz --- src/api.rs | 23 ++++++++++++-- src/db.rs | 17 ++-------- src/events.rs | 14 +++++++++ src/fetcher.rs | 7 ++++- src/main.rs | 85 +++++++++++++++++++++++++++++++++++++++----------- 5 files changed, 110 insertions(+), 36 deletions(-) diff --git a/src/api.rs b/src/api.rs index 356d15907..ef30344b6 100644 --- a/src/api.rs +++ b/src/api.rs @@ -13,7 +13,7 @@ // limitations under the License. use crate::db::Database; -use crate::events::Event; +use crate::events::{Event, MessageSource}; use crate::fetcher::FetchRequest; use crate::settings::ServerSettings; use axum::{ @@ -305,7 +305,7 @@ fn generate_synthetic_id(prefix: &str) -> String { .expect("Time went backwards"); // e.g. sashiko-local-1715890000-12345 format!( - "sashiko-{}-{}-{}", + "sashiko-{}-{}-{}@sashiko.local", prefix, since_the_epoch.as_secs(), fastrand::u32(..) 
@@ -346,6 +346,8 @@ async fn submit_patch( let event = Event::RawMboxSubmitted { raw, + submission_id: id.clone(), + source: MessageSource::ApiInject, group: "api-submit".to_string(), baseline: base_commit, skip_subjects, @@ -385,7 +387,7 @@ async fn submit_patch( if let Err(e) = state .db .create_fetching_patchset( - &id, + &format!("{}@sashiko.local", id), &format!("Fetching {} from {}...", &sha, repo_display), skip_subjects.as_ref(), only_subjects.as_ref(), @@ -444,6 +446,7 @@ async fn submit_patch( .send(Event::IngestionFailed { article_id: msgid_clone.clone(), error: format!("Failed to fetch thread: {}", e), + source: MessageSource::ApiFetchThread, }) .await; } @@ -487,6 +490,8 @@ async fn fetch_and_inject_thread( let event = Event::RawMboxSubmitted { raw, + submission_id: msgid.to_string(), + source: MessageSource::ApiFetchThread, group: "api-submit".to_string(), baseline: None, skip_subjects: None, @@ -1008,3 +1013,15 @@ async fn rerun_patch( Ok(Json(serde_json::json!({ "status": "accepted" }))) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_synthetic_id_format() { + let id = generate_synthetic_id("test"); + assert!(id.starts_with("sashiko-test-")); + assert!(id.ends_with("@sashiko.local")); + } +} diff --git a/src/db.rs b/src/db.rs index a5ef50aa7..494f7b087 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3296,7 +3296,7 @@ impl Database { pub async fn create_fetching_patchset( &self, - article_id: &str, + root_msg_id: &str, subject: &str, skip_filters: Option<&Vec>, only_filters: Option<&Vec>, @@ -3305,13 +3305,7 @@ impl Database { .duration_since(std::time::UNIX_EPOCH)? 
.as_secs() as i64; - let root_msg_id = if article_id.contains('@') { - article_id.to_string() - } else { - format!("{}@sashiko.local", article_id) - }; - - let clid_candidates = vec![article_id.to_string(), root_msg_id.clone()]; + let clid_candidates = vec![root_msg_id.to_string()]; let skip_filters_json = skip_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); let only_filters_json = only_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); @@ -3360,12 +3354,7 @@ impl Database { Err(anyhow::anyhow!("Failed to get patchset ID")) } } - pub async fn update_patchset_error(&self, article_id: &str, error: &str) -> Result<()> { - let root_msg_id = if article_id.contains('@') { - article_id.to_string() - } else { - format!("{}@sashiko.local", article_id) - }; + pub async fn update_patchset_error(&self, root_msg_id: &str, error: &str) -> Result<()> { self.conn .execute( "UPDATE patchsets SET status = 'Failed', failed_reason = ? WHERE cover_letter_message_id = ?", diff --git a/src/events.rs b/src/events.rs index 2b7f1c71f..3a96ce677 100644 --- a/src/events.rs +++ b/src/events.rs @@ -14,6 +14,16 @@ use crate::patch::{Patch, PatchsetMetadata}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MessageSource { + Nntp, + ApiInject, + ApiFetchThread, + GitFetch, + GitImport, + GitArchive, +} + #[derive(Debug)] #[allow(dead_code)] pub enum Event { @@ -39,6 +49,8 @@ pub enum Event { }, RawMboxSubmitted { raw: String, + submission_id: String, + source: MessageSource, group: String, baseline: Option, skip_subjects: Option>, @@ -47,6 +59,7 @@ pub enum Event { IngestionFailed { article_id: String, error: String, + source: MessageSource, }, } @@ -54,6 +67,7 @@ pub enum Event { pub struct ParsedArticle { pub group: String, pub article_id: String, + pub source: MessageSource, pub metadata: Option, pub patch: Option, pub baseline: Option, diff --git a/src/fetcher.rs b/src/fetcher.rs index c24b8d29b..fff3fdcb7 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ 
-12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::events::Event; +use crate::events::{Event, MessageSource}; use crate::utils::redact_secret; use anyhow::{Result, anyhow}; use std::collections::{HashMap, HashSet}; @@ -134,6 +134,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit.clone(), error: format!("Failed to set up remote {}: {}", url, e), + source: MessageSource::GitFetch, }) .await; } @@ -155,6 +156,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit.clone(), error: format!("Failed to fetch from {}: {}", url, e), + source: MessageSource::GitFetch, }) .await; } @@ -185,6 +187,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: range.clone(), error: format!("Failed to resolve git range: {}", e), + source: MessageSource::GitFetch, }) .await; continue; @@ -225,6 +228,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit_or_range.clone(), error: format!("Failed to resolve SHA: {}", e), + source: MessageSource::GitFetch, }) .await; continue; @@ -252,6 +256,7 @@ impl FetchAgent { .send(Event::IngestionFailed { article_id: commit_or_range, error: format!("Failed to extract patch: {}", e), + source: MessageSource::GitFetch, }) .await; } diff --git a/src/main.rs b/src/main.rs index 0bc425b55..105b8a345 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use clap::{Parser, Subcommand}; use sashiko::db::Database; -use sashiko::events::{Event, ParsedArticle}; +use sashiko::events::{Event, ParsedArticle, MessageSource}; use sashiko::ingestor::Ingestor; use sashiko::reviewer::Reviewer; use sashiko::settings::Settings; @@ -235,11 +235,12 @@ async fn main() -> Result<(), Box> { let _permit = permit; // Hold permit until task completion match event { - Event::IngestionFailed { article_id, error } => { + Event::IngestionFailed { article_id, error, source } => { if let Err(e) = tx .send(ParsedArticle { group: 
"error".to_string(), article_id, + source, metadata: None, patch: None, baseline: None, @@ -298,10 +299,17 @@ async fn main() -> Result<(), Box> { part_index: index, }); + let source = if group.starts_with("git-import") { + MessageSource::GitImport + } else { + MessageSource::GitFetch + }; + if let Err(e) = tx .send(ParsedArticle { group, article_id, + source, metadata: Some(metadata), patch, baseline: base_commit, @@ -316,6 +324,8 @@ async fn main() -> Result<(), Box> { } Event::RawMboxSubmitted { raw, + submission_id, + source, group, baseline, skip_subjects, @@ -350,17 +360,14 @@ async fn main() -> Result<(), Box> { match parse_result { Ok(Ok((metadata, patch_opt))) => { - // Override group "api-submit" -> "manual" to avoid synthetic ID logic - let effective_group = if group_clone == "api-submit" { - "manual".to_string() - } else { - group_clone - }; + // Do not override group "api-submit" to allow grouping logic to trigger + let effective_group = group_clone; if let Err(e) = tx_clone .send(ParsedArticle { group: effective_group, - article_id: msg_id, + article_id: submission_id.clone(), + source, metadata: Some(metadata), patch: patch_opt, baseline: baseline_clone, @@ -415,6 +422,7 @@ async fn main() -> Result<(), Box> { .send(ParsedArticle { group, article_id, + source: MessageSource::Nntp, metadata: Some(metadata), patch: patch_opt, baseline, @@ -593,6 +601,7 @@ async fn process_parsed_article( let ParsedArticle { group, article_id, + source, metadata, patch, baseline, @@ -601,10 +610,12 @@ async fn process_parsed_article( only_filters, } = article; + let root_msg_id = resolve_root_msg_id(source, &article_id); + // Handle ingestion failure if let Some(err) = failed_error { info!("Handling ingestion failure for {}: {}", article_id, err); - if let Err(e) = worker_db.update_patchset_error(&article_id, &err).await { + if let Err(e) = worker_db.update_patchset_error(&root_msg_id, &err).await { error!("Failed to update patchset error in DB: {}", e); } return 
ProcessStatus::Ingested; // Successfully handled the failure event @@ -675,12 +686,6 @@ async fn process_parsed_article( } else if group == "git-fetch" || group == "api-submit" { // Group these by article_id (which is the range or single SHA/local_id) // For singletons, the message itself is the root. - let root_msg_id = if metadata.total == 1 { - metadata.message_id.clone() - } else { - format!("{}@sashiko.local", article_id) - }; - match worker_db .ensure_thread_for_message(&root_msg_id, metadata.date) .await @@ -820,7 +825,7 @@ async fn process_parsed_article( ); */ - let root_msg_id = format!("{}@sashiko.local", article_id); + let cover_letter_id = if group == "git-fetch" || group == "api-submit" { if metadata.total == 1 { Some(metadata.message_id.as_str()) @@ -854,7 +859,7 @@ async fn process_parsed_article( metadata.subject.clone(), metadata.author.clone(), metadata.total, - !group.starts_with("git-import"), + is_strict_author(source, metadata.total), ) }; @@ -1105,6 +1110,26 @@ fn calculate_embargo_hours( } } +fn resolve_root_msg_id(source: MessageSource, article_id: &str) -> String { + match source { + MessageSource::Nntp | MessageSource::ApiFetchThread | MessageSource::GitArchive | MessageSource::ApiInject => { + article_id.to_string() + } + MessageSource::GitFetch | MessageSource::GitImport => { + format!("{}@sashiko.local", article_id) + } + } +} + +fn is_strict_author(source: MessageSource, total_parts: u32) -> bool { + match source { + MessageSource::GitImport | MessageSource::GitArchive => false, + MessageSource::ApiInject if total_parts > 1 => false, + MessageSource::ApiFetchThread if total_parts > 1 => false, + _ => true, + } +} + fn identify_subsystems(to: &str, cc: &str) -> Vec<(String, String)> { let mut subsystems = Vec::new(); let mut all_recipients = String::new(); @@ -1312,4 +1337,28 @@ mod tests { 0 ); } + + #[test] + fn test_resolve_root_msg_id() { + assert_eq!(resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), "foo@bar.com"); + 
assert_eq!(resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), "foo@bar.com"); + assert_eq!(resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), "foo@bar.com"); + assert_eq!(resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), "sashiko-123"); + assert_eq!(resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), "abc123_sha@sashiko.local"); + assert_eq!(resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), "range_a_b@sashiko.local"); + } + + #[test] + fn test_is_strict_author() { + assert!(is_strict_author(MessageSource::Nntp, 1)); + assert!(is_strict_author(MessageSource::Nntp, 6)); + assert!(!is_strict_author(MessageSource::ApiFetchThread, 6)); // Lenient for series + assert!(is_strict_author(MessageSource::GitFetch, 6)); + + assert!(!is_strict_author(MessageSource::GitImport, 6)); + assert!(!is_strict_author(MessageSource::GitArchive, 6)); + + assert!(is_strict_author(MessageSource::ApiInject, 1)); // Strict for singleton + assert!(!is_strict_author(MessageSource::ApiInject, 6)); // Lenient for series + } } From 6ce60513660741b2bb358283f1f33db6d885b7bc Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Tue, 28 Apr 2026 21:25:54 +0000 Subject: [PATCH 2/6] db: Fix linter issues (cargo clippy and cargo fmt) Removes a reference operator from root_msg_id in call to ensure_thread_for_message, as it is already a reference. And fixes formatting errors in src/main.rs :) Signed-off-by: Elkin Cruz --- src/db.rs | 2 +- src/main.rs | 46 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/db.rs b/src/db.rs index 494f7b087..fdc3b723c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3337,7 +3337,7 @@ impl Database { } // 2. Ensure a placeholder thread and message exist to satisfy Foreign Key constraints - let thread_id = self.ensure_thread_for_message(&root_msg_id, now).await?; + let thread_id = self.ensure_thread_for_message(root_msg_id, now).await?; // 3. 
Create the fetching patchset let mut rows = self.conn diff --git a/src/main.rs b/src/main.rs index 105b8a345..a1d70bdd4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use clap::{Parser, Subcommand}; use sashiko::db::Database; -use sashiko::events::{Event, ParsedArticle, MessageSource}; +use sashiko::events::{Event, MessageSource, ParsedArticle}; use sashiko::ingestor::Ingestor; use sashiko::reviewer::Reviewer; use sashiko::settings::Settings; @@ -235,7 +235,11 @@ async fn main() -> Result<(), Box> { let _permit = permit; // Hold permit until task completion match event { - Event::IngestionFailed { article_id, error, source } => { + Event::IngestionFailed { + article_id, + error, + source, + } => { if let Err(e) = tx .send(ParsedArticle { group: "error".to_string(), @@ -825,7 +829,6 @@ async fn process_parsed_article( ); */ - let cover_letter_id = if group == "git-fetch" || group == "api-submit" { if metadata.total == 1 { Some(metadata.message_id.as_str()) @@ -1112,9 +1115,10 @@ fn calculate_embargo_hours( fn resolve_root_msg_id(source: MessageSource, article_id: &str) -> String { match source { - MessageSource::Nntp | MessageSource::ApiFetchThread | MessageSource::GitArchive | MessageSource::ApiInject => { - article_id.to_string() - } + MessageSource::Nntp + | MessageSource::ApiFetchThread + | MessageSource::GitArchive + | MessageSource::ApiInject => article_id.to_string(), MessageSource::GitFetch | MessageSource::GitImport => { format!("{}@sashiko.local", article_id) } @@ -1340,12 +1344,30 @@ mod tests { #[test] fn test_resolve_root_msg_id() { - assert_eq!(resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), "foo@bar.com"); - assert_eq!(resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), "foo@bar.com"); - assert_eq!(resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), "foo@bar.com"); - assert_eq!(resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), "sashiko-123"); - 
assert_eq!(resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), "abc123_sha@sashiko.local"); - assert_eq!(resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), "range_a_b@sashiko.local"); + assert_eq!( + resolve_root_msg_id(MessageSource::Nntp, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::ApiFetchThread, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitArchive, "foo@bar.com"), + "foo@bar.com" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::ApiInject, "sashiko-123"), + "sashiko-123" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitFetch, "abc123_sha"), + "abc123_sha@sashiko.local" + ); + assert_eq!( + resolve_root_msg_id(MessageSource::GitImport, "range_a_b"), + "range_a_b@sashiko.local" + ); } #[test] From 60e7f0001de973005be5f4cebc7e0d5ae2c9ce5d Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Tue, 5 May 2026 21:46:17 +0000 Subject: [PATCH 3/6] feat: resume stuck fetches on startup Currently, FetchAgent state is kept in-memory. If the service restarts, patchsets marked as 'Fetching' in the database remain stuck because the in-memory queue is lost. This change adds logic to Main and Database to: 1. Query for patchsets left in 'Fetching' status on startup. 2. Clean synthetic message IDs to extract valid Git SHAs. 3. Automatically re-queue these fetches in the FetchAgent background task. 
Signed-off-by: Elkin Cruz --- src/db.rs | 31 +++++++++++++++++++++++++++++++ src/main.rs | 46 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index fdc3b723c..85ebe1686 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3427,6 +3427,37 @@ impl Database { Ok(count_ps + count_rev) } + pub async fn get_stuck_fetches(&self) -> Result)>> { + let mut rows = self + .conn + .query( + "SELECT p.cover_letter_message_id, b.repo_url + FROM patchsets p + LEFT JOIN baselines b ON p.baseline_id = b.id + WHERE p.status = 'Fetching'", + (), + ) + .await?; + + let mut stuck = Vec::new(); + while let Ok(Some(row)) = rows.next().await { + let msgid: String = row.get(0)?; + let repo: Option = row.get(1).ok().flatten(); + stuck.push((msgid, repo)); + } + Ok(stuck) + } + + pub async fn reset_stuck_fetches(&self) -> Result<()> { + self.conn + .execute( + "UPDATE patchsets SET status = 'Failed', failed_reason = 'Stuck after reboot' WHERE status = 'Fetching'", + (), + ) + .await?; + Ok(()) + } + pub async fn get_patchset_counts_by_status( &self, ) -> Result> { diff --git a/src/main.rs b/src/main.rs index a1d70bdd4..93e1f3504 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ use clap::{Parser, Subcommand}; use sashiko::db::Database; use sashiko::events::{Event, MessageSource, ParsedArticle}; +use sashiko::fetcher::FetchRequest; use sashiko::ingestor::Ingestor; use sashiko::reviewer::Reviewer; use sashiko::settings::Settings; @@ -179,10 +180,52 @@ async fn main() -> Result<(), Box> { let (fetch_agent, fetch_tx) = sashiko::fetcher::FetchAgent::new(repo_path, raw_tx.clone()); // Spawn FetchAgent - tokio::spawn(async move { + let fetch_handle = tokio::spawn(async move { fetch_agent.run().await; }); + // Resume stuck fetches in the background + let db_resumption = db.clone(); + let tx_resumption = fetch_tx.clone(); + tokio::spawn(async move { + match db_resumption.get_stuck_fetches().await { + Ok(stuck) => { + 
if !stuck.is_empty() { + info!("Resuming {} stuck fetches", stuck.len()); + for (msgid, repo_url) in stuck { + // Extract actual SHA from synthetic message ID + // Format: sashiko-<sha>-<suffix>@sashiko.local + // Or just <sha> + let mut commit_hash = msgid.clone(); + if commit_hash.starts_with("sashiko-") { + commit_hash = commit_hash + .strip_prefix("sashiko-") + .and_then(|s| s.split('-').next()) + .map(|s| s.to_string()) + .unwrap_or(commit_hash); + } + + // Remove domain suffix if present (e.g. @sashiko.local) + if let Some(pos) = commit_hash.find('@') { + commit_hash.truncate(pos); + } + + if let Err(e) = tx_resumption + .send(FetchRequest { + repo_url, + commit_hash, + }) + .await + { + error!("Failed to re-queue stuck fetch: {}", e); + } + } + } + } + Err(e) => error!("Failed to query stuck fetches: {}", e), + } + }); + // Parser Dispatcher let semaphore = Arc::new(Semaphore::new(50)); @@ -588,6 +631,7 @@ async fn main() -> Result<(), Box> { // Abort handles ingestor_handle.abort(); parser_handle.abort(); + fetch_handle.abort(); Ok(()) } From ff68f4261566b2d7afe3e5b04029143facb56690 Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Tue, 5 May 2026 23:15:35 +0000 Subject: [PATCH 4/6] db: optimize duplicate patchset resolution for synthetic IDs Strip synthetic GSA suffix (@sashiko.local) when matching placeholder patchsets in create_fetching_patchset. This prevents duplicate placeholder records from being created. Also allow resetting Cancelled patchsets to Fetching status if they are re-imported. TAG=agy Signed-off-by: Elkin Cruz --- src/db.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/db.rs b/src/db.rs index 85ebe1686..37d119c7d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3305,7 +3305,10 @@ impl Database { .duration_since(std::time::UNIX_EPOCH)?
.as_secs() as i64; - let clid_candidates = vec![root_msg_id.to_string()]; + let mut clid_candidates = vec![root_msg_id.to_string()]; + if let Some(sha) = root_msg_id.strip_suffix("@sashiko.local") { + clid_candidates.push(sha.to_string()); + } let skip_filters_json = skip_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); let only_filters_json = only_filters.map(|f| serde_json::to_string(f).unwrap_or_default()); @@ -3326,7 +3329,7 @@ impl Database { // Only reset to Fetching if it failed or is currently fetching. // We don't want to reset if it is already Incomplete, Pending, or Reviewed. - if status == "Failed" || status == "Fetching" { + if status == "Failed" || status == "Fetching" || status == "Cancelled" { self.conn.execute( "UPDATE patchsets SET status = 'Fetching', failed_reason = NULL, skip_filters = ?, only_filters = ? WHERE id = ?", libsql::params![skip_filters_json.clone(), only_filters_json.clone(), id] From 5c1ce973600f5fd808b05231b1728e85f6ce0253 Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Wed, 6 May 2026 16:10:30 +0000 Subject: [PATCH 5/6] api: skip duplicate remote fetch requests for already ingested SHAs Check if a patchset already exists in the database by bare SHA/msgid in submit_patch before creating a remote fetch placeholder or queueing a fetch request. This completely avoids the creation of orphaned placeholder patchsets in Fetching status when identical remote fetch requests are submitted repeatedly. TAG=agy Signed-off-by: Elkin Cruz --- src/api.rs | 19 +++++++++++++++++++ src/db.rs | 11 +++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/api.rs b/src/api.rs index ef30344b6..4b6dfa544 100644 --- a/src/api.rs +++ b/src/api.rs @@ -383,6 +383,25 @@ async fn submit_patch( sha, repo_display ); + // Optimistic check: If we already have this patchset in the DB, + // skip creating placeholder and skip fetch queue entirely. 
+ match state.db.has_patchset_by_msgid(&id).await { + Ok(true) => { + info!( + "Remote fetch request for already ingested SHA {}, skipping placeholder and fetch", + id + ); + return Ok(Json(SubmitResponse { + status: "accepted".to_string(), + id, + })); + } + Err(e) => { + error!("Failed to check if patchset exists: {}", e); + } + _ => {} + } + // Create a placeholder record in the DB so the user can track status if let Err(e) = state .db diff --git a/src/db.rs b/src/db.rs index 37d119c7d..d9b2a7bd6 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3294,6 +3294,17 @@ impl Database { self.rerun_patchset(patchset_id).await } + pub async fn has_patchset_by_msgid(&self, msgid: &str) -> Result { + let mut rows = self + .conn + .query( + "SELECT 1 FROM patchsets WHERE cover_letter_message_id = ? OR cover_letter_message_id = ?", + libsql::params![msgid, format!("<{}>", msgid)], + ) + .await?; + Ok(rows.next().await.ok().flatten().is_some()) + } + pub async fn create_fetching_patchset( &self, root_msg_id: &str, From 966da3eadb7a692cc53204cdc4f978b3dbe6e688 Mon Sep 17 00:00:00 2001 From: Elkin Cruz Date: Thu, 7 May 2026 22:43:39 +0000 Subject: [PATCH 6/6] fetcher: implement non-blocking in-memory delay retry queue Implement an elegant In-Memory Delay Vector queue inside FetchAgent to handle transient fetch/extraction failures (such as Gerrit replication delays) with natural backoff and max 3 retries. Failed requests are parked with exponential backoff (20s, 40s, 80s) and promoted back to the active queue on every 10-second tick. Includes full duplicate-purging, complete state hygiene on completion/exhaustion, and a fast real-time unit test using sub-second intervals.
TAG=agy Signed-off-by: Elkin Cruz --- Cargo.toml | 2 +- src/fetcher.rs | 274 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 229 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 74ef8836f..9366a37a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ tempfile = "3.24.0" termcolor = "1.4.1" thiserror = "2.0.17" tiktoken-rs = "0.6.0" -tokio = { version = "1.48.0", features = ["full"] } +tokio = { version = "1.48.0", features = ["full", "test-util"] } tokio-util = { version = "0.7.17", features = ["codec", "io"] } toml = "0.8" tower-http = { version = "0.6.8", features = ["cors", "fs", "trace"] } diff --git a/src/fetcher.rs b/src/fetcher.rs index fff3fdcb7..a362b9852 100644 --- a/src/fetcher.rs +++ b/src/fetcher.rs @@ -29,10 +29,19 @@ pub struct FetchRequest { pub commit_hash: String, } +#[derive(Clone)] +struct DelayedRequest { + repo_url: Option, + commit_hash: String, + run_at: std::time::Instant, +} + pub struct FetchAgent { repo_path: PathBuf, rx: mpsc::Receiver, main_tx: mpsc::Sender, + tick_interval: Duration, + backoff_base: Duration, } impl FetchAgent { @@ -46,15 +55,25 @@ impl FetchAgent { repo_path, rx, main_tx, + tick_interval: Duration::from_secs(10), + backoff_base: Duration::from_secs(10), }, tx, ) } + pub fn with_intervals(mut self, tick_interval: Duration, backoff_base: Duration) -> Self { + self.tick_interval = tick_interval; + self.backoff_base = backoff_base; + self + } + pub async fn run(mut self) { info!("FetchAgent started"); let mut queue: HashMap, HashSet> = HashMap::new(); - let mut ticker = interval(Duration::from_secs(10)); + let mut delayed_retries: Vec = Vec::new(); // In-memory delay vector + let mut attempts_tracker: HashMap = HashMap::new(); // Local attempt tracker + let mut ticker = interval(self.tick_interval); // Configurable batch timer loop { tokio::select! 
{ @@ -64,15 +83,41 @@ impl FetchAgent { .insert(req.commit_hash); } _ = ticker.tick() => { + let now = std::time::Instant::now(); + let mut expired_retries = Vec::new(); + + // Filter out expired retries + delayed_retries.retain(|item| { + if now >= item.run_at { + expired_retries.push(item.clone()); + false // Remove from delayed list (promoted) + } else { + true // Keep in delayed list + } + }); + + // Promote expired retries into the active batch queue + for retry in expired_retries { + queue.entry(retry.repo_url) + .or_default() + .insert(retry.commit_hash); + } + + // Process active batch queue if !queue.is_empty() { - self.process_queue(&mut queue).await; + self.process_queue(&mut queue, &mut delayed_retries, &mut attempts_tracker).await; } } } } } - async fn process_queue(&self, queue: &mut HashMap, HashSet>) { + async fn process_queue( + &self, + queue: &mut HashMap, HashSet>, + delayed_retries: &mut Vec, + attempts_tracker: &mut HashMap, + ) { info!("Processing fetch queue with {} repos", queue.len()); for (url_opt, commits) in queue.drain() { @@ -102,9 +147,9 @@ impl FetchAgent { "All commits present locally, skipping fetch for {}", url_display ); - } else if let Some(url) = url_opt { + } else if let Some(ref url) = url_opt { // Remote fetch logic - let remote_name = self.get_remote_name(&url); + let remote_name = self.get_remote_name(url); // Check if repo is local (same as self.repo_path) let is_local = { @@ -126,17 +171,17 @@ impl FetchAgent { ); // Do not continue here; let it fall through to Step 3 where it will fail individually } else { - if let Err(e) = self.ensure_remote(&remote_name, &url).await { + if let Err(e) = self.ensure_remote(&remote_name, url).await { error!("Failed to ensure remote {}: {}", url, e); for commit in &missing_commits { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit.clone(), - error: format!("Failed to set up remote {}: {}", url, e), - source: MessageSource::GitFetch, - }) - .await; + 
self.handle_fetch_failure( + &url_opt, + commit, + &format!("Failed to set up remote {}: {}", url, e), + delayed_retries, + attempts_tracker, + ) + .await; } continue; } @@ -151,14 +196,14 @@ impl FetchAgent { if let Err(e) = self.fetch_all(&remote_name).await { error!("Full fetch failed for {}: {}", url, e); for commit in &missing_commits { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit.clone(), - error: format!("Failed to fetch from {}: {}", url, e), - source: MessageSource::GitFetch, - }) - .await; + self.handle_fetch_failure( + &url_opt, + commit, + &format!("Failed to fetch from {}: {}", url, e), + delayed_retries, + attempts_tracker, + ) + .await; } continue; } @@ -182,14 +227,14 @@ impl FetchAgent { { Ok(shas) => shas, Err(e) => { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: range.clone(), - error: format!("Failed to resolve git range: {}", e), - source: MessageSource::GitFetch, - }) - .await; + self.handle_fetch_failure( + &url_opt, + &commit_or_range, + &format!("Failed to resolve git range: {}", e), + delayed_retries, + attempts_tracker, + ) + .await; continue; } }; @@ -217,26 +262,33 @@ impl FetchAgent { } } } + // Range Success! 
Clean up attempts tracker and delay queue for the range key + attempts_tracker.remove(&commit_or_range); + delayed_retries.retain(|item| item.commit_hash != commit_or_range); info!("Successfully submitted remote range {}", range); } else { // Single commit let full_sha = match self.resolve_sha(&commit_or_range).await { Ok(sha) => sha, Err(e) => { - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit_or_range.clone(), - error: format!("Failed to resolve SHA: {}", e), - source: MessageSource::GitFetch, - }) - .await; + self.handle_fetch_failure( + &url_opt, + &commit_or_range, + &format!("Failed to resolve SHA: {}", e), + delayed_retries, + attempts_tracker, + ) + .await; continue; } }; match self.extract_patch(&full_sha, &commit_or_range, 1, 1).await { Ok(mut event) => { + // Success! Clean up attempts tracker and the parked delay queue + attempts_tracker.remove(&commit_or_range); + delayed_retries.retain(|item| item.commit_hash != commit_or_range); + if let Event::PatchSubmitted { ref mut message_id, .. 
} = event @@ -251,14 +303,14 @@ impl FetchAgent { } Err(e) => { error!("Failed to extract patch {}: {}", commit_or_range, e); - let _ = self - .main_tx - .send(Event::IngestionFailed { - article_id: commit_or_range, - error: format!("Failed to extract patch: {}", e), - source: MessageSource::GitFetch, - }) - .await; + self.handle_fetch_failure( + &url_opt, + &commit_or_range, + &format!("Failed to extract patch: {}", e), + delayed_retries, + attempts_tracker, + ) + .await; } } } @@ -439,6 +491,61 @@ impl FetchAgent { total, }) } + + async fn handle_fetch_failure( + &self, + url_opt: &Option, + commit: &str, + error_msg: &str, + delayed_retries: &mut Vec, + attempts_tracker: &mut HashMap, + ) { + let attempts = attempts_tracker.get(commit).cloned().unwrap_or(0); + let max_retries = 3; + + if attempts < max_retries { + let next_attempt = attempts + 1; + attempts_tracker.insert(commit.to_string(), next_attempt); + + // Exponential backoff using configurable base: base * (2 ^ attempts) + let delay = self.backoff_base * (2u32.pow(next_attempt)); + let run_at = std::time::Instant::now() + delay; + + warn!( + "Fetch/extract failed for {} (attempts: {}/{}). Parking in delay queue for {}s.", + commit, + next_attempt, + max_retries, + delay.as_secs() + ); + + // Deduplicate inside delayed_retries first to prevent duplicate parked entries! + delayed_retries.retain(|item| item.commit_hash != commit); + + delayed_retries.push(DelayedRequest { + repo_url: url_opt.clone(), + commit_hash: commit.to_string(), + run_at, + }); + } else { + // Retries exhausted, clean up attempts tracker and the parked delay queue + attempts_tracker.remove(commit); + delayed_retries.retain(|item| item.commit_hash != commit); + error!( + "Fetch/extract failed for {} after {} retries. Marking as Failed. 
Error: {}", + commit, max_retries, error_msg + ); + + let _ = self + .main_tx + .send(Event::IngestionFailed { + article_id: commit.to_string(), + error: error_msg.to_string(), + source: MessageSource::GitFetch, + }) + .await; + } + } } #[cfg(test)] @@ -590,4 +697,79 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_fetch_agent_delay_queue_retry() -> Result<()> { + let temp_dir = tempfile::tempdir()?; + let repo_path = temp_dir.path().to_path_buf(); + + // Setup empty dummy repo + Command::new("git") + .current_dir(&repo_path) + .arg("init") + .output() + .await?; + + let (event_tx, mut event_rx) = mpsc::channel(10); + + // Create agent with tiny sub-second intervals for fast real-time testing! + let (agent, fetch_tx) = { + let (a, tx) = FetchAgent::new(repo_path.clone(), event_tx); + ( + a.with_intervals(Duration::from_millis(50), Duration::from_millis(50)), + tx, + ) + }; + + // Spawn the agent in the background + let agent_handle = tokio::spawn(agent.run()); + + // Enqueue a FetchRequest for a missing commit (which will fail resolve_sha!) + let req = FetchRequest { + repo_url: None, + commit_hash: "0123456789abcdef0123456789abcdef01234567".to_string(), + }; + fetch_tx.send(req).await?; + + // Let the first tick (50ms) process. Wait 100ms to be safe. + tokio::time::sleep(Duration::from_millis(100)).await; + + // The first tick ran process_queue, failed, and parked it in delayed_retries (Attempts = 1, delay = 100ms) + // Verify no event is received yet. + tokio::select! { + Some(event) = event_rx.recv() => { + panic!("Received unexpected event: {:?}", event); + } + _ = tokio::time::sleep(Duration::from_millis(50)) => { + // Expected: no event sent! + } + } + + // Wait for retries to exhaust: + // Attempt 1: delay 100ms. + // Attempt 2: delay 200ms. + // Attempt 3: delay 400ms. + // Total delay is 700ms. Plus ticks processing time, we sleep 1.2 seconds to be completely safe. 
+ tokio::time::sleep(Duration::from_millis(1200)).await; + + // Verify that Event::IngestionFailed is finally received! + tokio::select! { + Some(Event::IngestionFailed { article_id, error, .. }) = event_rx.recv() => { + assert_eq!(article_id, "0123456789abcdef0123456789abcdef01234567"); + assert!( + error.contains("Failed to resolve SHA") || error.contains("Failed to extract patch"), + "Actual error was: {}", + error + ); + } + _ = tokio::time::sleep(Duration::from_millis(500)) => { + panic!("Timed out waiting for IngestionFailed event after retries!"); + } + } + + // Clean up the agent + agent_handle.abort(); + + Ok(()) + } }