Skip to content

Commit abed66e

Browse files
authored
fix(scorpio): stabilize Antares mounts, fix dicfuse parent.children, and add configurable HTTP bind address (#1809)
* fix(scorpio): stabilize Antares mounts, fix dicfuse parent.children, and add configurable HTTP bind address

  - Stabilize Antares mounts and cleanup on shutdown
  - Ensure parent.children updated in upsert_inode
  - Add --http-addr CLI argument for configurable HTTP bind address

  Signed-off-by: jerry609 <1772030600@qq.com>

* Fix AWS SDK BehaviorVersion deprecation

---------

Signed-off-by: jerry609 <1772030600@qq.com>
1 parent 1dd37ab commit abed66e

File tree

8 files changed

+163
-23
lines changed

8 files changed

+163
-23
lines changed

docker/dev-image/.env.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ SCORPIO_LFS_URL=http://host.docker.internal:8000
2626
# Scorpio HTTP port
2727
SCORPIO_PORT=2725
2828

29+
# Optional: override Scorpio bind address inside container (newer scorpio supports `--http-addr`)
30+
# If you set this, ensure your port mappings / SCORPIO_API_BASE_URL match.
31+
# SCORPIO_HTTP_ADDR=0.0.0.0:2725
32+
2933
# Store path for Scorpio data (inside container)
3034
SCORPIO_STORE_PATH=/data/scorpio/store
3135

docker/dev-image/entrypoint.sh

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,16 @@ start_scorpio() {
249249
echo ""
250250

251251
log_info "Starting scorpio..."
252-
exec scorpio -c "$SCORPIO_CONFIG" "$@"
252+
local extra_args=()
253+
# Optional: bind HTTP server to a custom address/port (supported by newer scorpio binaries).
254+
# If user already passed --http-addr in args, do not add a duplicate flag.
255+
if [ -n "${SCORPIO_HTTP_ADDR:-}" ]; then
256+
case " $* " in
257+
*" --http-addr "*|*" --http-addr="*) ;;
258+
*) extra_args+=(--http-addr "${SCORPIO_HTTP_ADDR}") ;;
259+
esac
260+
fi
261+
exec scorpio -c "$SCORPIO_CONFIG" "${extra_args[@]}" "$@"
253262
}
254263

255264
start_orion_worker() {
@@ -274,10 +283,28 @@ start_orion_worker() {
274283
check_fuse
275284
generate_scorpio_config
276285

277-
# Ensure the worker talks to the embedded Scorpio by default.
278-
export SCORPIO_API_BASE_URL="http://127.0.0.1:2725"
286+
local extra_args=()
287+
local http_addr="${SCORPIO_HTTP_ADDR:-0.0.0.0:2725}"
288+
# NOTE: Simple IPv4-style parsing. For IPv6 bind addresses, set SCORPIO_API_BASE_URL manually.
289+
local http_port="${http_addr##*:}"
290+
case "$http_port" in
291+
''|*[!0-9]*)
292+
http_port="2725"
293+
;;
294+
esac
295+
296+
# Ensure the worker talks to the embedded Scorpio by default (respecting http port override).
297+
export SCORPIO_API_BASE_URL="http://127.0.0.1:${http_port}"
298+
299+
# Optional: pass through --http-addr if provided (and not already present in args).
300+
if [ -n "${SCORPIO_HTTP_ADDR:-}" ]; then
301+
case " $* " in
302+
*" --http-addr "*|*" --http-addr="*) ;;
303+
*) extra_args+=(--http-addr "${SCORPIO_HTTP_ADDR}") ;;
304+
esac
305+
fi
279306

280-
scorpio -c "$SCORPIO_CONFIG" &
307+
scorpio -c "$SCORPIO_CONFIG" "${extra_args[@]}" &
281308
local scorpio_pid=$!
282309

283310
cleanup() {
@@ -289,7 +316,7 @@ start_orion_worker() {
289316

290317
trap cleanup EXIT INT TERM
291318

292-
wait_for_service "127.0.0.1" "2725" 60 || exit 1
319+
wait_for_service "127.0.0.1" "${http_port}" 60 || exit 1
293320
else
294321
log_warn "ORION_WORKER_START_SCORPIO disabled; ensure Scorpio mountpoints are accessible to this container."
295322
fi

orion-server/src/log/store/s3_log_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ impl S3LogStore {
2424
) -> Self {
2525
let region = Region::new(region_name.to_string());
2626

27-
let shared_config = aws_config::defaults(BehaviorVersion::v2025_08_07())
27+
let shared_config = aws_config::defaults(BehaviorVersion::latest())
2828
.region(region.clone())
2929
.credentials_provider(Credentials::new(
3030
access_key_id,

scorpio/src/antares/fuse.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,13 @@ impl AntaresFuse {
4343

4444
/// Compose the union filesystem instance.
4545
pub async fn build_overlay(&self) -> std::io::Result<OverlayFs> {
46-
// Build lower layers: optional CL, then a passthrough over the upper dir as a fallback lower.
46+
// Build lower layers:
47+
// - Prefer Dicfuse as the primary lower layer (read-only monorepo view).
48+
// - Optional CL dir is best-effort and may be empty; keep it as an additional
49+
// layer so it never masks the Dicfuse view.
4750
let mut lower_layers: Vec<Arc<dyn Layer>> = Vec::new();
51+
lower_layers.push(self.dic.clone() as Arc<dyn Layer>);
52+
4853
if let Some(cl_dir) = &self.cl_dir {
4954
let cl_layer = new_passthroughfs_layer(PassthroughArgs {
5055
root_dir: cl_dir,
@@ -54,8 +59,6 @@ impl AntaresFuse {
5459
lower_layers.push(Arc::new(cl_layer) as Arc<dyn Layer>);
5560
}
5661

57-
lower_layers.push(self.dic.clone() as Arc<dyn Layer>);
58-
5962
// Upper layer mirrors upper_dir to keep writes separated from lower layers.
6063
let upper_layer: Arc<dyn Layer> = Arc::new(
6164
new_passthroughfs_layer(PassthroughArgs {

scorpio/src/daemon/antares.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,8 +542,35 @@ impl AntaresServiceImpl {
542542
/// - Add progress callback for long-running initialization
543543
/// - Consider lazy loading for very large subdirectory mounts
544544
async fn get_or_create_dicfuse(&self, path: &str) -> Result<Arc<Dicfuse>, ServiceError> {
545-
// For root path, use the shared global instance
545+
const INIT_TIMEOUT_SECS: u64 = 120;
546+
547+
// For root path, use the shared global instance (but ensure it's initialized first).
546548
if path.is_empty() || path == "/" {
549+
tracing::info!(
550+
"Waiting for shared Dicfuse instance to initialize for path: / (timeout: {}s)",
551+
INIT_TIMEOUT_SECS
552+
);
553+
match tokio::time::timeout(
554+
Duration::from_secs(INIT_TIMEOUT_SECS),
555+
self.dicfuse.store.wait_for_ready(),
556+
)
557+
.await
558+
{
559+
Ok(_) => {
560+
tracing::info!("Shared Dicfuse initialized successfully for path: /");
561+
}
562+
Err(_) => {
563+
tracing::error!(
564+
"Shared Dicfuse initialization timed out for path: / after {}s",
565+
INIT_TIMEOUT_SECS
566+
);
567+
return Err(ServiceError::FuseFailure(format!(
568+
"Dicfuse initialization timed out for path '/' after {}s. \
569+
Check network connectivity to the monorepo server.",
570+
INIT_TIMEOUT_SECS
571+
)));
572+
}
573+
}
547574
return Ok(self.dicfuse.clone());
548575
}
549576

@@ -568,7 +595,6 @@ impl AntaresServiceImpl {
568595
// CRITICAL: Wait for the Dicfuse directory tree to be fully loaded before
569596
// returning. Without this, FUSE mount may fail because the root inode
570597
// is not set up yet when import_arc hasn't completed.
571-
const INIT_TIMEOUT_SECS: u64 = 120;
572598
// TODO(dicfuse-antares-integration): If many concurrent requests initialize DIFFERENT
573599
// base paths, we may enqueue a large number of concurrent warmups (network + memory).
574600
// Consider adding a global semaphore/queue to cap concurrent initializations.

scorpio/src/daemon/mod.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::net::SocketAddr;
12
use std::path::PathBuf;
23
use std::sync::Arc;
34

@@ -11,7 +12,7 @@ use axum::Router;
1112
use dashmap::DashMap;
1213
use git_internal::hash::ObjectHash;
1314
use serde::{Deserialize, Serialize};
14-
use tokio::sync::Mutex;
15+
use tokio::sync::{oneshot, Mutex};
1516
use uuid::Uuid;
1617
pub mod antares;
1718
//mod git;
@@ -105,7 +106,12 @@ struct ScoState {
105106
tasks: Arc<DashMap<String, MountStatus>>, // Thread-safe storage for async mount tasks
106107
}
107108
#[allow(unused)]
108-
pub async fn daemon_main(fuse: Arc<MegaFuse>, manager: ScorpioManager) {
109+
pub async fn daemon_main(
110+
fuse: Arc<MegaFuse>,
111+
manager: ScorpioManager,
112+
shutdown_rx: oneshot::Receiver<()>,
113+
bind_addr: SocketAddr,
114+
) {
109115
let inner = ScoState {
110116
fuse,
111117
manager: Arc::new(Mutex::new(manager)),
@@ -125,12 +131,29 @@ pub async fn daemon_main(fuse: Arc<MegaFuse>, manager: ScorpioManager) {
125131

126132
// Antares route - create service with new Dicfuse instance
127133
let antares_service = Arc::new(antares::AntaresServiceImpl::new(None).await);
134+
let antares_service_for_shutdown = antares_service.clone();
128135
let antares_daemon = antares::AntaresDaemon::new(antares_service);
129136
let antares_router = antares_daemon.router();
130137
let app = app.nest("/antares", antares_router);
131138

132-
let listener = tokio::net::TcpListener::bind("0.0.0.0:2725").await.unwrap();
133-
axum::serve(listener, app).await.unwrap()
139+
let listener = tokio::net::TcpListener::bind(bind_addr).await.unwrap();
140+
axum::serve(listener, app)
141+
.with_graceful_shutdown(async move {
142+
let _ = shutdown_rx.await;
143+
tracing::info!("HTTP server shutdown requested; running Antares shutdown cleanup");
144+
match tokio::time::timeout(
145+
std::time::Duration::from_secs(15),
146+
antares_service_for_shutdown.shutdown_cleanup_impl(),
147+
)
148+
.await
149+
{
150+
Ok(Ok(())) => tracing::info!("Antares shutdown cleanup completed"),
151+
Ok(Err(e)) => tracing::warn!("Antares shutdown cleanup failed: {:?}", e),
152+
Err(_) => tracing::warn!("Antares shutdown cleanup timed out"),
153+
}
154+
})
155+
.await
156+
.unwrap()
134157
}
135158

136159
/// Asynchronous mount handler for clients.

scorpio/src/dicfuse/store.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,14 @@ impl DictionaryStore {
10681068
};
10691069

10701070
if let Some(inode) = existing {
1071+
// Ensure parent.children contains this inode (fix inconsistent state from DB recovery).
1072+
// This handles cases where radix_trie has the path->inode mapping but parent.children
1073+
// list is missing the inode entry.
1074+
if let Ok(parent_item) = self.persistent_path_store.get_item(parent) {
1075+
if !parent_item.get_children().contains(&inode) {
1076+
let _ = self.persistent_path_store.append_child(parent, inode);
1077+
}
1078+
}
10711079
return Ok(inode);
10721080
}
10731081

scorpio/src/main.rs

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ use scorpio::fuse::MegaFuse;
55
use scorpio::manager::{fetch::CheckHash, ScorpioManager};
66
use scorpio::server::mount_filesystem;
77
use scorpio::util::config;
8+
use std::net::SocketAddr;
89
use std::{ffi::OsStr, sync::Arc};
10+
#[cfg(not(unix))]
911
use tokio::signal;
12+
use tokio::sync::oneshot;
1013

1114
/// Command line arguments for the application
1215
#[derive(Parser, Debug)]
@@ -15,6 +18,10 @@ struct Args {
1518
/// Path to the configuration file
1619
#[arg(short, long, default_value = "scorpio.toml")]
1720
config_path: String,
21+
22+
/// HTTP bind address for the daemon (Antares API lives under /antares/*)
23+
#[arg(long, default_value = "0.0.0.0:2725")]
24+
http_addr: SocketAddr,
1825
}
1926

2027
#[tokio::main]
@@ -44,19 +51,61 @@ async fn main() {
4451
let lgfs = LoggingFileSystem::new(fuse_interface.clone());
4552
let mut mount_handle = mount_filesystem(lgfs, mountpoint).await;
4653

47-
let handle = &mut mount_handle;
54+
print!("server running...");
4855

49-
// spawn the server running function.
50-
tokio::spawn(daemon_main(Arc::new(fuse_interface), manager));
56+
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
57+
let mut daemon_task = tokio::spawn(daemon_main(
58+
Arc::new(fuse_interface),
59+
manager,
60+
shutdown_rx,
61+
args.http_addr,
62+
));
5163

52-
print!("server running...");
64+
let mut mount_finished = false;
5365
tokio::select! {
54-
res = handle => res.unwrap(),
55-
_ = signal::ctrl_c() => {
66+
res = &mut mount_handle => {
67+
mount_finished = true;
68+
if let Err(e) = res {
69+
eprintln!("FUSE session ended with error: {e:?}");
70+
}
71+
}
72+
_ = shutdown_signal() => {
73+
// fallthrough to shutdown sequence below
74+
}
75+
}
5676

57-
println!("unmount....");
58-
mount_handle.unmount().await.unwrap();
77+
// Stop HTTP server first (this triggers Antares shutdown cleanup), then unmount the main workspace FS.
78+
let _ = shutdown_tx.send(());
79+
match tokio::time::timeout(std::time::Duration::from_secs(20), &mut daemon_task).await {
80+
Ok(Ok(_)) => {}
81+
Ok(Err(e)) => eprintln!("HTTP daemon task join failed: {e}"),
82+
Err(_) => {
83+
eprintln!("HTTP daemon shutdown timed out; aborting task");
84+
daemon_task.abort();
85+
}
86+
}
87+
88+
if !mount_finished {
89+
println!("unmount....");
90+
let _ = mount_handle.unmount().await;
91+
}
92+
}
5993

94+
async fn shutdown_signal() {
95+
#[cfg(unix)]
96+
{
97+
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
98+
.expect("failed to install SIGTERM handler");
99+
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
100+
.expect("failed to install SIGINT handler");
101+
tokio::select! {
102+
_ = sigterm.recv() => {}
103+
_ = sigint.recv() => {}
60104
}
61105
}
106+
107+
#[cfg(not(unix))]
108+
{
109+
let _ = signal::ctrl_c().await;
110+
}
62111
}

0 commit comments

Comments
 (0)