Skip to content

Commit f4a1aa5

Browse files
jerome-benoitnjbrakeclaude
authored
fix(server): replace blocking I/O in async functions with tokio equivalents (#912)
* fix(server): replace blocking I/O in async functions with tokio equivalents Convert all std::fs and std::thread::sleep calls inside async functions to their tokio counterparts, preventing tokio worker thread starvation under concurrent load. Changes: - server/push.rs: persist() uses tokio::fs::write/set_permissions/rename - server/mod.rs: write_secret_file + load_or_generate_token made async - cli/serve.rs: stop_daemon made async, thread::sleep -> tokio::time::sleep, all fs ops -> tokio::fs, tunnel checks wrapped in spawn_blocking - update/install.rs: download_tarball uses tokio::fs::File + AsyncWriteExt server/api/git.rs and server/api/system.rs were already correctly wrapped in spawn_blocking; no changes needed. Closes #911 * fix(update): use sync_all instead of flush for crash-safe downloads flush() only pushes to kernel buffers; sync_all() ensures data reaches persistent storage, preventing corrupt-archive scenarios on power loss. * fix(update): drop unused std::io::Write import after merge The merge with main re-introduced std::io::Write, but download_tarball now uses tokio::io::AsyncWriteExt instead. Caught by clippy -D warnings. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --------- Co-authored-by: njbrake <[email protected]> Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
1 parent 905b6bf commit f4a1aa5

4 files changed

Lines changed: 63 additions & 57 deletions

File tree

src/cli/serve.rs

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ pub fn daemon_pid() -> Option<u32> {
192192

193193
pub async fn run(profile: &str, args: ServeArgs) -> Result<()> {
194194
if args.stop {
195-
return stop_daemon();
195+
return stop_daemon().await;
196196
}
197197

198198
// Refuse to start a second instance (daemon or foreground) while another
@@ -257,12 +257,14 @@ pub async fn run(profile: &str, args: ServeArgs) -> Result<()> {
257257
// it's available, so requiring cloudflared up front would falsely reject
258258
// Tailscale-only setups (issue #813).
259259
let host = if args.remote {
260-
if cloudflared_required(
261-
args.no_tailscale,
262-
args.tunnel_name.is_some(),
263-
crate::server::tunnel::tailscale_available_sync(),
264-
) {
265-
crate::server::tunnel::check_cloudflared()?;
260+
let tailscale_ok =
261+
tokio::task::spawn_blocking(crate::server::tunnel::tailscale_available_sync)
262+
.await
263+
.unwrap_or(false);
264+
if cloudflared_required(args.no_tailscale, args.tunnel_name.is_some(), tailscale_ok) {
265+
tokio::task::spawn_blocking(crate::server::tunnel::check_cloudflared)
266+
.await
267+
.map_err(|e| anyhow::anyhow!(e))??;
266268
}
267269
// Force localhost since the tunnel connects to localhost
268270
"127.0.0.1".to_string()
@@ -320,7 +322,7 @@ pub async fn run(profile: &str, args: ServeArgs) -> Result<()> {
320322

321323
// Write PID file for non-daemon mode too (so --stop works either way)
322324
if let Ok(path) = pid_file_path() {
323-
let _ = std::fs::write(&path, std::process::id().to_string());
325+
let _ = tokio::fs::write(&path, std::process::id().to_string()).await;
324326
}
325327

326328
let result = crate::server::start_server(crate::server::ServerConfig {
@@ -342,17 +344,18 @@ pub async fn run(profile: &str, args: ServeArgs) -> Result<()> {
342344
// still belongs to this process. A newer daemon spawn may have
343345
// overwritten it; removing their file would orphan them.
344346
if let Ok(path) = pid_file_path() {
345-
let is_ours = std::fs::read_to_string(&path)
347+
let is_ours = tokio::fs::read_to_string(&path)
348+
.await
346349
.ok()
347350
.and_then(|s| s.trim().parse::<u32>().ok())
348351
.is_some_and(|pid| pid == std::process::id());
349352
if is_ours {
350-
let _ = std::fs::remove_file(&path);
353+
let _ = tokio::fs::remove_file(&path).await;
351354
if let Ok(dir) = crate::session::get_app_dir() {
352-
let _ = std::fs::remove_file(dir.join("serve.url"));
353-
let _ = std::fs::remove_file(dir.join("serve.log"));
354-
let _ = std::fs::remove_file(dir.join("serve.mode"));
355-
let _ = std::fs::remove_file(dir.join("serve.passphrase"));
355+
let _ = tokio::fs::remove_file(dir.join("serve.url")).await;
356+
let _ = tokio::fs::remove_file(dir.join("serve.log")).await;
357+
let _ = tokio::fs::remove_file(dir.join("serve.mode")).await;
358+
let _ = tokio::fs::remove_file(dir.join("serve.passphrase")).await;
356359
}
357360
}
358361
}
@@ -455,7 +458,7 @@ fn start_daemon(profile: &str, args: &ServeArgs) -> Result<()> {
455458
Ok(())
456459
}
457460

458-
fn stop_daemon() -> Result<()> {
461+
async fn stop_daemon() -> Result<()> {
459462
let path = pid_file_path()?;
460463

461464
if !path.exists() {
@@ -465,15 +468,15 @@ fn stop_daemon() -> Result<()> {
465468
);
466469
}
467470

468-
let pid_str = std::fs::read_to_string(&path)?;
471+
let pid_str = tokio::fs::read_to_string(&path).await?;
469472
let pid: i32 = pid_str
470473
.trim()
471474
.parse()
472475
.map_err(|_| anyhow::anyhow!("Invalid PID in {}: {}", path.display(), pid_str.trim()))?;
473476

474477
// Verify PID belongs to an aoe process on all platforms
475478
if !verify_pid_is_aoe(pid) {
476-
std::fs::remove_file(&path)?;
479+
tokio::fs::remove_file(&path).await?;
477480
bail!(
478481
"PID {} belongs to a different process (stale PID file). Cleaned up.",
479482
pid
@@ -492,7 +495,7 @@ fn stop_daemon() -> Result<()> {
492495
// races with the dying daemon and can orphan it.
493496
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
494497
loop {
495-
std::thread::sleep(std::time::Duration::from_millis(50));
498+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
496499
match nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None) {
497500
Err(nix::errno::Errno::ESRCH) => break,
498501
_ if std::time::Instant::now() >= deadline => {
@@ -501,31 +504,31 @@ fn stop_daemon() -> Result<()> {
501504
nix::unistd::Pid::from_raw(pid),
502505
nix::sys::signal::Signal::SIGKILL,
503506
);
504-
std::thread::sleep(std::time::Duration::from_millis(50));
507+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
505508
break;
506509
}
507510
_ => {}
508511
}
509512
}
510513
// The daemon's own cleanup may have already removed some
511514
// of these; that's fine.
512-
let _ = std::fs::remove_file(&path);
515+
let _ = tokio::fs::remove_file(&path).await;
513516
if let Ok(dir) = crate::session::get_app_dir() {
514-
let _ = std::fs::remove_file(dir.join("serve.url"));
515-
let _ = std::fs::remove_file(dir.join("serve.log"));
516-
let _ = std::fs::remove_file(dir.join("serve.mode"));
517-
let _ = std::fs::remove_file(dir.join("serve.passphrase"));
517+
let _ = tokio::fs::remove_file(dir.join("serve.url")).await;
518+
let _ = tokio::fs::remove_file(dir.join("serve.log")).await;
519+
let _ = tokio::fs::remove_file(dir.join("serve.mode")).await;
520+
let _ = tokio::fs::remove_file(dir.join("serve.passphrase")).await;
518521
}
519522
println!("Stopped aoe serve daemon (PID {})", pid);
520523
}
521524
Err(nix::errno::Errno::ESRCH) => {
522525
// Process doesn't exist; clean up stale PID file
523-
std::fs::remove_file(&path)?;
526+
tokio::fs::remove_file(&path).await?;
524527
if let Ok(dir) = crate::session::get_app_dir() {
525-
let _ = std::fs::remove_file(dir.join("serve.url"));
526-
let _ = std::fs::remove_file(dir.join("serve.log"));
527-
let _ = std::fs::remove_file(dir.join("serve.mode"));
528-
let _ = std::fs::remove_file(dir.join("serve.passphrase"));
528+
let _ = tokio::fs::remove_file(dir.join("serve.url")).await;
529+
let _ = tokio::fs::remove_file(dir.join("serve.log")).await;
530+
let _ = tokio::fs::remove_file(dir.join("serve.mode")).await;
531+
let _ = tokio::fs::remove_file(dir.join("serve.passphrase")).await;
529532
}
530533
println!("Daemon was not running (stale PID file cleaned up)");
531534
}

src/server/mod.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ impl TokenManager {
129129

130130
// Persist to disk
131131
if let Ok(app_dir) = crate::session::get_app_dir() {
132-
write_secret_file(&app_dir.join("serve.token"), &new_token);
132+
write_secret_file(&app_dir.join("serve.token"), &new_token).await;
133133
}
134134

135135
info!("Auth token rotated (previous token valid for 5 more minutes)");
@@ -314,7 +314,7 @@ pub async fn start_server(config: ServerConfig<'_>) -> anyhow::Result<()> {
314314
);
315315
None
316316
} else {
317-
Some(load_or_generate_token()?)
317+
Some(load_or_generate_token().await?)
318318
};
319319

320320
let token_lifetime = if remote {
@@ -336,7 +336,7 @@ pub async fn start_server(config: ServerConfig<'_>) -> anyhow::Result<()> {
336336
// started from the CLI. Owner-only perms; cleaned up on shutdown.
337337
if let Some(pp) = passphrase {
338338
if let Ok(app_dir) = crate::session::get_app_dir() {
339-
write_secret_file(&app_dir.join("serve.passphrase"), pp);
339+
write_secret_file(&app_dir.join("serve.passphrase"), pp).await;
340340
}
341341
}
342342

@@ -490,12 +490,12 @@ pub async fn start_server(config: ServerConfig<'_>) -> anyhow::Result<()> {
490490
// backward-compatible with any consumer that does `head -1 serve.url`,
491491
// and the TUI parses both single- and multi-URL formats.
492492
if let Ok(app_dir) = crate::session::get_app_dir() {
493-
write_secret_file(&app_dir.join("serve.url"), &tunnel_url_with_token);
493+
write_secret_file(&app_dir.join("serve.url"), &tunnel_url_with_token).await;
494494
// serve.mode lets the TUI reattach to a running daemon and
495495
// render the right transport label: "tunnel" for Cloudflare,
496496
// "tailscale" for Tailscale Funnel, "local" for local-only.
497497
let mode = format!("{}\n", handle.mode_label());
498-
if let Err(e) = std::fs::write(app_dir.join("serve.mode"), mode) {
498+
if let Err(e) = tokio::fs::write(app_dir.join("serve.mode"), mode).await {
499499
tracing::debug!("Failed to write serve.mode: {e}");
500500
}
501501
}
@@ -555,8 +555,8 @@ pub async fn start_server(config: ServerConfig<'_>) -> anyhow::Result<()> {
555555
contents.push_str(url);
556556
contents.push('\n');
557557
}
558-
write_secret_file(&app_dir.join("serve.url"), &contents);
559-
if let Err(e) = std::fs::write(app_dir.join("serve.mode"), "local\n") {
558+
write_secret_file(&app_dir.join("serve.url"), &contents).await;
559+
if let Err(e) = tokio::fs::write(app_dir.join("serve.mode"), "local\n").await {
560560
tracing::debug!("Failed to write serve.mode: {e}");
561561
}
562562
}
@@ -611,7 +611,7 @@ pub async fn start_server(config: ServerConfig<'_>) -> anyhow::Result<()> {
611611
{
612612
let url_with_token = format!("{}/?token={}", base_url, token);
613613
if let Ok(app_dir) = crate::session::get_app_dir() {
614-
write_secret_file(&app_dir.join("serve.url"), &url_with_token);
614+
write_secret_file(&app_dir.join("serve.url"), &url_with_token).await;
615615
}
616616
}
617617

@@ -704,7 +704,7 @@ pub async fn start_server(config: ServerConfig<'_>) -> anyhow::Result<()> {
704704
}
705705

706706
if let Ok(app_dir) = crate::session::get_app_dir() {
707-
let _ = std::fs::remove_file(app_dir.join("serve.passphrase"));
707+
let _ = tokio::fs::remove_file(app_dir.join("serve.passphrase")).await;
708708
}
709709

710710
Ok(())
@@ -962,23 +962,23 @@ pub fn discover_tagged_ips() -> Vec<(IpKind, std::net::Ipv4Addr)> {
962962

963963
/// Write a file with owner-only permissions (0600) to protect secrets.
964964
#[cfg(unix)]
965-
fn write_secret_file(path: &std::path::Path, contents: &str) {
966-
use std::os::unix::fs::OpenOptionsExt;
967-
if let Ok(mut file) = std::fs::OpenOptions::new()
965+
async fn write_secret_file(path: &std::path::Path, contents: &str) {
966+
use tokio::io::AsyncWriteExt;
967+
let opts = tokio::fs::OpenOptions::new()
968968
.write(true)
969969
.create(true)
970970
.truncate(true)
971971
.mode(0o600)
972972
.open(path)
973-
{
974-
use std::io::Write;
975-
let _ = file.write_all(contents.as_bytes());
973+
.await;
974+
if let Ok(mut file) = opts {
975+
let _ = file.write_all(contents.as_bytes()).await;
976976
}
977977
}
978978

979979
#[cfg(not(unix))]
980-
fn write_secret_file(path: &std::path::Path, contents: &str) {
981-
let _ = std::fs::write(path, contents);
980+
async fn write_secret_file(path: &std::path::Path, contents: &str) {
981+
let _ = tokio::fs::write(path, contents).await;
982982
}
983983

984984
/// Generate a cryptographically random 64-character hex token (256 bits of entropy).
@@ -1001,18 +1001,18 @@ fn is_valid_token_format(token: &str) -> bool {
10011001

10021002
/// Load an existing auth token from disk if it's less than 24 hours old,
10031003
/// otherwise generate a fresh one and persist it.
1004-
fn load_or_generate_token() -> anyhow::Result<String> {
1004+
async fn load_or_generate_token() -> anyhow::Result<String> {
10051005
let app_dir = crate::session::get_app_dir()?;
10061006
let token_path = app_dir.join("serve.token");
10071007

10081008
// Try to reuse existing token if fresh enough
1009-
if let Ok(metadata) = std::fs::metadata(&token_path) {
1009+
if let Ok(metadata) = tokio::fs::metadata(&token_path).await {
10101010
if let Ok(modified) = metadata.modified() {
10111011
let age = std::time::SystemTime::now()
10121012
.duration_since(modified)
10131013
.unwrap_or_default();
10141014
if age < std::time::Duration::from_secs(24 * 60 * 60) {
1015-
if let Ok(token) = std::fs::read_to_string(&token_path) {
1015+
if let Ok(token) = tokio::fs::read_to_string(&token_path).await {
10161016
let token = token.trim().to_string();
10171017
if !token.is_empty() && is_valid_token_format(&token) {
10181018
return Ok(token);
@@ -1023,7 +1023,7 @@ fn load_or_generate_token() -> anyhow::Result<String> {
10231023
}
10241024

10251025
let token = generate_token();
1026-
write_secret_file(&token_path, &token);
1026+
write_secret_file(&token_path, &token).await;
10271027
Ok(token)
10281028
}
10291029

src/server/push.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,13 +297,13 @@ impl SubscriptionStore {
297297
let all: Vec<Subscription> = self.subs.read().await.values().cloned().collect();
298298
let body = serde_json::to_string_pretty(&all)?;
299299
let tmp = self.path.with_extension("json.tmp");
300-
std::fs::write(&tmp, &body)?;
300+
tokio::fs::write(&tmp, &body).await?;
301301
#[cfg(unix)]
302302
{
303303
use std::os::unix::fs::PermissionsExt;
304-
std::fs::set_permissions(&tmp, std::fs::Permissions::from_mode(0o600))?;
304+
tokio::fs::set_permissions(&tmp, std::fs::Permissions::from_mode(0o600)).await?;
305305
}
306-
std::fs::rename(&tmp, &self.path)?;
306+
tokio::fs::rename(&tmp, &self.path).await?;
307307
Ok(())
308308
}
309309
}

src/update/install.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use anyhow::{Context, Result};
44
use serde::Deserialize;
5-
use std::io::{ErrorKind, Write};
5+
use std::io::ErrorKind;
66
#[cfg(unix)]
77
use std::os::unix::fs::PermissionsExt;
88
use std::path::{Path, PathBuf};
@@ -200,6 +200,8 @@ async fn download_tarball(
200200
dest: &Path,
201201
mut on_progress: Option<&mut dyn FnMut(u64, Option<u64>)>,
202202
) -> Result<()> {
203+
use tokio::io::AsyncWriteExt;
204+
203205
let client = reqwest::Client::builder()
204206
.user_agent("agent-of-empires")
205207
.timeout(std::time::Duration::from_secs(300))
@@ -210,19 +212,20 @@ async fn download_tarball(
210212
}
211213
let total = response.content_length();
212214
let mut stream = response.bytes_stream();
213-
let mut file = std::fs::File::create(dest)
215+
let mut file = tokio::fs::File::create(dest)
216+
.await
214217
.with_context(|| format!("creating download file at {}", dest.display()))?;
215218
let mut downloaded: u64 = 0;
216219
use futures_util::StreamExt;
217220
while let Some(chunk) = stream.next().await {
218221
let chunk = chunk?;
219-
file.write_all(&chunk)?;
222+
file.write_all(&chunk).await?;
220223
downloaded += chunk.len() as u64;
221224
if let Some(cb) = on_progress.as_deref_mut() {
222225
cb(downloaded, total);
223226
}
224227
}
225-
file.sync_all()?;
228+
file.sync_all().await?;
226229
Ok(())
227230
}
228231

0 commit comments

Comments
 (0)