Skip to content

Commit 1e52014

Browse files
committed
perf(daemon): Arc<Names> + tokio RwLock, O(1) clone per IPC request
Was: Arc<std::sync::RwLock<Names>>; each dispatch clone_names() copied 4 HashMaps (~100KB for a user with 2700 contacts) and used std RwLock which blocks the tokio worker thread during the clone. Now: Arc<tokio::sync::RwLock<Arc<Names>>>; dispatch takes the read guard, does Arc::clone (pointer bump), drops the guard, then spawns the query work. Names is immutable after daemon startup; Arc is ideal. Smoke tested: `wx sessions --json` returns correct data including chat_type; 8 concurrent clients finish in 12ms.
1 parent e977007 commit 1e52014

2 files changed

Lines changed: 26 additions & 58 deletions

File tree

src/daemon/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ async fn async_run() -> Result<()> {
7575
let _ = db.get("session/session.db").await;
7676
eprintln!("[daemon] 预热完成,联系人 {} 个", names.map.len());
7777

78-
let names_arc = Arc::new(std::sync::RwLock::new(names));
78+
// 包一层内部 Arc:IPC 请求取 guard 后只做 Arc::clone(O(1)),
79+
// 避免每次请求都全量 clone 几千个联系人的 HashMap。
80+
// 用 tokio::sync::RwLock 允许 guard 跨 await(当前不跨,为未来 reload 留余地)。
81+
let names_arc = Arc::new(tokio::sync::RwLock::new(Arc::new(names)));
7982

8083
// 启动 IPC server(阻塞)
8184
server::serve(Arc::clone(&db), Arc::clone(&names_arc)).await?;

src/daemon/server.rs

Lines changed: 22 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::query::Names;
99
/// 启动 IPC server(Unix socket / Windows named pipe)
1010
pub async fn serve(
1111
db: Arc<DbCache>,
12-
names: Arc<std::sync::RwLock<Names>>,
12+
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
1313
) -> Result<()> {
1414
#[cfg(unix)]
1515
serve_unix(db, names).await?;
@@ -21,7 +21,7 @@ pub async fn serve(
2121
#[cfg(unix)]
2222
async fn serve_unix(
2323
db: Arc<DbCache>,
24-
names: Arc<std::sync::RwLock<Names>>,
24+
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
2525
) -> Result<()> {
2626
use tokio::net::UnixListener;
2727
let sock_path = crate::config::sock_path();
@@ -58,7 +58,7 @@ async fn serve_unix(
5858
async fn handle_connection_unix(
5959
stream: tokio::net::UnixStream,
6060
db: Arc<DbCache>,
61-
names: Arc<std::sync::RwLock<Names>>,
61+
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
6262
) -> Result<()> {
6363
let (reader, mut writer) = stream.into_split();
6464
let mut lines = BufReader::new(reader).lines();
@@ -86,7 +86,7 @@ async fn handle_connection_unix(
8686
#[cfg(windows)]
8787
async fn serve_windows(
8888
db: Arc<DbCache>,
89-
names: Arc<std::sync::RwLock<Names>>,
89+
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
9090
) -> Result<()> {
9191
use interprocess::local_socket::{
9292
tokio::prelude::*, GenericNamespaced, ListenerOptions,
@@ -117,7 +117,7 @@ async fn serve_windows(
117117
async fn handle_connection_windows(
118118
conn: interprocess::local_socket::tokio::Stream,
119119
db: Arc<DbCache>,
120-
names: Arc<std::sync::RwLock<Names>>,
120+
names: Arc<tokio::sync::RwLock<Arc<Names>>>,
121121
) -> Result<()> {
122122
let (reader, mut writer) = tokio::io::split(conn);
123123
let mut lines = BufReader::new(reader).lines();
@@ -144,79 +144,59 @@ async fn handle_connection_windows(
144144
async fn dispatch(
145145
req: Request,
146146
db: &DbCache,
147-
names: &std::sync::RwLock<Names>,
147+
names: &tokio::sync::RwLock<Arc<Names>>,
148148
) -> Response {
149149
use crate::ipc::Request::*;
150150
use super::query;
151151

152+
// 取 guard → O(1) clone Arc → 立即 drop 锁。后续 await 期间不持有锁,
153+
// 多个并发 IPC 请求可以真正并行。Names 本身不可变(由 daemon 启动时
154+
// 一次性构建),共享 Arc 即可。
155+
let names_arc: Arc<Names> = {
156+
let guard = names.read().await;
157+
Arc::clone(&*guard)
158+
};
159+
152160
match req {
153161
Ping => Response::ok(serde_json::json!({ "pong": true })),
154162
Sessions { limit } => {
155-
let names_snapshot = match clone_names(names) {
156-
Ok(n) => n,
157-
Err(e) => return Response::err(e),
158-
};
159-
match query::q_sessions(db, &names_snapshot, limit).await {
163+
match query::q_sessions(db, &names_arc, limit).await {
160164
Ok(v) => Response::ok(v),
161165
Err(e) => Response::err(e.to_string()),
162166
}
163167
}
164168
History { chat, limit, offset, since, until, msg_type } => {
165-
let names_snapshot = match clone_names(names) {
166-
Ok(n) => n,
167-
Err(e) => return Response::err(e),
168-
};
169-
match query::q_history(db, &names_snapshot, &chat, limit, offset, since, until, msg_type).await {
169+
match query::q_history(db, &names_arc, &chat, limit, offset, since, until, msg_type).await {
170170
Ok(v) => Response::ok(v),
171171
Err(e) => Response::err(e.to_string()),
172172
}
173173
}
174174
Search { keyword, chats, limit, since, until, msg_type } => {
175-
let names_snapshot = match clone_names(names) {
176-
Ok(n) => n,
177-
Err(e) => return Response::err(e),
178-
};
179-
match query::q_search(db, &names_snapshot, &keyword, chats, limit, since, until, msg_type).await {
175+
match query::q_search(db, &names_arc, &keyword, chats, limit, since, until, msg_type).await {
180176
Ok(v) => Response::ok(v),
181177
Err(e) => Response::err(e.to_string()),
182178
}
183179
}
184180
Contacts { query, limit } => {
185-
let names_snapshot = match clone_names(names) {
186-
Ok(n) => n,
187-
Err(e) => return Response::err(e),
188-
};
189-
match query::q_contacts(&names_snapshot, query.as_deref(), limit).await {
181+
match query::q_contacts(&names_arc, query.as_deref(), limit).await {
190182
Ok(v) => Response::ok(v),
191183
Err(e) => Response::err(e.to_string()),
192184
}
193185
}
194186
Unread { limit, filter } => {
195-
let names_snapshot = match clone_names(names) {
196-
Ok(n) => n,
197-
Err(e) => return Response::err(e),
198-
};
199-
match query::q_unread(db, &names_snapshot, limit, filter).await {
187+
match query::q_unread(db, &names_arc, limit, filter).await {
200188
Ok(v) => Response::ok(v),
201189
Err(e) => Response::err(e.to_string()),
202190
}
203191
}
204192
Members { chat } => {
205-
let names_snapshot = match clone_names(names) {
206-
Ok(n) => n,
207-
Err(e) => return Response::err(e),
208-
};
209-
match query::q_members(db, &names_snapshot, &chat).await {
193+
match query::q_members(db, &names_arc, &chat).await {
210194
Ok(v) => Response::ok(v),
211195
Err(e) => Response::err(e.to_string()),
212196
}
213197
}
214198
NewMessages { state, limit } => {
215-
let names_snapshot = match clone_names(names) {
216-
Ok(n) => n,
217-
Err(e) => return Response::err(e),
218-
};
219-
match query::q_new_messages(db, &names_snapshot, state, limit).await {
199+
match query::q_new_messages(db, &names_arc, state, limit).await {
220200
Ok(v) => Response::ok(v),
221201
Err(e) => Response::err(e.to_string()),
222202
}
@@ -228,25 +208,10 @@ async fn dispatch(
228208
}
229209
}
230210
Stats { chat, since, until } => {
231-
let names_snapshot = match clone_names(names) {
232-
Ok(n) => n,
233-
Err(e) => return Response::err(e),
234-
};
235-
match query::q_stats(db, &names_snapshot, &chat, since, until).await {
211+
match query::q_stats(db, &names_arc, &chat, since, until).await {
236212
Ok(v) => Response::ok(v),
237213
Err(e) => Response::err(e.to_string()),
238214
}
239215
}
240216
}
241217
}
242-
243-
/// 克隆 Names 以避免 RwLockGuard 跨 await
244-
fn clone_names(names: &std::sync::RwLock<Names>) -> Result<Names, String> {
245-
let guard = names.read().map_err(|_| "内部错误: names lock poisoned".to_string())?;
246-
Ok(Names {
247-
map: guard.map.clone(),
248-
md5_to_uname: guard.md5_to_uname.clone(),
249-
msg_db_keys: guard.msg_db_keys.clone(),
250-
verify_flags: guard.verify_flags.clone(),
251-
})
252-
}

0 commit comments

Comments
 (0)