Skip to content

Commit 4856e61

Browse files
MalteJ and claude committed
Fix manifest sync: zombie prevention, template persistence, and sender race
- Populate all known_ids from local sub-services at startup (init from reality) to prevent pruning of pre-existing resources on first manifest
- Include completed templates in node manifest so they aren't pruned
- Register manifest sender before building initial manifest to close race window where events during build would be missed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 25f06fd commit 4856e61

6 files changed

Lines changed: 163 additions & 17 deletions

File tree

mvirt-api/src/grpc/manifest.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,24 @@ async fn build_volume_specs(store: &Arc<dyn DataStore>, node_id: &str) -> Vec<Pr
152152
async fn build_template_specs(store: &Arc<dyn DataStore>, node_id: &str) -> Vec<ProtoTemplateSpec> {
153153
let mut specs = Vec::new();
154154

155-
// Pending/running import jobs — these should be sent to the target node
155+
// 1. Completed templates on this node (persistent)
156+
if let Ok(templates) = store.list_templates(Some(node_id)).await {
157+
for tpl in &templates {
158+
specs.push(ProtoTemplateSpec {
159+
meta: Some(ResourceMeta {
160+
id: tpl.id.clone(),
161+
name: tpl.name.clone(),
162+
project_id: tpl.project_id.clone(),
163+
node_id: Some(tpl.node_id.clone()),
164+
labels: Default::default(),
165+
}),
166+
url: String::new(),
167+
checksum: None,
168+
});
169+
}
170+
}
171+
172+
// 2. Pending/running import jobs — these should be sent to the target node
156173
if let Ok(jobs) = store.list_import_jobs(None).await {
157174
for job in &jobs {
158175
if job.state == ImportJobState::Completed || job.state == ImportJobState::Failed {

mvirt-api/src/grpc/server.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,13 @@ impl NodeService for NodeServiceImpl {
237237
};
238238
let _ = out_tx.send(Ok(register_result)).await;
239239

240-
// 4. Build and send initial manifest
240+
// 4. Register manifest sender (before building initial manifest to avoid race window)
241+
{
242+
let mut senders = self.manifest_senders.write().await;
243+
senders.insert(registered_id.clone(), manifest_tx);
244+
}
245+
246+
// 5. Build and send initial manifest
241247
let revision = self.next_revision().await;
242248
let initial_manifest = build_manifest(&self.store, &registered_id, revision).await;
243249
let _ = out_tx
@@ -246,12 +252,6 @@ impl NodeService for NodeServiceImpl {
246252
}))
247253
.await;
248254

249-
// 5. Register manifest sender
250-
{
251-
let mut senders = self.manifest_senders.write().await;
252-
senders.insert(registered_id.clone(), manifest_tx);
253-
}
254-
255255
tracing::info!("Node {} connected via Sync stream", registered_id);
256256

257257
// 6. Spawn reader task for incoming messages

mvirt-node/src/agent.rs

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ pub struct NodeAgent {
119119
security_group_reconciler: SecurityGroupReconciler,
120120
route_reconciler: RouteReconciler,
121121
// Clients for init-from-reality
122+
vmm_client: VmmClient,
123+
net_client: NetClient,
122124
zfs_client: ZfsClient,
123125
}
124126

@@ -143,13 +145,15 @@ impl NodeAgent {
143145
resources,
144146
audit,
145147
known_ids: HashMap::new(),
146-
vm_reconciler: VmReconciler::new(vmm_client, zfs_client.clone()),
148+
vm_reconciler: VmReconciler::new(vmm_client.clone(), zfs_client.clone()),
147149
network_reconciler: NetworkReconciler::new(net_client.clone()),
148150
nic_reconciler: NicReconciler::new(net_client.clone()),
149151
template_reconciler: TemplateReconciler::new(zfs_client.clone()),
150152
volume_reconciler: VolumeReconciler::new(zfs_client.clone()),
151-
security_group_reconciler: SecurityGroupReconciler::new(net_client),
153+
security_group_reconciler: SecurityGroupReconciler::new(net_client.clone()),
152154
route_reconciler: RouteReconciler::new(),
155+
vmm_client,
156+
net_client,
153157
zfs_client,
154158
}
155159
}
@@ -168,6 +172,72 @@ impl NodeAgent {
168172

169173
/// Initialize known_ids from local sub-services (init from reality).
170174
async fn init_from_reality(&mut self) {
175+
// VMs from VMM
176+
match self.vmm_client.list_vms().await {
177+
Ok(vms) => {
178+
let ids: HashSet<String> = vms.iter().map(|v| v.id.clone()).collect();
179+
info!("Init from reality: {} VMs", ids.len());
180+
self.known_ids.insert("vm".to_string(), ids);
181+
}
182+
Err(e) => {
183+
warn!("Failed to list VMs from VMM: {}", e);
184+
self.known_ids.insert("vm".to_string(), HashSet::new());
185+
}
186+
}
187+
188+
// Networks from net
189+
match self.net_client.list_networks().await {
190+
Ok(networks) => {
191+
let ids: HashSet<String> = networks.iter().map(|n| n.id.clone()).collect();
192+
info!("Init from reality: {} networks", ids.len());
193+
self.known_ids.insert("network".to_string(), ids);
194+
}
195+
Err(e) => {
196+
warn!("Failed to list networks from net: {}", e);
197+
self.known_ids.insert("network".to_string(), HashSet::new());
198+
}
199+
}
200+
201+
// NICs from net
202+
match self.net_client.list_nics().await {
203+
Ok(nics) => {
204+
let ids: HashSet<String> = nics.iter().map(|n| n.id.clone()).collect();
205+
info!("Init from reality: {} NICs", ids.len());
206+
self.known_ids.insert("nic".to_string(), ids);
207+
}
208+
Err(e) => {
209+
warn!("Failed to list NICs from net: {}", e);
210+
self.known_ids.insert("nic".to_string(), HashSet::new());
211+
}
212+
}
213+
214+
// Security groups from net
215+
match self.net_client.list_security_groups().await {
216+
Ok(sgs) => {
217+
let ids: HashSet<String> = sgs.iter().map(|s| s.id.clone()).collect();
218+
info!("Init from reality: {} security groups", ids.len());
219+
self.known_ids.insert("security_group".to_string(), ids);
220+
}
221+
Err(e) => {
222+
warn!("Failed to list security groups from net: {}", e);
223+
self.known_ids
224+
.insert("security_group".to_string(), HashSet::new());
225+
}
226+
}
227+
228+
// Volumes from ZFS
229+
match self.zfs_client.list_volumes().await {
230+
Ok(volumes) => {
231+
let ids: HashSet<String> = volumes.iter().map(|v| v.name.clone()).collect();
232+
info!("Init from reality: {} volumes", ids.len());
233+
self.known_ids.insert("volume".to_string(), ids);
234+
}
235+
Err(e) => {
236+
warn!("Failed to list volumes from ZFS: {}", e);
237+
self.known_ids.insert("volume".to_string(), HashSet::new());
238+
}
239+
}
240+
171241
// Templates from ZFS
172242
match self.zfs_client.list_templates().await {
173243
Ok(templates) => {
@@ -182,10 +252,8 @@ impl NodeAgent {
182252
}
183253
}
184254

185-
// Other resource types start empty — the first manifest will populate them
186-
for key in &["vm", "network", "nic", "volume", "security_group", "route"] {
187-
self.known_ids.entry(key.to_string()).or_default();
188-
}
255+
// Routes start empty (no local daemon to query)
256+
self.known_ids.entry("route".to_string()).or_default();
189257
}
190258

191259
/// Main agent loop.

mvirt-node/src/clients/net.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use crate::proto::net::{
88
net_service_client::NetServiceClient, AddSecurityGroupRuleRequest, AttachNicRequest,
99
AttachSecurityGroupRequest, CreateNetworkRequest, CreateNicRequest, CreateSecurityGroupRequest,
1010
DeleteNetworkRequest, DeleteNicRequest, DeleteSecurityGroupRequest, DetachSecurityGroupRequest,
11-
GetNicRequest, RemoveSecurityGroupRuleRequest,
11+
GetNicRequest, ListNetworksRequest, ListNicsRequest, ListSecurityGroupsRequest,
12+
RemoveSecurityGroupRuleRequest,
1213
};
1314

1415
pub use crate::proto::net::{
@@ -30,6 +31,43 @@ impl NetClient {
3031
Ok(Self { client })
3132
}
3233

34+
// === List operations ===
35+
36+
/// List all networks.
37+
pub async fn list_networks(&mut self) -> Result<Vec<Network>> {
38+
debug!("Listing networks from mvirt-net");
39+
let resp = self
40+
.client
41+
.list_networks(ListNetworksRequest {})
42+
.await
43+
.context("Failed to list networks")?;
44+
Ok(resp.into_inner().networks)
45+
}
46+
47+
/// List all NICs.
48+
pub async fn list_nics(&mut self) -> Result<Vec<Nic>> {
49+
debug!("Listing NICs from mvirt-net");
50+
let resp = self
51+
.client
52+
.list_nics(ListNicsRequest {
53+
network_id: String::new(),
54+
})
55+
.await
56+
.context("Failed to list NICs")?;
57+
Ok(resp.into_inner().nics)
58+
}
59+
60+
/// List all security groups.
61+
pub async fn list_security_groups(&mut self) -> Result<Vec<SecurityGroup>> {
62+
debug!("Listing security groups from mvirt-net");
63+
let resp = self
64+
.client
65+
.list_security_groups(ListSecurityGroupsRequest {})
66+
.await
67+
.context("Failed to list security groups")?;
68+
Ok(resp.into_inner().security_groups)
69+
}
70+
3371
// === Network operations ===
3472

3573
/// Create a network.

mvirt-node/src/clients/vmm.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ use tracing::debug;
66

77
use crate::proto::vmm::{
88
vm_service_client::VmServiceClient, CreateVmRequest, DeleteVmRequest, GetVmRequest,
9-
KillVmRequest, StartVmRequest, StopVmRequest,
9+
KillVmRequest, ListVmsRequest, StartVmRequest, StopVmRequest,
1010
};
1111

1212
pub use crate::proto::vmm::{BootMode, DiskConfig, NicConfig, Vm, VmConfig, VmState};
1313

1414
/// Client for interacting with mvirt-vmm.
15+
#[derive(Clone)]
1516
pub struct VmmClient {
1617
client: VmServiceClient<Channel>,
1718
}
@@ -88,6 +89,17 @@ impl VmmClient {
8889
Ok(resp.into_inner())
8990
}
9091

92+
/// List all VMs.
93+
pub async fn list_vms(&mut self) -> Result<Vec<Vm>> {
94+
debug!("Listing VMs from mvirt-vmm");
95+
let resp = self
96+
.client
97+
.list_vms(ListVmsRequest {})
98+
.await
99+
.context("Failed to list VMs")?;
100+
Ok(resp.into_inner().vms)
101+
}
102+
91103
/// Delete a VM.
92104
pub async fn delete_vm(&mut self, id: &str) -> Result<()> {
93105
debug!("Deleting VM {} in mvirt-vmm", id);

mvirt-node/src/clients/zfs.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tracing::debug;
77
use crate::proto::zfs::{
88
zfs_service_client::ZfsServiceClient, CloneFromTemplateRequest, CreateVolumeRequest,
99
DeleteTemplateRequest, DeleteVolumeRequest, GetImportJobRequest, GetVolumeRequest,
10-
ImportTemplateRequest, ListTemplatesRequest,
10+
ImportTemplateRequest, ListTemplatesRequest, ListVolumesRequest,
1111
};
1212

1313
pub use crate::proto::zfs::{ImportJob, Template, Volume};
@@ -26,6 +26,17 @@ impl ZfsClient {
2626
Ok(Self { client })
2727
}
2828

29+
/// List all volumes.
30+
pub async fn list_volumes(&mut self) -> Result<Vec<Volume>> {
31+
debug!("Listing volumes from mvirt-zfs");
32+
let resp = self
33+
.client
34+
.list_volumes(ListVolumesRequest {})
35+
.await
36+
.context("Failed to list volumes")?;
37+
Ok(resp.into_inner().volumes)
38+
}
39+
2940
/// Get volume by name.
3041
pub async fn get_volume(&mut self, name: &str) -> Result<Option<Volume>> {
3142
debug!("Getting volume {} from mvirt-zfs", name);

0 commit comments

Comments
 (0)