Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/daemon/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,56 @@ impl DbCache {
self.cache_dir.join(format!("{}.db", hash))
}

pub async fn needs_refresh(&self, rel_key: &str) -> bool {
if !self.all_keys.contains_key(rel_key) {
return false;
}

let db_path = self.db_dir.join(
rel_key.replace('\\', std::path::MAIN_SEPARATOR_STR)
.replace('/', std::path::MAIN_SEPARATOR_STR),
);
if !db_path.exists() {
return false;
}

let wal_path = wal_path_for(&db_path);
let db_mt = mtime_nanos(&db_path);
let wal_mt = if wal_path.exists() { mtime_nanos(&wal_path) } else { 0 };

let inner = self.inner.lock().await;
match inner.get(rel_key) {
Some(entry) => {
entry.db_mtime != db_mt
|| entry.wal_mtime != wal_mt
|| !entry.decrypted_path.exists()
}
None => true,
}
}

pub fn message_db_keys(&self) -> Vec<String> {
let mut keys: Vec<String> = self
.all_keys
.keys()
.filter(|k| {
let normalized = k.replace('\\', "/");
normalized.contains("message/message_")
&& normalized.ends_with(".db")
&& !normalized.contains("_fts")
&& !normalized.contains("_resource")
})
.cloned()
.collect();
keys.sort();
keys
}

pub async fn invalidate(&self, rel_key: &str) {
let mut inner = self.inner.lock().await;
inner.remove(rel_key);
}

/// 从持久化文件加载 mtime 记录,复用未过期的解密文件
async fn load_persistent(&self) {
let mtime_file = &self.mtime_file;
Expand Down
18 changes: 2 additions & 16 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,18 @@ async fn async_run() -> Result<()> {
// 初始化 DbCache
let db = Arc::new(cache::DbCache::new(cfg.db_dir.clone(), all_keys.clone()).await?);

// 收集消息 DB 列表
let msg_db_keys: Vec<String> = all_keys
.keys()
.filter(|k| {
let k = k.replace('\\', "/");
k.contains("message/message_")
&& k.ends_with(".db")
&& !k.contains("_fts")
&& !k.contains("_resource")
})
.cloned()
.collect();

// 预热:加载联系人 + 解密 session.db
eprintln!("[daemon] 预热...");
let names_raw = query::load_names(&*db).await.unwrap_or_else(|e| {
eprintln!("[daemon] 加载联系人失败: {}", e);
query::Names {
map: HashMap::new(),
md5_to_uname: HashMap::new(),
msg_db_keys: Vec::new(),
msg_db_keys: db.message_db_keys(),
verify_flags: HashMap::new(),
}
});
let mut names = names_raw;
names.msg_db_keys = msg_db_keys;
let names = names_raw;

let _ = db.get("session/session.db").await;
let _ = db.get("sns/sns.db").await;
Expand Down
2 changes: 1 addition & 1 deletion src/daemon/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn load_names(db: &DbCache) -> Result<Names> {
.map(|u| (format!("{:x}", md5::compute(u.as_bytes())), u.clone()))
.collect();

Ok(Names { map, md5_to_uname, msg_db_keys: Vec::new(), verify_flags })
Ok(Names { map, md5_to_uname, msg_db_keys: db.message_db_keys(), verify_flags })
}

/// 查询最近会话列表
Expand Down
57 changes: 47 additions & 10 deletions src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

use crate::ipc::{Request, Response};
use super::cache::DbCache;
use super::query::Names;
use super::query::{self, Names};

/// 启动 IPC server(Unix socket / Windows named pipe)
pub async fn serve(
Expand Down Expand Up @@ -147,55 +147,53 @@ async fn dispatch(
names: &tokio::sync::RwLock<Arc<Names>>,
) -> Response {
use crate::ipc::Request::*;
use super::query;

// 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁,
// 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时
// 一次性构建),共享 Arc 即可。
let names_arc: Arc<Names> = {
let guard = names.read().await;
Arc::clone(&*guard)
};

match req {
Ping => Response::ok(serde_json::json!({ "pong": true })),
Sessions { limit } => {
let names_arc = current_names(db, names).await;
match query::q_sessions(db, &names_arc, limit).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
History { chat, limit, offset, since, until, msg_type } => {
let names_arc = current_names(db, names).await;
match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Search { keyword, chats, limit, since, until, msg_type } => {
let names_arc = current_names(db, names).await;
match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Contacts { query, limit } => {
let names_arc = current_names(db, names).await;
match query::q_contacts(&names_arc, query.as_deref(), limit).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Unread { limit, filter } => {
let names_arc = current_names(db, names).await;
match query::q_unread(db, &names_arc, limit, filter).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Members { chat } => {
let names_arc = current_names(db, names).await;
match query::q_members(db, &names_arc, &chat).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
NewMessages { state, limit } => {
let names_arc = current_names(db, names).await;
match query::q_new_messages(db, &names_arc, state, limit).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
Expand All @@ -208,24 +206,28 @@ async fn dispatch(
}
}
Stats { chat, since, until } => {
let names_arc = current_names(db, names).await;
match query::q_stats(db, &names_arc, &chat, since, until).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
SnsNotifications { limit, since, until, include_read } => {
let names_arc = current_names(db, names).await;
match query::q_sns_notifications(db, &names_arc, limit, since, until, include_read).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
SnsFeed { limit, since, until, user } => {
let names_arc = current_names(db, names).await;
match query::q_sns_feed(db, &names_arc, limit, since, until, user.as_deref()).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
SnsSearch { keyword, limit, since, until, user } => {
let names_arc = current_names(db, names).await;
match query::q_sns_search(db, &names_arc, &keyword, limit, since, until, user.as_deref()).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
Expand All @@ -235,22 +237,57 @@ async fn dispatch(
Response::ok(serde_json::json!({ "reloading": true }))
}
BizArticles { limit, account, since, until, unread } => {
let names_arc = current_names(db, names).await;
match query::q_biz_articles(db, &names_arc, limit, account, since, until, unread).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Attachments { chat, kinds, limit, offset, since, until } => {
let names_arc = current_names(db, names).await;
match query::q_attachments(db, &names_arc, &chat, kinds, limit, offset, since, until).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
Extract { attachment_id, output, overwrite } => {
let names_arc = current_names(db, names).await;
match query::q_extract(db, &names_arc, &attachment_id, &output, overwrite).await {
Ok(v) => Response::ok(v),
Err(e) => Response::err(e.to_string()),
}
}
}
}

async fn current_names(
db: &DbCache,
names: &tokio::sync::RwLock<Arc<Names>>,
) -> Arc<Names> {
if !db.needs_refresh("contact/contact.db").await {
return clone_names(names).await;
}

let mut guard = names.write().await;
if !db.needs_refresh("contact/contact.db").await {
return Arc::clone(&*guard);
}

match query::load_names(db).await {
Ok(fresh) => {
let fresh = Arc::new(fresh);
*guard = Arc::clone(&fresh);
fresh
}
Err(e) => {
eprintln!("[daemon] 刷新联系人失败: {}", e);
db.invalidate("contact/contact.db").await;
Arc::clone(&*guard)
}
}
}

async fn clone_names(names: &tokio::sync::RwLock<Arc<Names>>) -> Arc<Names> {
let guard = names.read().await;
Arc::clone(&*guard)
}