diff --git a/src/daemon/cache.rs b/src/daemon/cache.rs index d6e5892..43785a1 100644 --- a/src/daemon/cache.rs +++ b/src/daemon/cache.rs @@ -75,6 +75,56 @@ impl DbCache { self.cache_dir.join(format!("{}.db", hash)) } + pub async fn needs_refresh(&self, rel_key: &str) -> bool { + if !self.all_keys.contains_key(rel_key) { + return false; + } + + let db_path = self.db_dir.join( + rel_key.replace('\\', std::path::MAIN_SEPARATOR_STR) + .replace('/', std::path::MAIN_SEPARATOR_STR), + ); + if !db_path.exists() { + return false; + } + + let wal_path = wal_path_for(&db_path); + let db_mt = mtime_nanos(&db_path); + let wal_mt = if wal_path.exists() { mtime_nanos(&wal_path) } else { 0 }; + + let inner = self.inner.lock().await; + match inner.get(rel_key) { + Some(entry) => { + entry.db_mtime != db_mt + || entry.wal_mtime != wal_mt + || !entry.decrypted_path.exists() + } + None => true, + } + } + + pub fn message_db_keys(&self) -> Vec { + let mut keys: Vec = self + .all_keys + .keys() + .filter(|k| { + let normalized = k.replace('\\', "/"); + normalized.contains("message/message_") + && normalized.ends_with(".db") + && !normalized.contains("_fts") + && !normalized.contains("_resource") + }) + .cloned() + .collect(); + keys.sort(); + keys + } + + pub async fn invalidate(&self, rel_key: &str) { + let mut inner = self.inner.lock().await; + inner.remove(rel_key); + } + /// 从持久化文件加载 mtime 记录,复用未过期的解密文件 async fn load_persistent(&self) { let mtime_file = &self.mtime_file; diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index b4a34c3..315fcd6 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -47,19 +47,6 @@ async fn async_run() -> Result<()> { // 初始化 DbCache let db = Arc::new(cache::DbCache::new(cfg.db_dir.clone(), all_keys.clone()).await?); - // 收集消息 DB 列表 - let msg_db_keys: Vec = all_keys - .keys() - .filter(|k| { - let k = k.replace('\\', "/"); - k.contains("message/message_") - && k.ends_with(".db") - && !k.contains("_fts") - && !k.contains("_resource") - }) - .cloned() - .collect(); - // 预热:加载联系人 + 解密 session.db eprintln!("[daemon] 预热..."); let names_raw = query::load_names(&*db).await.unwrap_or_else(|e| { @@ -67,12 +54,11 @@ async fn async_run() -> Result<()> { query::Names { map: HashMap::new(), md5_to_uname: HashMap::new(), - msg_db_keys: Vec::new(), + msg_db_keys: db.message_db_keys(), verify_flags: HashMap::new(), } }); - let mut names = names_raw; - names.msg_db_keys = msg_db_keys; + let names = names_raw; let _ = db.get("session/session.db").await; let _ = db.get("sns/sns.db").await; diff --git a/src/daemon/query.rs b/src/daemon/query.rs index 634ff2d..eba476c 100644 --- a/src/daemon/query.rs +++ b/src/daemon/query.rs @@ -106,7 +106,7 @@ pub async fn load_names(db: &DbCache) -> Result { .map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone())) .collect(); - Ok(Names { map, md5_to_uname, msg_db_keys: Vec::new(), verify_flags }) + Ok(Names { map, md5_to_uname, msg_db_keys: db.message_db_keys(), verify_flags }) } /// 查询最近会话列表 diff --git a/src/daemon/server.rs b/src/daemon/server.rs index 9f54076..4b968ce 100644 --- a/src/daemon/server.rs +++ b/src/daemon/server.rs @@ -4,7 +4,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use crate::ipc::{Request, Response}; use super::cache::DbCache; -use super::query::Names; +use super::query::{self, Names}; /// 启动 IPC server(Unix socket / Windows named pipe) pub async fn serve( @@ -147,55 +147,53 @@ async fn dispatch( names: &tokio::sync::RwLock>, ) -> Response { use crate::ipc::Request::*; - use super::query; - - // 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁, - // 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时 - // 一次性构建),共享 Arc 即可。 - let names_arc: Arc = { - let guard = names.read().await; - Arc::clone(&*guard) - }; match req { Ping => Response::ok(serde_json::json!({ "pong": true })), Sessions { limit } => { + let names_arc = current_names(db, names).await; match query::q_sessions(db, &names_arc, limit).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } History { chat, limit, offset, since, until, msg_type } => { + let names_arc = current_names(db, names).await; match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Search { keyword, chats, limit, since, until, msg_type } => { + let names_arc = current_names(db, names).await; match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Contacts { query, limit } => { + let names_arc = current_names(db, names).await; match query::q_contacts(&names_arc, query.as_deref(), limit).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Unread { limit, filter } => { + let names_arc = current_names(db, names).await; match query::q_unread(db, &names_arc, limit, filter).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Members { chat } => { + let names_arc = current_names(db, names).await; match query::q_members(db, &names_arc, &chat).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } NewMessages { state, limit } => { + let names_arc = current_names(db, names).await; match query::q_new_messages(db, &names_arc, state, limit).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), @@ -208,24 +206,28 @@ async fn dispatch( } } Stats { chat, since, until } => { + let names_arc = current_names(db, names).await; match query::q_stats(db, &names_arc, &chat, since, until).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } SnsNotifications { limit, since, until, include_read } => { + let names_arc = current_names(db, names).await; match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } SnsFeed { limit, since, until, user } => { + let names_arc = current_names(db, names).await; match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } SnsSearch { keyword, limit, since, until, user } => { + let names_arc = current_names(db, names).await; match query::q_sns_search(db, &names_arc, &keyword, limit, since, until, user.as_deref()).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), @@ -235,18 +237,21 @@ async fn dispatch( Response::ok(serde_json::json!({ "reloading": true })) } BizArticles { limit, account, since, until, unread } => { + let names_arc = current_names(db, names).await; match query::q_biz_articles(db, &names_arc, limit, account, since, until, unread).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Attachments { chat, kinds, limit, offset, since, until } => { + let names_arc = current_names(db, names).await; match query::q_attachments(db, &names_arc, &chat, kinds, limit, offset, since, until).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), } } Extract { attachment_id, output, overwrite } => { + let names_arc = current_names(db, names).await; match query::q_extract(db, &names_arc, &attachment_id, &output, overwrite).await { Ok(v) => Response::ok(v), Err(e) => Response::err(e.to_string()), @@ -254,3 +259,35 @@ async fn dispatch( } } } + +async fn current_names( + db: &DbCache, + names: &tokio::sync::RwLock>, +) -> Arc { + if !db.needs_refresh("contact/contact.db").await { + return clone_names(names).await; + } + + let mut guard = names.write().await; + if !db.needs_refresh("contact/contact.db").await { + return Arc::clone(&*guard); + } + + match query::load_names(db).await { + Ok(fresh) => { + let fresh = Arc::new(fresh); + *guard = Arc::clone(&fresh); + fresh + } + Err(e) => { + eprintln!("[daemon] 刷新联系人失败: {}", e); + db.invalidate("contact/contact.db").await; + Arc::clone(&*guard) + } + } +} + +async fn clone_names(names: &tokio::sync::RwLock>) -> Arc { + let guard = names.read().await; + Arc::clone(&*guard) +}