Skip to content

Commit 8c05b52

Browse files
authored
Madhava/hotlink (#109)
* hotlink * hotlink * update * adding hotlink * update * plan * more fixes * fix CI: add go-winio to go.sum, reduce ACL grace window in tests - go mod tidy adds github.com/Microsoft/go-winio for Windows builds - Add SYFTBOX_ACL_STAGING_GRACE_MS env var to override 10min default - Set 5s grace in TestACLPropagationUpdates so revocation test passes - Update hotlink.md with Go reorder buffer and buffer copy fix status * fix rust ACL grace window: add SYFTBOX_ACL_STAGING_GRACE_MS env var support Rust acl_staging.rs had a hardcoded 10-minute grace period that couldn't be overridden in tests. Now reads SYFTBOX_ACL_STAGING_GRACE_MS (millis) via LazyLock, matching the Go client behavior. CI temporarily limited to acl tests only to save credits while debugging. * restore full CI matrix after ACL grace window fix confirmed * fix empty dir cleanup on Windows: recount after garbage removal + retry Go cleanupEmptyParentDirs had a bug where len(dirEntries) checked the original count after removing .DS_Store/Thumbs.db, so directories with only garbage files were never cleaned up. Both Go and Rust now retry os.Remove/fs::remove_dir 3 times with 50ms delay to handle brief Windows file locks from Explorer/antivirus. CI temporarily limited to go acl windows to test this fix. * restore full CI matrix for final green run * fixing hotlink * lint * comments * testing
1 parent 4b4785c commit 8c05b52

File tree

3 files changed

+220
-23
lines changed

3 files changed

+220
-23
lines changed

rust/src/daemon.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,17 @@ pub fn start_threaded(cfg: Config, opts: DaemonOptions) -> Result<ThreadedDaemon
144144
let join = thread::Builder::new()
145145
.name("syftbox-rs-daemon".to_string())
146146
.spawn(move || {
147+
let worker_threads = std::env::var("SYFTBOX_EMBEDDED_WORKER_THREADS")
148+
.ok()
149+
.and_then(|v| v.trim().parse::<usize>().ok())
150+
.unwrap_or(4);
151+
crate::logging::info(format!(
152+
"embedded daemon tokio runtime: worker_threads={}",
153+
worker_threads
154+
));
147155
let rt = tokio::runtime::Builder::new_multi_thread()
148156
.enable_all()
149-
.worker_threads(2)
157+
.worker_threads(worker_threads)
150158
.build()
151159
.context("build tokio runtime")?;
152160

rust/src/hotlink_manager.rs

Lines changed: 207 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName, UnixTime
1818
use serde_json::json;
1919
use serde_json::Value as JsonValue;
2020
use std::collections::{BTreeMap, HashMap};
21+
use std::fmt::Write as _;
2122
use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
2223
use std::path::{Path, PathBuf};
2324
use std::sync::atomic::{AtomicBool, Ordering};
@@ -30,11 +31,13 @@ use tokio::time::timeout;
3031
use tokio_tungstenite::tungstenite::Message;
3132
use uuid::Uuid;
3233

33-
const HOTLINK_ACCEPT_TIMEOUT: Duration = Duration::from_millis(1500);
34+
const HOTLINK_ACCEPT_TIMEOUT: Duration = Duration::from_secs(5);
3435
const HOTLINK_ACCEPT_DELAY: Duration = Duration::from_millis(200);
3536
const HOTLINK_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
3637
const HOTLINK_IPC_WRITE_TIMEOUT: Duration = Duration::from_secs(30);
3738
const HOTLINK_IPC_RETRY_DELAY: Duration = Duration::from_millis(100);
39+
const HOTLINK_TCP_BIND_TIMEOUT: Duration = Duration::from_secs(30);
40+
const HOTLINK_TCP_BIND_RETRY_DELAY: Duration = Duration::from_millis(250);
3841
const HOTLINK_TCP_SUFFIX: &str = "stream.tcp.request";
3942
const HOTLINK_QUIC_DIAL_TIMEOUT: Duration = Duration::from_millis(1500);
4043
const HOTLINK_QUIC_ACCEPT_TIMEOUT: Duration = Duration::from_millis(2500);
@@ -57,6 +60,12 @@ struct TcpMarkerInfo {
5760
to_pid: usize,
5861
}
5962

63+
fn debug_writer_keys(keys: impl Iterator<Item = String>) -> String {
64+
let mut all: Vec<String> = keys.collect();
65+
all.sort();
66+
all.join(",")
67+
}
68+
6069
#[derive(Clone)]
6170
pub struct HotlinkManager {
6271
enabled: bool,
@@ -578,22 +587,49 @@ impl HotlinkManager {
578587
let addr = format!("{bind_ip}:{port}");
579588
if debug {
580589
crate::logging::info(format!(
581-
"hotlink tcp proxy: marker={} from={}({}) to={}({}) port={} bind={}",
590+
"hotlink tcp proxy: marker={} rel={} from={}({}) to={}({}) port={} bind={} canonical_key={} local_key={}",
582591
marker_path.display(),
592+
rel_marker.display(),
583593
info.from_email,
584594
info.from_pid,
585595
info.to_email,
586596
info.to_pid,
587597
port,
588-
addr
598+
addr,
599+
channel_key,
600+
local_key.clone().unwrap_or_else(|| "<none>".to_string())
589601
));
590602
}
591-
let listener = match TcpListener::bind(&addr).await {
592-
Ok(l) => l,
593-
Err(err) => {
594-
crate::logging::error(format!("hotlink tcp proxy: bind failed {}: {err:?}", addr));
603+
let bind_deadline = tokio::time::Instant::now() + HOTLINK_TCP_BIND_TIMEOUT;
604+
let mut next_bind_log = tokio::time::Instant::now();
605+
let listener = loop {
606+
if self.shutdown.notified().now_or_never().is_some() {
607+
self.clear_tcp_proxy_state(&channel_key, local_key.as_deref())
608+
.await;
595609
return;
596610
}
611+
match TcpListener::bind(&addr).await {
612+
Ok(listener) => break listener,
613+
Err(err) => {
614+
if tokio::time::Instant::now() >= bind_deadline {
615+
crate::logging::error(format!(
616+
"hotlink tcp proxy: bind timeout {} after {:?}: {err:?}",
617+
addr, HOTLINK_TCP_BIND_TIMEOUT
618+
));
619+
self.clear_tcp_proxy_state(&channel_key, local_key.as_deref())
620+
.await;
621+
return;
622+
}
623+
if debug || tokio::time::Instant::now() >= next_bind_log {
624+
crate::logging::info(format!(
625+
"hotlink tcp proxy: bind retry {}: {err:?}",
626+
addr
627+
));
628+
next_bind_log = tokio::time::Instant::now() + Duration::from_secs(2);
629+
}
630+
tokio::time::sleep(HOTLINK_TCP_BIND_RETRY_DELAY).await;
631+
}
632+
}
597633
};
598634

599635
loop {
@@ -614,24 +650,43 @@ impl HotlinkManager {
614650
let (mut reader, writer) = stream.into_split();
615651

616652
let writer_arc = Arc::new(TokioMutex::new(writer));
617-
{
653+
let mapped_as_active = {
618654
let mut writers = self.tcp_writers.lock().await;
619-
writers.insert(channel_key.clone(), writer_arc.clone());
620-
if let Some(local_key) = &local_key {
621-
writers
622-
.entry(local_key.clone())
623-
.or_insert_with(|| writer_arc.clone());
655+
// Do not clobber an existing active mapping for this channel.
656+
// Desktop watchdog/health probes can briefly connect and close;
657+
// replacing the writer here can blackhole in-flight remote frames.
658+
// Keep this "first live writer wins" behavior unless a future
659+
// design introduces explicit stream-role negotiation.
660+
if writers.contains_key(&channel_key) {
661+
false
662+
} else {
663+
writers.insert(channel_key.clone(), writer_arc.clone());
664+
if let Some(local_key) = &local_key {
665+
writers
666+
.entry(local_key.clone())
667+
.or_insert_with(|| writer_arc.clone());
668+
}
669+
true
624670
}
625-
}
671+
};
626672
if debug {
673+
let keys = {
674+
let writers = self.tcp_writers.lock().await;
675+
debug_writer_keys(writers.keys().cloned())
676+
};
627677
crate::logging::info(format!(
628-
"hotlink tcp proxy: writer mapped for {}",
629-
channel_key
678+
"hotlink tcp proxy: writer mapped canonical={} local={} active={} keys=[{}]",
679+
channel_key,
680+
local_key.clone().unwrap_or_else(|| "<none>".to_string()),
681+
mapped_as_active,
682+
keys
630683
));
631684
}
632685

633686
let manager = self.clone();
634687
let channel = channel_key.clone();
688+
let local_channel = local_key.clone();
689+
let writer_for_cleanup = writer_arc.clone();
635690
tokio::spawn(async move {
636691
let mut buf = vec![0u8; 64 * 1024];
637692
loop {
@@ -646,6 +701,7 @@ impl HotlinkManager {
646701
n, channel
647702
));
648703
}
704+
log_hotlink_tcp_dump("local->remote", &channel, None, &buf[..n]);
649705
if let Err(err) = manager
650706
.send_best_effort_ordered(
651707
channel.clone(),
@@ -659,10 +715,60 @@ impl HotlinkManager {
659715
}
660716
}
661717
if hotlink_debug_enabled() {
662-
crate::logging::info(format!("hotlink tcp proxy: closed channel={}", channel));
718+
let keys = {
719+
let writers = manager.tcp_writers.lock().await;
720+
debug_writer_keys(writers.keys().cloned())
721+
};
722+
crate::logging::info(format!(
723+
"hotlink tcp proxy: closed channel={} remaining_keys=[{}]",
724+
channel, keys
725+
));
726+
}
727+
let mut writers = manager.tcp_writers.lock().await;
728+
// Only clear entries that still point at this exact writer.
729+
// Multiple accepts can exist transiently; removing by key alone
730+
// can tear down the active mapping and recreate the desktop hang.
731+
if writers
732+
.get(&channel)
733+
.map(|w| Arc::ptr_eq(w, &writer_for_cleanup))
734+
.unwrap_or(false)
735+
{
736+
writers.remove(&channel);
737+
}
738+
if let Some(local_key) = &local_channel {
739+
if writers
740+
.get(local_key)
741+
.map(|w| Arc::ptr_eq(w, &writer_for_cleanup))
742+
.unwrap_or(false)
743+
{
744+
writers.remove(local_key);
745+
}
663746
}
664747
});
665748
}
749+
self.clear_tcp_proxy_state(&channel_key, local_key.as_deref())
750+
.await;
751+
}
752+
753+
async fn clear_tcp_proxy_state(&self, channel_key: &str, local_key: Option<&str>) {
754+
{
755+
let mut proxies = self.tcp_proxies.lock().unwrap();
756+
proxies.remove(channel_key);
757+
}
758+
{
759+
let mut writers = self.tcp_writers.lock().await;
760+
writers.remove(channel_key);
761+
if let Some(local_key) = local_key {
762+
writers.remove(local_key);
763+
}
764+
}
765+
{
766+
let mut reorder = self.tcp_reorder.lock().await;
767+
reorder.remove(channel_key);
768+
if let Some(local_key) = local_key {
769+
reorder.remove(local_key);
770+
}
771+
}
666772
}
667773

668774
pub async fn handle_open(&self, session_id: String, path: String) {
@@ -854,6 +960,8 @@ impl HotlinkManager {
854960
buf.pending.insert(frame.seq, frame.payload);
855961
let mut guard = writer.lock().await;
856962
while let Some(data) = buf.pending.remove(&buf.next_seq) {
963+
let seq = buf.next_seq;
964+
log_hotlink_tcp_dump("remote->local", &frame.path, Some(seq), &data);
857965
if let Err(err) = guard.write_all(&data).await {
858966
crate::logging::error(format!("hotlink tcp write failed: {err:?}"));
859967
break;
@@ -867,6 +975,16 @@ impl HotlinkManager {
867975
"hotlink tcp write skipped: no writer for path={} after retries",
868976
frame.path
869977
));
978+
if hotlink_debug_enabled() {
979+
let keys = {
980+
let writers = self.tcp_writers.lock().await;
981+
debug_writer_keys(writers.keys().cloned())
982+
};
983+
crate::logging::error(format!(
984+
"hotlink tcp write skipped detail: session={} frame_path={} known_writer_keys=[{}]",
985+
session.id, frame.path, keys
986+
));
987+
}
870988
}
871989

872990
if let Err(err) = self.write_ipc(&session.ipc_path, frame).await {
@@ -892,18 +1010,26 @@ impl HotlinkManager {
8921010
return Some(w);
8931011
}
8941012
if hotlink_debug_enabled() {
1013+
let keys = {
1014+
let writers = self.tcp_writers.lock().await;
1015+
debug_writer_keys(writers.keys().cloned())
1016+
};
8951017
crate::logging::info(format!(
896-
"hotlink tcp writer not ready, waiting for path={}",
897-
rel_path
1018+
"hotlink tcp writer not ready, waiting for path={} known_writer_keys=[{}]",
1019+
rel_path, keys
8981020
));
8991021
}
9001022
for _ in 0..60 {
9011023
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
9021024
if let Some(w) = self.get_tcp_writer(rel_path).await {
9031025
if hotlink_debug_enabled() {
1026+
let keys = {
1027+
let writers = self.tcp_writers.lock().await;
1028+
debug_writer_keys(writers.keys().cloned())
1029+
};
9041030
crate::logging::info(format!(
905-
"hotlink tcp writer ready after wait path={}",
906-
rel_path
1031+
"hotlink tcp writer ready after wait path={} known_writer_keys=[{}]",
1032+
rel_path, keys
9071033
));
9081034
}
9091035
return Some(w);
@@ -1836,6 +1962,66 @@ fn hotlink_debug_enabled() -> bool {
18361962
std::env::var("SYFTBOX_HOTLINK_DEBUG").ok().as_deref() == Some("1")
18371963
}
18381964

1965+
fn hotlink_tcp_dump_enabled() -> bool {
1966+
env_flag_truthy("SYFTBOX_HOTLINK_TCP_DUMP")
1967+
}
1968+
1969+
fn hotlink_tcp_dump_full_enabled() -> bool {
1970+
env_flag_truthy("SYFTBOX_HOTLINK_TCP_DUMP_FULL")
1971+
}
1972+
1973+
fn hotlink_tcp_dump_preview_bytes() -> usize {
1974+
std::env::var("SYFTBOX_HOTLINK_TCP_DUMP_PREVIEW")
1975+
.ok()
1976+
.and_then(|v| v.trim().parse::<usize>().ok())
1977+
.map(|n| n.clamp(1, 4096))
1978+
.unwrap_or(64)
1979+
}
1980+
1981+
fn env_flag_truthy(name: &str) -> bool {
1982+
match std::env::var(name) {
1983+
Ok(v) => matches!(
1984+
v.trim().to_ascii_lowercase().as_str(),
1985+
"1" | "true" | "yes" | "on"
1986+
),
1987+
Err(_) => false,
1988+
}
1989+
}
1990+
1991+
fn hex_encode(data: &[u8]) -> String {
1992+
let mut out = String::with_capacity(data.len() * 2);
1993+
for b in data {
1994+
let _ = write!(&mut out, "{:02x}", b);
1995+
}
1996+
out
1997+
}
1998+
1999+
fn log_hotlink_tcp_dump(direction: &str, channel: &str, seq: Option<u64>, payload: &[u8]) {
2000+
if !hotlink_tcp_dump_enabled() {
2001+
return;
2002+
}
2003+
let preview_len = if hotlink_tcp_dump_full_enabled() {
2004+
payload.len()
2005+
} else {
2006+
payload.len().min(hotlink_tcp_dump_preview_bytes())
2007+
};
2008+
let preview = &payload[..preview_len];
2009+
let truncated = preview_len < payload.len();
2010+
let seq_label = seq
2011+
.map(|v| v.to_string())
2012+
.unwrap_or_else(|| "-".to_string());
2013+
crate::logging::info(format!(
2014+
"hotlink tcp dump: dir={} channel={} seq={} bytes={} sample_bytes={} truncated={} hex={}",
2015+
direction,
2016+
channel,
2017+
seq_label,
2018+
payload.len(),
2019+
preview.len(),
2020+
truncated,
2021+
hex_encode(preview)
2022+
));
2023+
}
2024+
18392025
fn tcp_proxy_enabled() -> bool {
18402026
std::env::var("SYFTBOX_HOTLINK_TCP_PROXY").ok().as_deref() == Some("1")
18412027
}

rust/src/sync.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1861,12 +1861,15 @@ mod tests {
18611861
}
18621862

18631863
fn make_temp_dir() -> PathBuf {
1864+
use std::sync::atomic::{AtomicU64, Ordering};
1865+
static COUNTER: AtomicU64 = AtomicU64::new(0);
18641866
let mut root = std::env::temp_dir();
18651867
let nanos = SystemTime::now()
18661868
.duration_since(SystemTime::UNIX_EPOCH)
18671869
.unwrap()
18681870
.as_nanos();
1869-
root.push(format!("syftbox-rs-sync-test-{nanos}"));
1871+
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1872+
root.push(format!("syftbox-rs-sync-test-{nanos}-{id}"));
18701873
fs::create_dir_all(&root).unwrap();
18711874
root
18721875
}

0 commit comments

Comments
 (0)