fix: deletion of invocation impacting whole invocation secondary index #1354

Merged
merged 5 commits on Apr 14, 2025

5 changes: 5 additions & 0 deletions indexify/tests/cli/test_function_allowlist.py
@@ -84,6 +84,11 @@ def function_dev(_: str) -> int:

class TestFunctionAllowlist(unittest.TestCase):
def test_function_routing(self):
print(
"Waiting for 30 seconds for Server to notice that any previously existing Executors exited."
)
time.sleep(30)

graph_name = test_graph_name(self)
version = str(time.time())

13 changes: 13 additions & 0 deletions indexify/tests/cli/test_server_task_distribution.py
@@ -30,6 +30,11 @@ def success_func(sleep_secs: float) -> str:

class TestServerTaskDistribution(unittest.TestCase):
def test_server_distributes_invocations_fairly_between_two_executors(self):
print(
"Waiting for 30 seconds for Server to notice that any previously existing Executors exited."
)
time.sleep(30)

graph_name = test_graph_name(self)
version = str(time.time())

@@ -81,6 +86,10 @@ def test_server_distributes_invocations_fairly_between_two_executors(self):
self.assertLess(invocations_count, 125)

def test_server_redistributes_invocations_when_new_executor_joins(self):
print(
"Waiting for 30 seconds for Server to notice that any previously existing Executors exited."
)
time.sleep(30)
graph_name = test_graph_name(self)
version = str(time.time())

@@ -132,6 +141,10 @@ def test_server_redistributes_invocations_when_new_executor_joins(self):
self.assertLess(invocations_count, 150)

def test_all_tasks_succeed_when_executor_exits(self):
print(
"Waiting for 30 seconds for Server to notice that any previously existing Executors exited."
)
time.sleep(30)
graph_name = test_graph_name(self)
version = str(time.time())

12 changes: 0 additions & 12 deletions server/data_model/src/lib.rs
@@ -830,18 +830,6 @@ impl GraphInvocationCtx {
key
}

pub fn secondary_index_key_prefix_from_compute_graph(
namespace: &str,
compute_graph_name: &str,
) -> Vec<u8> {
let mut key = Vec::new();
key.extend_from_slice(namespace.as_bytes());
key.push(b'|');
key.extend_from_slice(compute_graph_name.as_bytes());
key.push(b'|');
key
}

pub fn get_invocation_id_from_secondary_index_key(key: &[u8]) -> Option<String> {
key.split(|&b| b == b'|')
.nth(3)
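This removal is the heart of the fix: the deleted helper built a prefix covering only the namespace and compute graph, so using it to delete one invocation's secondary-index entry swept away the entries for every invocation of that graph. Below is a minimal, hypothetical sketch of why the prefix scope matters; it assumes a `namespace|graph|created_at|invocation_id` layout consistent with `get_invocation_id_from_secondary_index_key` taking the fourth `|`-separated segment, and the helper names are illustrative, not the crate's actual API.

```rust
// Hypothetical sketch (not the indexify data_model API): shows why a
// graph-scoped prefix is too broad for deleting a single invocation's
// secondary-index entry. Assumed layout: namespace|graph|created_at|invocation_id.
fn secondary_index_key(ns: &str, graph: &str, created_at: u64, invocation_id: &str) -> Vec<u8> {
    let mut key = Vec::new();
    key.extend_from_slice(ns.as_bytes());
    key.push(b'|');
    key.extend_from_slice(graph.as_bytes());
    key.push(b'|');
    key.extend_from_slice(created_at.to_string().as_bytes());
    key.push(b'|');
    key.extend_from_slice(invocation_id.as_bytes());
    key
}

fn main() {
    let inv_1 = secondary_index_key("ns", "graph_A", 1, "inv_1");
    let inv_2 = secondary_index_key("ns", "graph_A", 2, "inv_2");

    // The removed helper produced a graph-level prefix like this, which matches
    // *every* invocation of the graph, so a prefix delete wipes the whole index.
    let graph_prefix = b"ns|graph_A|";
    assert!(inv_1.starts_with(graph_prefix) && inv_2.starts_with(graph_prefix));

    // A prefix scoped to one invocation matches only that invocation's entries.
    let inv_1_prefix = secondary_index_key("ns", "graph_A", 1, "inv_1");
    assert!(inv_1.starts_with(&inv_1_prefix));
    assert!(!inv_2.starts_with(&inv_1_prefix));
}
```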
11 changes: 8 additions & 3 deletions server/processor/src/gc.rs
@@ -1,11 +1,12 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use blob_store::BlobStorage;
use state_store::{
requests::{RequestPayload, StateMachineUpdateRequest},
IndexifyState,
};
use tokio::time::{self};
use tracing::{debug, error, info};

pub struct Gc {
@@ -41,10 +42,14 @@ impl Gc {
Ok(has_more) => {
if has_more {
rx.mark_changed();
// throttling to avoid tight loop
time::sleep(Duration::from_secs(5)).await;
}
}
Err(err) => {
error!("error processing gc work: {:?}", err);
// prevent spurious errors from causing a tight loop
time::sleep(Duration::from_secs(30)).await;
}
}
tokio::select! {
@@ -69,13 +74,13 @@ impl Gc {
if let Err(e) = storage.delete(url).await {
error!("Error deleting url {:?}: {:?}", url, e);
} else {
deleted_urls.push(url);
deleted_urls.push(url.clone());
}
}
if !deleted_urls.is_empty() {
self.state
.write(StateMachineUpdateRequest {
payload: RequestPayload::RemoveGcUrls(urls),
payload: RequestPayload::RemoveGcUrls(deleted_urls),
processed_state_changes: vec![],
})
.await?;
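The gc.rs change does two things: it throttles the loop when there is more work or an error so it cannot spin, and it acknowledges only the URLs whose blob deletion actually succeeded (`deleted_urls`) rather than the whole input batch (`urls`), so failed deletions stay queued for a retry. A rough sketch of that pattern, using stand-in functions rather than the real `BlobStorage`/`IndexifyState` APIs:

```rust
// Rough sketch of the fixed GC pattern (stand-in functions, not the real
// BlobStorage / IndexifyState APIs). Requires tokio with rt, macros, and time.
use std::time::Duration;

// Stand-in for BlobStorage::delete: fails for an "unreachable" URL.
async fn delete_blob(url: &str) -> Result<(), String> {
    if url.contains("unreachable") {
        Err("backend unavailable".to_string())
    } else {
        Ok(())
    }
}

// One GC pass: returns the URLs that were actually deleted and should be
// removed from the state store's GC queue.
async fn gc_pass(urls: &[String]) -> Vec<String> {
    let mut deleted_urls = Vec::new();
    for url in urls {
        match delete_blob(url).await {
            // Only acknowledge what really got deleted; the original bug
            // acknowledged the whole input batch, dropping failed URLs.
            Ok(()) => deleted_urls.push(url.clone()),
            Err(err) => eprintln!("error deleting {url}: {err}"),
        }
    }
    deleted_urls
}

#[tokio::main]
async fn main() {
    let queue = vec!["s3://bucket/ok".to_string(), "s3://unreachable/x".to_string()];
    let acked = gc_pass(&queue).await;
    assert_eq!(acked, vec!["s3://bucket/ok".to_string()]);

    // Throttle between passes, mirroring the 5s (more work) / 30s (error)
    // sleeps in the diff, so errors or a long backlog don't cause a tight loop.
    tokio::time::sleep(Duration::from_millis(10)).await;
}
```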
184 changes: 108 additions & 76 deletions server/src/gc_test.rs
@@ -1,13 +1,17 @@
#[cfg(test)]
mod tests {
use std::collections::HashMap;

use anyhow::Result;
use bytes::Bytes;
use data_model::{
test_objects::tests::{mock_graph_a, TEST_NAMESPACE},
GraphInvocationCtx,
InvocationPayload,
NodeOutput,
};
use futures::stream;
use indexify_utils::get_epoch_time_in_ms;
use state_store::{
requests::{
CreateOrUpdateComputeGraphRequest,
@@ -32,81 +36,104 @@ mod tests {
} = test_srv.service;

// Create a compute graph
let compute_graph = mock_graph_a("image_hash".to_string());
indexify_state
.write(StateMachineUpdateRequest {
payload: RequestPayload::CreateOrUpdateComputeGraph(
CreateOrUpdateComputeGraphRequest {
namespace: TEST_NAMESPACE.to_string(),
compute_graph: compute_graph.clone(),
upgrade_tasks_to_current_version: false,
},
),
processed_state_changes: vec![],
})
.await?;
let compute_graph = {
let mut compute_graph = mock_graph_a("image_hash".to_string()).clone();
let data = "code";
let path = format!("{}", &compute_graph.code.path);

let data_stream = Box::pin(stream::once(async { Ok(Bytes::from(data)) }));
let res = blob_storage.put(&path, data_stream).await?;
compute_graph.code.path = res.url;

indexify_state
.write(StateMachineUpdateRequest {
payload: RequestPayload::CreateOrUpdateComputeGraph(
CreateOrUpdateComputeGraphRequest {
namespace: TEST_NAMESPACE.to_string(),
compute_graph: compute_graph.clone(),
upgrade_tasks_to_current_version: false,
},
),
processed_state_changes: vec![],
})
.await?;

let data = "aaaa";
let path = "qqqq";
let data_stream = Box::pin(stream::once(async { Ok(Bytes::from(data)) }));
let res = blob_storage.put(path, data_stream).await?;

// Create a graph invocation
let invocation = InvocationPayload {
id: "invocation_id".to_string(),
namespace: TEST_NAMESPACE.to_string(),
compute_graph_name: compute_graph.name.clone(),
payload: data_model::DataPayload {
path: res.url.clone(),
size: res.size_bytes,
sha256_hash: res.sha256_hash.clone(),
},
created_at: 5,
encoding: "application/octet-stream".to_string(),
compute_graph
};

indexify_state.db.put_cf(
&IndexifyObjectsColumns::GraphInvocations.cf_db(&indexify_state.db),
invocation.key().as_bytes(),
&JsonEncoder::encode(&invocation)?,
)?;

let output = NodeOutput {
id: "id".to_string(),
namespace: TEST_NAMESPACE.to_string(),
compute_fn_name: "fn_a".to_string(),
compute_graph_name: "graph_A".to_string(),
invocation_id: "invocation_id".to_string(),
payload: data_model::OutputPayload::Fn(data_model::DataPayload {
path: res.url.clone(),
size: res.size_bytes,
sha256_hash: res.sha256_hash.clone(),
}),
errors: None,
reduced_state: false,
created_at: 5,
encoding: "application/octet-stream".to_string(),
let res = {
let data = "invocation_payload";
let path = "invocation_payload";
let data_stream = Box::pin(stream::once(async { Ok(Bytes::from(data)) }));
let res = blob_storage.put(path, data_stream).await?;

// Create a graph invocation
let invocation = InvocationPayload {
id: "invocation_id".to_string(),
namespace: TEST_NAMESPACE.to_string(),
compute_graph_name: compute_graph.name.clone(),
payload: data_model::DataPayload {
path: res.url.clone(),
size: res.size_bytes,
sha256_hash: res.sha256_hash.clone(),
},
created_at: get_epoch_time_in_ms(),
encoding: "application/octet-stream".to_string(),
};

indexify_state.db.put_cf(
&IndexifyObjectsColumns::GraphInvocations.cf_db(&indexify_state.db),
invocation.key().as_bytes(),
&JsonEncoder::encode(&invocation)?,
)?;

indexify_state.db.put_cf(
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&indexify_state.db),
invocation.key().as_bytes(),
&JsonEncoder::encode(&GraphInvocationCtx {
invocation_id: invocation.id.clone(),
compute_graph_name: compute_graph.name.clone(),
namespace: TEST_NAMESPACE.to_string(),
graph_version: compute_graph.version.clone(),
completed: false,
outcome: data_model::GraphInvocationOutcome::Failure,
outstanding_tasks: 0,
fn_task_analytics: HashMap::new(),
created_at: get_epoch_time_in_ms(),
})?,
)?;

let output = NodeOutput {
id: "id".to_string(),
namespace: TEST_NAMESPACE.to_string(),
compute_fn_name: "fn_a".to_string(),
compute_graph_name: compute_graph.name.clone(),
invocation_id: invocation.id.clone(),
payload: data_model::OutputPayload::Fn(data_model::DataPayload {
path: res.url.clone(),
size: res.size_bytes,
sha256_hash: res.sha256_hash.clone(),
}),
errors: None,
reduced_state: false,
created_at: 5,
encoding: "application/octet-stream".to_string(),
};
let key = output.key(&output.invocation_id);
let serialized_output = JsonEncoder::encode(&output)?;
indexify_state.db.put_cf(
&IndexifyObjectsColumns::FnOutputs.cf_db(&indexify_state.db),
key,
&serialized_output,
)?;

blob_storage.read_bytes(&res.url).await?;

res
};
let key = output.key(&output.invocation_id);
let serialized_output = JsonEncoder::encode(&output)?;
indexify_state.db.put_cf(
&IndexifyObjectsColumns::FnOutputs.cf_db(&indexify_state.db),
key,
&serialized_output,
)?;

blob_storage.read_bytes(&res.url).await?;

let request = RequestPayload::TombstoneComputeGraph(DeleteComputeGraphRequest {
namespace: TEST_NAMESPACE.to_string(),
name: compute_graph.name.clone(),
});
indexify_state
.write(StateMachineUpdateRequest {
payload: request,
processed_state_changes: vec![],
})
.await?;

let urls = indexify_state.reader().get_gc_urls(None)?;
assert!(urls.is_empty(), "all gc urls are empty: {:?}", urls);

indexify_state
.write(StateMachineUpdateRequest {
@@ -118,15 +145,20 @@
})
.await?;

let urls = indexify_state.reader().get_gc_urls(None)?;
assert!(
!urls.is_empty(),
"all gc urls should not be empty: {:?}",
urls
);

gc_executor.lock().await.run().await?;

let urls = indexify_state.reader().get_gc_urls(None)?;
assert!(urls.is_empty(), "all gc urls are empty");
assert!(urls.is_empty(), "all gc urls are empty: {:?}", urls);

assert!(
blob_storage.read_bytes(&res.url).await.is_err(),
"file is deleted"
);
let read_res = blob_storage.read_bytes(&res.url).await;
assert!(read_res.is_err(), "file is not deleted: {:#?}", read_res);

Ok(())
}
3 changes: 2 additions & 1 deletion server/src/http_objects.rs
@@ -421,7 +421,8 @@ pub struct CreateNamespaceResponse {
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct GraphInvocations {
pub invocations: Vec<Invocation>,
pub cursor: Option<String>,
pub prev_cursor: Option<String>,
pub next_cursor: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
10 changes: 6 additions & 4 deletions server/src/routes.rs
@@ -636,7 +636,7 @@ async fn graph_invocations(
Some(CursorDirection::Backward) => Some(state_store::scanner::CursorDirection::Backward),
None => None,
};
let (invocation_ctxs, cursor) = state
let (invocation_ctxs, prev_cursor, next_cursor) = state
.indexify_state
.reader()
.list_invocations(
@@ -651,11 +651,13 @@
for invocation_ctx in invocation_ctxs {
invocations.push(invocation_ctx.into());
}
let cursor = cursor.map(|c| BASE64_STANDARD.encode(c));
let prev_cursor = prev_cursor.map(|c| BASE64_STANDARD.encode(c));
let next_cursor = next_cursor.map(|c| BASE64_STANDARD.encode(c));

Ok(Json(GraphInvocations {
invocations,
cursor,
prev_cursor,
next_cursor,
}))
}

@@ -971,7 +973,7 @@ async fn find_invocation(
Ok(Json(invocation_ctx.into()))
}

/// Delete a specific invocation
#[utoipa::path(
delete,
path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}",
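The API change in http_objects.rs and routes.rs replaces the single `cursor` field with `prev_cursor` and `next_cursor`, both base64-encoded before leaving the handler. A hedged sketch of the resulting response shape; the field names come from the diff, but the simplified invocation type and the cursor value are illustrative only:

```rust
// Sketch of the new paginated response shape (needs serde with derive and serde_json).
use serde::Serialize;

#[derive(Serialize)]
struct GraphInvocations {
    invocations: Vec<String>, // simplified: the real type is Vec<Invocation>
    prev_cursor: Option<String>,
    next_cursor: Option<String>,
}

fn main() {
    let page = GraphInvocations {
        invocations: vec!["invocation_1".into(), "invocation_2".into()],
        prev_cursor: None, // first page: nothing to page back to
        next_cursor: Some("bmV4dC1rZXk=".into()), // base64-encoded opaque key, as in the route
    };
    // {"invocations":["invocation_1","invocation_2"],"prev_cursor":null,"next_cursor":"bmV4dC1rZXk="}
    println!("{}", serde_json::to_string(&page).unwrap());
}
```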
5 changes: 3 additions & 2 deletions server/state_store/src/lib.rs
@@ -28,6 +28,7 @@ use tracing::{debug, error, info, span, warn};
pub mod in_memory_state;
pub mod invocation_events;
pub mod kv;
pub mod migration_runner;
pub mod migrations;
pub mod requests;
pub mod scanner;
@@ -108,7 +109,7 @@ impl IndexifyState {
// Migrate the db before opening with all column families.
// This is because the migration process may delete older column families.
// If we open the db with all column families, it would fail to open.
let sm_meta = migrations::migrate(&path)?;
let sm_meta = migration_runner::run(&path)?;

let sm_column_families = IndexifyObjectsColumns::iter()
.map(|cf| ColumnFamilyDescriptor::new(cf.to_string(), Options::default()));
@@ -303,7 +304,7 @@ impl IndexifyState {
&txn,
&request.processed_state_changes,
)?;
migrations::write_sm_meta(
migration_runner::write_sm_meta(
&self.db,
&txn,
&StateMachineMetadata {