Skip to content

Commit 0aebcec

Browse files
committed
feat: improve gc and gc tests
1 parent 827edea commit 0aebcec

File tree

4 files changed

+119
-81
lines changed

4 files changed

+119
-81
lines changed

Diff for: server/processor/src/gc.rs

+8-3
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,12 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use anyhow::Result;
44
use blob_store::BlobStorage;
55
use state_store::{
66
requests::{RequestPayload, StateMachineUpdateRequest},
77
IndexifyState,
88
};
9+
use tokio::time::{self};
910
use tracing::{debug, error, info};
1011

1112
pub struct Gc {
@@ -41,10 +42,14 @@ impl Gc {
4142
Ok(has_more) => {
4243
if has_more {
4344
rx.mark_changed();
45+
// throttling to avoid tight loop
46+
time::sleep(Duration::from_secs(5)).await;
4447
}
4548
}
4649
Err(err) => {
4750
error!("error processing gc work: {:?}", err);
51+
// prevent spurious errors from causing a tight loop
52+
time::sleep(Duration::from_secs(30)).await;
4853
}
4954
}
5055
tokio::select! {
@@ -69,13 +74,13 @@ impl Gc {
6974
if let Err(e) = storage.delete(url).await {
7075
error!("Error deleting url {:?}: {:?}", url, e);
7176
} else {
72-
deleted_urls.push(url);
77+
deleted_urls.push(url.clone());
7378
}
7479
}
7580
if !deleted_urls.is_empty() {
7681
self.state
7782
.write(StateMachineUpdateRequest {
78-
payload: RequestPayload::RemoveGcUrls(urls),
83+
payload: RequestPayload::RemoveGcUrls(deleted_urls),
7984
processed_state_changes: vec![],
8085
})
8186
.await?;

Diff for: server/src/gc_test.rs

+108-76
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,17 @@
11
#[cfg(test)]
22
mod tests {
3+
use std::collections::HashMap;
4+
35
use anyhow::Result;
46
use bytes::Bytes;
57
use data_model::{
68
test_objects::tests::{mock_graph_a, TEST_NAMESPACE},
9+
GraphInvocationCtx,
710
InvocationPayload,
811
NodeOutput,
912
};
1013
use futures::stream;
14+
use indexify_utils::get_epoch_time_in_ms;
1115
use state_store::{
1216
requests::{
1317
CreateOrUpdateComputeGraphRequest,
@@ -32,81 +36,104 @@ mod tests {
3236
} = test_srv.service;
3337

3438
// Create a compute graph
35-
let compute_graph = mock_graph_a("image_hash".to_string());
36-
indexify_state
37-
.write(StateMachineUpdateRequest {
38-
payload: RequestPayload::CreateOrUpdateComputeGraph(
39-
CreateOrUpdateComputeGraphRequest {
40-
namespace: TEST_NAMESPACE.to_string(),
41-
compute_graph: compute_graph.clone(),
42-
upgrade_tasks_to_current_version: false,
43-
},
44-
),
45-
processed_state_changes: vec![],
46-
})
47-
.await?;
39+
let compute_graph = {
40+
let mut compute_graph = mock_graph_a("image_hash".to_string()).clone();
41+
let data = "code";
42+
let path = format!("{}", &compute_graph.code.path);
43+
44+
let data_stream = Box::pin(stream::once(async { Ok(Bytes::from(data)) }));
45+
let res = blob_storage.put(&path, data_stream).await?;
46+
compute_graph.code.path = res.url;
47+
48+
indexify_state
49+
.write(StateMachineUpdateRequest {
50+
payload: RequestPayload::CreateOrUpdateComputeGraph(
51+
CreateOrUpdateComputeGraphRequest {
52+
namespace: TEST_NAMESPACE.to_string(),
53+
compute_graph: compute_graph.clone(),
54+
upgrade_tasks_to_current_version: false,
55+
},
56+
),
57+
processed_state_changes: vec![],
58+
})
59+
.await?;
4860

49-
let data = "aaaa";
50-
let path = "qqqq";
51-
let data_stream = Box::pin(stream::once(async { Ok(Bytes::from(data)) }));
52-
let res = blob_storage.put(path, data_stream).await?;
53-
54-
// Create a graph invocation
55-
let invocation = InvocationPayload {
56-
id: "invocation_id".to_string(),
57-
namespace: TEST_NAMESPACE.to_string(),
58-
compute_graph_name: compute_graph.name.clone(),
59-
payload: data_model::DataPayload {
60-
path: res.url.clone(),
61-
size: res.size_bytes,
62-
sha256_hash: res.sha256_hash.clone(),
63-
},
64-
created_at: 5,
65-
encoding: "application/octet-stream".to_string(),
61+
compute_graph
6662
};
6763

68-
indexify_state.db.put_cf(
69-
&IndexifyObjectsColumns::GraphInvocations.cf_db(&indexify_state.db),
70-
invocation.key().as_bytes(),
71-
&JsonEncoder::encode(&invocation)?,
72-
)?;
73-
74-
let output = NodeOutput {
75-
id: "id".to_string(),
76-
namespace: TEST_NAMESPACE.to_string(),
77-
compute_fn_name: "fn_a".to_string(),
78-
compute_graph_name: "graph_A".to_string(),
79-
invocation_id: "invocation_id".to_string(),
80-
payload: data_model::OutputPayload::Fn(data_model::DataPayload {
81-
path: res.url.clone(),
82-
size: res.size_bytes,
83-
sha256_hash: res.sha256_hash.clone(),
84-
}),
85-
errors: None,
86-
reduced_state: false,
87-
created_at: 5,
88-
encoding: "application/octet-stream".to_string(),
64+
let res = {
65+
let data = "invocation_payload";
66+
let path = "invocation_payload";
67+
let data_stream = Box::pin(stream::once(async { Ok(Bytes::from(data)) }));
68+
let res = blob_storage.put(path, data_stream).await?;
69+
70+
// Create a graph invocation
71+
let invocation = InvocationPayload {
72+
id: "invocation_id".to_string(),
73+
namespace: TEST_NAMESPACE.to_string(),
74+
compute_graph_name: compute_graph.name.clone(),
75+
payload: data_model::DataPayload {
76+
path: res.url.clone(),
77+
size: res.size_bytes,
78+
sha256_hash: res.sha256_hash.clone(),
79+
},
80+
created_at: get_epoch_time_in_ms(),
81+
encoding: "application/octet-stream".to_string(),
82+
};
83+
84+
indexify_state.db.put_cf(
85+
&IndexifyObjectsColumns::GraphInvocations.cf_db(&indexify_state.db),
86+
invocation.key().as_bytes(),
87+
&JsonEncoder::encode(&invocation)?,
88+
)?;
89+
90+
indexify_state.db.put_cf(
91+
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&indexify_state.db),
92+
invocation.key().as_bytes(),
93+
&JsonEncoder::encode(&GraphInvocationCtx {
94+
invocation_id: invocation.id.clone(),
95+
compute_graph_name: compute_graph.name.clone(),
96+
namespace: TEST_NAMESPACE.to_string(),
97+
graph_version: compute_graph.version.clone(),
98+
completed: false,
99+
outcome: data_model::GraphInvocationOutcome::Failure,
100+
outstanding_tasks: 0,
101+
fn_task_analytics: HashMap::new(),
102+
created_at: get_epoch_time_in_ms(),
103+
})?,
104+
)?;
105+
106+
let output = NodeOutput {
107+
id: "id".to_string(),
108+
namespace: TEST_NAMESPACE.to_string(),
109+
compute_fn_name: "fn_a".to_string(),
110+
compute_graph_name: compute_graph.name.clone(),
111+
invocation_id: invocation.id.clone(),
112+
payload: data_model::OutputPayload::Fn(data_model::DataPayload {
113+
path: res.url.clone(),
114+
size: res.size_bytes,
115+
sha256_hash: res.sha256_hash.clone(),
116+
}),
117+
errors: None,
118+
reduced_state: false,
119+
created_at: 5,
120+
encoding: "application/octet-stream".to_string(),
121+
};
122+
let key = output.key(&output.invocation_id);
123+
let serialized_output = JsonEncoder::encode(&output)?;
124+
indexify_state.db.put_cf(
125+
&IndexifyObjectsColumns::FnOutputs.cf_db(&indexify_state.db),
126+
key,
127+
&serialized_output,
128+
)?;
129+
130+
blob_storage.read_bytes(&res.url).await?;
131+
132+
res
89133
};
90-
let key = output.key(&output.invocation_id);
91-
let serialized_output = JsonEncoder::encode(&output)?;
92-
indexify_state.db.put_cf(
93-
&IndexifyObjectsColumns::FnOutputs.cf_db(&indexify_state.db),
94-
key,
95-
&serialized_output,
96-
)?;
97-
98-
blob_storage.read_bytes(&res.url).await?;
99-
100-
let request = RequestPayload::TombstoneComputeGraph(DeleteComputeGraphRequest {
101-
namespace: TEST_NAMESPACE.to_string(),
102-
name: compute_graph.name.clone(),
103-
});
104-
indexify_state
105-
.write(StateMachineUpdateRequest {
106-
payload: request,
107-
processed_state_changes: vec![],
108-
})
109-
.await?;
134+
135+
let urls = indexify_state.reader().get_gc_urls(None)?;
136+
assert!(urls.is_empty(), "all gc urls are empty: {:?}", urls);
110137

111138
indexify_state
112139
.write(StateMachineUpdateRequest {
@@ -118,15 +145,20 @@ mod tests {
118145
})
119146
.await?;
120147

148+
let urls = indexify_state.reader().get_gc_urls(None)?;
149+
assert!(
150+
!urls.is_empty(),
151+
"all gc urls should not be empty: {:?}",
152+
urls
153+
);
154+
121155
gc_executor.lock().await.run().await?;
122156

123157
let urls = indexify_state.reader().get_gc_urls(None)?;
124-
assert!(urls.is_empty(), "all gc urls are empty");
158+
assert!(urls.is_empty(), "all gc urls are empty: {:?}", urls);
125159

126-
assert!(
127-
blob_storage.read_bytes(&res.url).await.is_err(),
128-
"file is deleted"
129-
);
160+
let read_res = blob_storage.read_bytes(&res.url).await;
161+
assert!(read_res.is_err(), "file is not deleted: {:#?}", read_res);
130162

131163
Ok(())
132164
}

Diff for: server/src/routes.rs

+1
Original file line number | Diff line number | Diff line change
@@ -455,6 +455,7 @@ async fn create_or_update_compute_graph(
455455
if name == "code" {
456456
let stream = field.map(|res| res.map_err(|err| anyhow!(err)));
457457
let file_name = format!("{}_{}", namespace, nanoid!());
458+
info!("Uploading file: {}", file_name);
458459
let result = state
459460
.blob_storage
460461
.put(&file_name, stream)

Diff for: server/state_store/src/state_machine.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -169,14 +169,15 @@ pub(crate) fn delete_invocation(
169169
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
170170
&invocation_ctx_key,
171171
)
172-
.map_err(|e| anyhow!("failed to get invocation: {}", e))?;
172+
.map_err(|e| anyhow!("failed to get invocation: {:?}", e))?;
173173
let invocation_ctx = match invocation_ctx {
174174
Some(v) => JsonEncoder::decode::<GraphInvocationCtx>(&v)?,
175175
None => {
176176
info!(
177177
namespace = &req.namespace,
178178
compute_graph = &req.compute_graph,
179179
invocation_id = &req.invocation_id,
180+
invocation_ctx_key = &invocation_ctx_key,
180181
"Invocation to delete not found: {}",
181182
&req.invocation_id
182183
);
@@ -586,7 +587,6 @@ pub fn delete_compute_graph(
586587
) {
587588
let (_key, value) = iter?;
588589
let value = JsonEncoder::decode::<InvocationPayload>(&value)?;
589-
info!("deleting graph invocation: {}", value.id);
590590
let req = DeleteInvocationRequest {
591591
namespace: value.namespace,
592592
compute_graph: value.compute_graph_name,

0 commit comments

Comments
 (0)