From 82da2e1cada41b54bd1c5d22cbd073fe029d4a8f Mon Sep 17 00:00:00 2001 From: liyunpeng Date: Thu, 14 May 2026 21:32:28 +0800 Subject: [PATCH 1/2] fix: refresh daemon contacts when cache changes Refresh the in-memory contact cache when contact.db changes so long-running daemon sessions pick up updated names and verification flags without a restart. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/daemon/cache.rs | 45 ++++++++++++++++++++++++++++++++++++++++++++ src/daemon/mod.rs | 15 ++------------- src/daemon/query.rs | 2 +- src/daemon/server.rs | 44 +++++++++++++++++++++++++++++++++---------- 4 files changed, 82 insertions(+), 24 deletions(-) diff --git a/src/daemon/cache.rs b/src/daemon/cache.rs index 9801396..5ad8a14 100644 --- a/src/daemon/cache.rs +++ b/src/daemon/cache.rs @@ -59,6 +59,51 @@ impl DbCache { self.cache_dir.join(format!("{}.db", hash)) } + pub async fn needs_refresh(&self, rel_key: &str) -> bool { + if !self.all_keys.contains_key(rel_key) { + return false; + } + + let db_path = self.db_dir.join( + rel_key.replace('\\', std::path::MAIN_SEPARATOR_STR) + .replace('/', std::path::MAIN_SEPARATOR_STR), + ); + if !db_path.exists() { + return false; + } + + let wal_path = wal_path_for(&db_path); + let db_mt = mtime_nanos(&db_path); + let wal_mt = if wal_path.exists() { mtime_nanos(&wal_path) } else { 0 }; + + let inner = self.inner.lock().await; + match inner.get(rel_key) { + Some(entry) => { + entry.db_mtime != db_mt + || entry.wal_mtime != wal_mt + || !entry.decrypted_path.exists() + } + None => true, + } + } + + pub fn message_db_keys(&self) -> Vec { + let mut keys: Vec = self + .all_keys + .keys() + .filter(|k| { + let normalized = k.replace('\\', "/"); + normalized.contains("message/message_") + && normalized.ends_with(".db") + && !normalized.contains("_fts") + && !normalized.contains("_resource") + }) + .cloned() + .collect(); + keys.sort(); + keys + } + /// 从持久化文件加载 mtime 记录,复用未过期的解密文件 async fn load_persistent(&self) { let mtime_file = config::mtime_file(); diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 02dc99f..bec6359 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -48,16 +48,6 @@ async fn async_run() -> Result<()> { // 初始化 DbCache let db = Arc::new(cache::DbCache::new(cfg.db_dir.clone(), all_keys.clone()).await?); - // 收集消息 DB 列表 - let msg_db_keys: Vec = all_keys.keys() - .filter(|k| { - let k = k.replace('\\', "/"); - k.contains("message/message_") && k.ends_with(".db") - && !k.contains("_fts") && !k.contains("_resource") - }) - .cloned() - .collect(); - // 预热:加载联系人 + 解密 session.db eprintln!("[daemon] 预热..."); let names_raw = query::load_names(&*db).await.unwrap_or_else(|e| { @@ -65,12 +55,11 @@ async fn async_run() -> Result<()> { query::Names { map: HashMap::new(), md5_to_uname: HashMap::new(), - msg_db_keys: Vec::new(), + msg_db_keys: db.message_db_keys(), verify_flags: HashMap::new(), } }); - let mut names = names_raw; - names.msg_db_keys = msg_db_keys; + let names = names_raw; let _ = db.get("session/session.db").await; let _ = db.get("sns/sns.db").await; diff --git a/src/daemon/query.rs b/src/daemon/query.rs index 18cf28e..5daabfe 100644 --- a/src/daemon/query.rs +++ b/src/daemon/query.rs @@ -106,7 +106,7 @@ pub async fn load_names(db: &DbCache) -> Result { .map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone())) .collect(); - Ok(Names { map, md5_to_uname, msg_db_keys: Vec::new(), verify_flags }) + Ok(Names { map, md5_to_uname, msg_db_keys: db.message_db_keys(), verify_flags }) } /// 查询最近会话列表 diff --git a/src/daemon/server.rs b/src/daemon/server.rs index 896a08e..52472b7 100644 --- a/src/daemon/server.rs +++ b/src/daemon/server.rs @@ -4,7 +4,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use crate::ipc::{Request, Response}; use super::cache::DbCache; -use super::query::Names; +use super::query::{self, Names}; /// 启动 IPC server(Unix socket / Windows named pipe) pub async fn serve( @@ -147,15 +147,8 @@ async fn dispatch( names: &tokio::sync::RwLock>, ) -> Response { use crate::ipc::Request::*; - use super::query; - - // 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁, - // 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时 - // 一次性构建),共享 Arc 即可。 - let names_arc: Arc = { - let guard = names.read().await; - Arc::clone(&*guard) - }; + + let names_arc = current_names(db, names).await; match req { Ping => Response::ok(serde_json::json!({ "pong": true })), @@ -233,3 +226,34 @@ async fn dispatch( } } } + +async fn current_names( + db: &DbCache, + names: &tokio::sync::RwLock>, +) -> Arc { + if !db.needs_refresh("contact/contact.db").await { + return clone_names(names).await; + } + + let mut guard = names.write().await; + if !db.needs_refresh("contact/contact.db").await { + return Arc::clone(&*guard); + } + + match query::load_names(db).await { + Ok(fresh) => { + let fresh = Arc::new(fresh); + *guard = Arc::clone(&fresh); + fresh + } + Err(e) => { + eprintln!("[daemon] 刷新联系人失败: {}", e); + Arc::clone(&*guard) + } + } +} + +async fn clone_names(names: &tokio::sync::RwLock>) -> Arc { + let guard = names.read().await; + Arc::clone(&*guard) +} From 5b6863160d419be6bcb549e5bf5f1a6da741393b Mon Sep 17 00:00:00 2001 From: liyunpeng Date: Fri, 15 May 2026 11:50:12 +0800 Subject: [PATCH 2/2] fix: retry contact refresh after load failures Avoid refreshing contacts for daemon requests that do not need names, and invalidate the contact cache entry when reload fails so the next request retries instead of staying stale. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/daemon/cache.rs | 5 +++++ src/daemon/server.rs | 17 +++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/daemon/cache.rs b/src/daemon/cache.rs index 813f717..43785a1 100644 --- a/src/daemon/cache.rs +++ b/src/daemon/cache.rs @@ -120,6 +120,11 @@ impl DbCache { keys } + pub async fn invalidate(&self, rel_key: &str) { + let mut inner = self.inner.lock().await; + inner.remove(rel_key); + } + /// 从持久化文件加载 mtime 记录,复用未过期的解密文件 async fn load_persistent(&self) { let mtime_file = &self.mtime_file; diff --git a/src/daemon/server.rs b/src/daemon/server.rs index e640fb1..4b968ce 100644 --- a/src/daemon/server.rs +++ b/src/daemon/server.rs @@ -148,47 +148,52 @@ async fn dispatch( ) -> Response { use crate::ipc::Request::*; - let names_arc = current_names(db, names).await; - match req { Ping => Response::ok(serde_json::json!({ "pong": true })), Sessions { limit } => { + let names_arc = current_names(db, names).await; match query::q_sessions(db, &names_arc, limit).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } History { chat, limit, offset, since, until, msg_type } => { + let names_arc = current_names(db, names).await; match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Search { keyword, chats, limit, since, until, msg_type } => { + let names_arc = current_names(db, names).await; match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Contacts { query, limit } => { + let names_arc = current_names(db, names).await; match query::q_contacts(&names_arc, query.as_deref(), limit).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Unread { limit, filter } => { + let names_arc = current_names(db, names).await; match query::q_unread(db, &names_arc, limit, filter).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Members { chat } => { + let names_arc = current_names(db, names).await; match query::q_members(db, &names_arc, &chat).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } NewMessages { state, limit } => { + let names_arc = current_names(db, names).await; match query::q_new_messages(db, &names_arc, state, limit).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), @@ -201,24 +206,28 @@ async fn dispatch( } } Stats { chat, since, until } => { + let names_arc = current_names(db, names).await; match query::q_stats(db, &names_arc, &chat, since, until).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } SnsNotifications { limit, since, until, include_read } => { + let names_arc = current_names(db, names).await; match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } SnsFeed { limit, since, until, user } => { + let names_arc = current_names(db, names).await; match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } SnsSearch { keyword, limit, since, until, user } => { + let names_arc = current_names(db, names).await; match query::q_sns_search(db, &names_arc, &keyword, limit, since, until, user.as_deref()).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), @@ -228,18 +237,21 @@ async fn dispatch( Response::ok(serde_json::json!({ "reloading": true })) } BizArticles { limit, account, since, until, unread } => { + let names_arc = current_names(db, names).await; match query::q_biz_articles(db, &names_arc, limit, account, since, until, unread).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Attachments { chat, kinds, limit, offset, since, until } => { + let names_arc = current_names(db, names).await; match query::q_attachments(db, &names_arc, &chat, kinds, limit, offset, since, until).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Extract { attachment_id, output, overwrite } => { + let names_arc = current_names(db, names).await; match query::q_extract(db, &names_arc, &attachment_id, &output, overwrite).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), @@ -269,6 +281,7 @@ async fn current_names( } Err(e) => { eprintln!("[daemon] 刷新联系人失败: {}", e); + db.invalidate("contact/contact.db").await; Arc::clone(&*guard) } }