Skip to content

Commit 83600d1

Browse files
committed
feat(multinode): add container reconciliation, scheduling config UI, and integration tests
Add container labels for reconciliation, a list_containers agent endpoint, and anti-affinity environment settings; fix the auth and WebSocket log tests; and add a multinode integration test suite.
1 parent 15ee160 commit 83600d1

File tree

19 files changed

+1261
-103
lines changed

19 files changed

+1261
-103
lines changed

crates/temps-agent/src/handlers.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ fn error_response(status: StatusCode, message: String) -> impl IntoResponse {
6161
remove_container,
6262
get_container_logs,
6363
get_container_info,
64+
list_containers,
6465
image_exists,
6566
import_image,
6667
health_check,
@@ -312,6 +313,33 @@ pub async fn get_container_info(
312313
}
313314
}
314315

316+
/// List all containers on this worker node
317+
#[utoipa::path(
318+
tag = "Containers",
319+
get,
320+
path = "/agent/containers",
321+
responses(
322+
(status = 200, description = "List of containers", body = AgentResponse<Vec<temps_deployer::ContainerInfo>>),
323+
(status = 401, description = "Unauthorized"),
324+
(status = 500, description = "Failed to list containers")
325+
),
326+
security(("bearer_auth" = []))
327+
)]
328+
pub async fn list_containers(State(state): State<Arc<AgentState>>) -> impl IntoResponse {
329+
tracing::debug!("Listing containers");
330+
match state.container_deployer.list_containers().await {
331+
Ok(containers) => AgentResponse::ok(containers).into_response(),
332+
Err(e) => {
333+
tracing::error!("Failed to list containers: {}", e);
334+
error_response(
335+
StatusCode::INTERNAL_SERVER_ERROR,
336+
format!("Failed to list containers: {}", e),
337+
)
338+
.into_response()
339+
}
340+
}
341+
}
342+
315343
/// Check if a Docker image exists on this node
316344
#[utoipa::path(
317345
tag = "Images",

crates/temps-agent/src/server.rs

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub fn build_router(
4444
"/agent/containers/{id}/info",
4545
get(handlers::get_container_info),
4646
)
47+
.route("/agent/containers", get(handlers::list_containers))
4748
.route("/agent/images/import", post(handlers::import_image))
4849
.route("/agent/images/{name}/exists", get(handlers::image_exists))
4950
.route("/agent/health", get(handlers::health_check))
@@ -72,7 +73,13 @@ const HEARTBEAT_RETRY_MAX_DELAY: Duration = Duration::from_secs(15);
7273
/// On transient failures, retries up to `HEARTBEAT_MAX_RETRIES` times with exponential
7374
/// backoff before giving up for this interval. This prevents a brief network blip from
7475
/// causing the control plane to mark the node as offline (90s stale threshold).
75-
fn spawn_heartbeat_loop(config: &AgentConfig) {
76+
///
77+
/// The first successful heartbeat includes an inventory of temps-managed containers so the control
78+
/// plane can reconcile stale DB records against actual Docker state (e.g., after a crash).
79+
fn spawn_heartbeat_loop(
80+
config: &AgentConfig,
81+
container_deployer: Arc<dyn temps_deployer::ContainerDeployer>,
82+
) {
7683
let control_plane_url = config.control_plane_url.clone();
7784
let node_id = config.node_id;
7885
let token = config.token.clone();
@@ -98,12 +105,51 @@ fn spawn_heartbeat_loop(config: &AgentConfig) {
98105

99106
let mut interval = tokio::time::interval(Duration::from_secs(30));
100107
let mut consecutive_failures: u32 = 0;
108+
let mut inventory_sent = false;
101109

102110
loop {
103111
interval.tick().await;
104112

105113
let capacity = collect_capacity_metrics();
106-
let body = serde_json::json!({ "capacity": capacity, "labels": labels });
114+
let mut body = serde_json::json!({ "capacity": capacity, "labels": labels });
115+
116+
// Until the first heartbeat succeeds after agent startup, include a full
117+
// container inventory so the control plane can reconcile stale state.
118+
if !inventory_sent {
119+
match container_deployer.list_containers().await {
120+
Ok(containers) => {
121+
// Only include temps-managed containers
122+
let managed: Vec<_> = containers
123+
.into_iter()
124+
.filter(|c| {
125+
c.labels
126+
.get("sh.temps.managed")
127+
.map(|v| v == "true")
128+
.unwrap_or(false)
129+
})
130+
.map(|c| {
131+
serde_json::json!({
132+
"container_id": c.container_id,
133+
"container_name": c.container_name,
134+
})
135+
})
136+
.collect();
137+
body["containers"] = serde_json::json!(managed);
138+
tracing::info!(
139+
node_id = node_id,
140+
count = managed.len(),
141+
"Including container inventory in heartbeat for reconciliation"
142+
);
143+
}
144+
Err(e) => {
145+
tracing::warn!(
146+
node_id = node_id,
147+
"Failed to list containers for inventory: {}",
148+
e
149+
);
150+
}
151+
}
152+
}
107153

108154
let mut attempt = 0;
109155
let mut succeeded = false;
@@ -127,6 +173,7 @@ fn spawn_heartbeat_loop(config: &AgentConfig) {
127173
}
128174
consecutive_failures = 0;
129175
succeeded = true;
176+
inventory_sent = true;
130177
tracing::debug!(node_id = node_id, "Heartbeat sent to control plane");
131178
break;
132179
}
@@ -236,10 +283,10 @@ pub async fn start_agent_server(
236283
image_builder: Arc<dyn ImageBuilder>,
237284
config: AgentConfig,
238285
) -> Result<(), crate::AgentError> {
239-
let router = build_router(container_deployer, image_builder, &config);
286+
let router = build_router(container_deployer.clone(), image_builder, &config);
240287

241-
// Start heartbeat background loop
242-
spawn_heartbeat_loop(&config);
288+
// Start heartbeat background loop (with deployer for container inventory on first beat)
289+
spawn_heartbeat_loop(&config, container_deployer);
243290

244291
let listener = tokio::net::TcpListener::bind(&config.listen_address)
245292
.await

crates/temps-auth/src/auth_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ mod tests {
10421042

10431043
let request1 = RegisterRequest {
10441044
email: "Test@Example.Com".to_string(),
1045-
password: "password123".to_string(),
1045+
password: "Password123!".to_string(),
10461046
name: "Test User".to_string(),
10471047
};
10481048

@@ -1051,7 +1051,7 @@ mod tests {
10511051

10521052
let request2 = RegisterRequest {
10531053
email: "TEST@EXAMPLE.COM".to_string(),
1054-
password: "password456".to_string(),
1054+
password: "Password456!".to_string(),
10551055
name: "Another User".to_string(),
10561056
};
10571057

crates/temps-deployer/src/docker.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,7 @@ impl ContainerDeployer for DockerRuntime {
11351135

11361136
let state = container.state.unwrap_or_default();
11371137
let config = container.config.unwrap_or_default();
1138+
let container_labels = config.labels.clone().unwrap_or_default();
11381139

11391140
// Parse environment variables
11401141
let env_vars = config
@@ -1202,6 +1203,7 @@ impl ContainerDeployer for DockerRuntime {
12021203
ports: port_mappings,
12031204
environment_vars: env_vars,
12041205
restart_count: container.restart_count,
1206+
labels: container_labels,
12051207
})
12061208
}
12071209

crates/temps-deployer/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ pub struct ContainerInfo {
248248
pub ports: Vec<PortMapping>,
249249
pub environment_vars: HashMap<String, String>,
250250
pub restart_count: Option<i64>,
251+
/// Docker labels set on the container (e.g., `sh.temps.managed`, `sh.temps.project_id`).
252+
#[serde(default)]
253+
pub labels: HashMap<String, String>,
251254
}
252255

253256
/// Container performance statistics (CPU, memory, network)
@@ -686,6 +689,7 @@ mod tests {
686689
}],
687690
environment_vars: env_vars,
688691
restart_count: Some(0),
692+
labels: HashMap::new(),
689693
};
690694

691695
assert_eq!(info.container_id, "abc123");

crates/temps-deployer/src/remote.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,7 @@ impl ContainerDeployer for RemoteNodeDeployer {
253253
}
254254

255255
async fn list_containers(&self) -> Result<Vec<ContainerInfo>, DeployerError> {
256-
Err(DeployerError::Other(
257-
"List containers not yet supported on remote nodes".into(),
258-
))
256+
self.agent_get("/agent/containers").await
259257
}
260258

261259
async fn get_container_logs(&self, container_id: &str) -> Result<String, DeployerError> {

0 commit comments

Comments
 (0)