From 301df49418bc972ce372d8df5aaa17a9ff8598f3 Mon Sep 17 00:00:00 2001 From: Benjamin Boudreau Date: Thu, 10 Apr 2025 17:02:53 -0400 Subject: [PATCH 1/5] fix: deletion of invocation impacting whole invocation secondary index --- server/data_model/src/lib.rs | 12 ----- server/state_store/src/migrations.rs | 54 ++++++++++++++++++++- server/state_store/src/state_machine.rs | 62 ++++++++++++++----------- 3 files changed, 88 insertions(+), 40 deletions(-) diff --git a/server/data_model/src/lib.rs b/server/data_model/src/lib.rs index eeb91e0c3..2e45bffbe 100644 --- a/server/data_model/src/lib.rs +++ b/server/data_model/src/lib.rs @@ -830,18 +830,6 @@ impl GraphInvocationCtx { key } - pub fn secondary_index_key_prefix_from_compute_graph( - namespace: &str, - compute_graph_name: &str, - ) -> Vec { - let mut key = Vec::new(); - key.extend_from_slice(namespace.as_bytes()); - key.push(b'|'); - key.extend_from_slice(compute_graph_name.as_bytes()); - key.push(b'|'); - key - } - pub fn get_invocation_id_from_secondary_index_key(key: &[u8]) -> Option { key.split(|&b| b == b'|') .nth(3) diff --git a/server/state_store/src/migrations.rs b/server/state_store/src/migrations.rs index c2e44a803..3a4b9cdc0 100644 --- a/server/state_store/src/migrations.rs +++ b/server/state_store/src/migrations.rs @@ -18,7 +18,7 @@ use crate::{ state_machine::IndexifyObjectsColumns, }; -const SERVER_DB_VERSION: u64 = 7; +const SERVER_DB_VERSION: u64 = 8; // Note: should never be used with data model types to guarantee it works with // different versions. @@ -116,6 +116,12 @@ pub fn migrate(path: &Path) -> Result { .context("migrating from v6 to v7")?; } + if sm_meta.db_version == 7 { + sm_meta.db_version += 1; + migrate_v7_to_v8_recompute_invocation_ctx_secondary_index(&db, &txn) + .context("migrating from v7 to v8")?; + } + // add new migrations before this line and increment SERVER_DB_VERSION } @@ -536,6 +542,52 @@ pub fn migrate_v6_to_v7_reallocate_allocated_tasks( Ok(()) } +#[tracing::instrument(skip(db, txn))] +pub fn migrate_v7_to_v8_recompute_invocation_ctx_secondary_index( + db: &TransactionDB, + txn: &Transaction, +) -> Result<()> { + let mut num_total_invocation_ctx: usize = 0; + let mut num_migrated_invocation_ctx: usize = 0; + + panic!("TODO"); + + { + let mut read_options = ReadOptions::default(); + read_options.set_readahead_size(4_194_304); + + let iter = db.iterator_cf_opt( + &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), + read_options, + IteratorMode::Start, + ); + + for kv in iter { + num_total_invocation_ctx += 1; + let (_key, value) = kv?; + + let graph_invocation_ctx = JsonEncoder::decode::(&value)?; + + let secondary_index_key = + GraphInvocationCtx::secondary_index_key(&graph_invocation_ctx); + + txn.put_cf( + &IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db), + &secondary_index_key, + &[], + )?; + + num_migrated_invocation_ctx += 1; + } + } + + info!( + "Migrated {}/{} invocation context secondary indexes from v3 to v4", + num_migrated_invocation_ctx, num_total_invocation_ctx + ); + Ok(()) +} + pub fn write_sm_meta( db: &TransactionDB, txn: &Transaction, diff --git a/server/state_store/src/state_machine.rs b/server/state_store/src/state_machine.rs index 6b418bedd..38909e228 100644 --- a/server/state_store/src/state_machine.rs +++ b/server/state_store/src/state_machine.rs @@ -161,14 +161,39 @@ pub(crate) fn delete_invocation( "Deleting invocation", ); - // Delete the invocation payload - let invocation_key = + // Check if the invocation was deleted before the task completes + 
let invocation_ctx_key = GraphInvocationCtx::key_from(&req.namespace, &req.compute_graph, &req.invocation_id); - delete_cf_prefix( - txn, - &IndexifyObjectsColumns::GraphInvocations.cf_db(&db), - invocation_key.as_bytes(), - )?; + let invocation_ctx = txn + .get_cf( + &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), + &invocation_ctx_key, + ) + .map_err(|e| anyhow!("failed to get invocation: {}", e))?; + let invocation_ctx = match invocation_ctx { + Some(v) => JsonEncoder::decode::(&v)?, + None => { + info!( + namespace = &req.namespace, + compute_graph = &req.compute_graph, + invocation_id = &req.invocation_id, + "Invocation to delete not found: {}", + &req.invocation_id + ); + return Ok(()); + } + }; + + // Delete the invocation payload + { + let invocation_key = + InvocationPayload::key_from(&req.namespace, &req.compute_graph, &req.invocation_id); + + txn.delete_cf( + &IndexifyObjectsColumns::GraphInvocations.cf_db(&db), + &invocation_key, + )?; + } let mut tasks_deleted = Vec::new(); let task_prefix = @@ -247,26 +272,16 @@ pub(crate) fn delete_invocation( } // Delete Graph Invocation Context - delete_cf_prefix( txn, IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - invocation_key.as_bytes(), + invocation_ctx_key.as_bytes(), )?; // Delete Graph Invocation Context Secondary Index - // Note We don't delete the secondary index here because it's too much work to - // get the invocation id from the secondary index key. We purge all the - // secondary index keys for graphs if they are ever deleted. - // - // TODO: Only delete the secondary index keys for this invocation - delete_cf_prefix( - txn, + txn.delete_cf( IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db), - &GraphInvocationCtx::secondary_index_key_prefix_from_compute_graph( - &req.namespace, - &req.compute_graph, - ), + &invocation_ctx.secondary_index_key(), )?; let node_output_prefix = @@ -580,13 +595,6 @@ pub fn delete_compute_graph( delete_invocation(db.clone(), txn, &req)?; } - // Delete Graph Invocation Context Secondary Index - delete_cf_prefix( - txn, - IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db), - &GraphInvocationCtx::secondary_index_key_prefix_from_compute_graph(namespace, name), - )?; - for iter in make_prefix_iterator( txn, &IndexifyObjectsColumns::ComputeGraphVersions.cf_db(&db), From 376694d2ef4e6294521fc844045fcb431de0ba36 Mon Sep 17 00:00:00 2001 From: Benjamin Boudreau Date: Fri, 11 Apr 2025 12:10:28 -0400 Subject: [PATCH 2/5] feat: revamp migrations to support recreating cfs --- server/state_store/src/lib.rs | 5 +- server/state_store/src/migration_runner.rs | 248 ++++ server/state_store/src/migrations.rs | 1210 ----------------- server/state_store/src/migrations/contexts.rs | 340 +++++ .../src/migrations/migration_trait.rs | 24 + server/state_store/src/migrations/mod.rs | 16 + server/state_store/src/migrations/registry.rs | 238 ++++ server/state_store/src/migrations/testing.rs | 141 ++ .../src/migrations/v1_task_status.rs | 176 +++ .../v2_invocation_ctx_timestamps.rs | 195 +++ .../v3_invocation_ctx_secondary_index.rs | 273 ++++ .../src/migrations/v4_drop_executors.rs | 121 ++ .../src/migrations/v5_allocation_keys.rs | 328 +++++ .../src/migrations/v6_clean_orphaned_tasks.rs | 183 +++ .../migrations/v7_reset_allocated_tasks.rs | 244 ++++ ..._rebuild_invocation_ctx_secondary_index.rs | 290 ++++ 16 files changed, 2820 insertions(+), 1212 deletions(-) create mode 100644 server/state_store/src/migration_runner.rs delete mode 100644 server/state_store/src/migrations.rs 
create mode 100644 server/state_store/src/migrations/contexts.rs create mode 100644 server/state_store/src/migrations/migration_trait.rs create mode 100644 server/state_store/src/migrations/mod.rs create mode 100644 server/state_store/src/migrations/registry.rs create mode 100644 server/state_store/src/migrations/testing.rs create mode 100644 server/state_store/src/migrations/v1_task_status.rs create mode 100644 server/state_store/src/migrations/v2_invocation_ctx_timestamps.rs create mode 100644 server/state_store/src/migrations/v3_invocation_ctx_secondary_index.rs create mode 100644 server/state_store/src/migrations/v4_drop_executors.rs create mode 100644 server/state_store/src/migrations/v5_allocation_keys.rs create mode 100644 server/state_store/src/migrations/v6_clean_orphaned_tasks.rs create mode 100644 server/state_store/src/migrations/v7_reset_allocated_tasks.rs create mode 100644 server/state_store/src/migrations/v8_rebuild_invocation_ctx_secondary_index.rs diff --git a/server/state_store/src/lib.rs b/server/state_store/src/lib.rs index 049f99eaa..27923825b 100644 --- a/server/state_store/src/lib.rs +++ b/server/state_store/src/lib.rs @@ -28,6 +28,7 @@ use tracing::{debug, error, info, span, warn}; pub mod in_memory_state; pub mod invocation_events; pub mod kv; +pub mod migration_runner; pub mod migrations; pub mod requests; pub mod scanner; @@ -108,7 +109,7 @@ impl IndexifyState { // Migrate the db before opening with all column families. // This is because the migration process may delete older column families. // If we open the db with all column families, it would fail to open. - let sm_meta = migrations::migrate(&path)?; + let sm_meta = migration_runner::run(&path)?; let sm_column_families = IndexifyObjectsColumns::iter() .map(|cf| ColumnFamilyDescriptor::new(cf.to_string(), Options::default())); @@ -303,7 +304,7 @@ impl IndexifyState { &txn, &request.processed_state_changes, )?; - migrations::write_sm_meta( + migration_runner::write_sm_meta( &self.db, &txn, &StateMachineMetadata { diff --git a/server/state_store/src/migration_runner.rs b/server/state_store/src/migration_runner.rs new file mode 100644 index 000000000..db630f8d8 --- /dev/null +++ b/server/state_store/src/migration_runner.rs @@ -0,0 +1,248 @@ +use std::path::Path; + +use anyhow::{Context, Result}; +use data_model::StateMachineMetadata; +use rocksdb::{Transaction, TransactionDB}; +use tracing::info; + +use crate::{ + migrations::{ + contexts::{MigrationContext, PrepareContext}, + registry::MigrationRegistry, + }, + serializer::{JsonEncode, JsonEncoder}, + state_machine::IndexifyObjectsColumns, +}; + +/// Main function to run all necessary migrations on a database at the given +/// path +pub fn run(path: &Path) -> Result { + // Initialize prepare context + let prepare_ctx = PrepareContext::new(path.to_path_buf()); + + // Initialize registry + let registry = MigrationRegistry::new()?; + let latest_version = registry.latest_version(); + + // Check if DB exists + let db = match prepare_ctx.open_db() { + Ok(db) => db, + Err(e) if e.to_string().contains("No such file or directory") => { + // New DB, return default metadata + info!( + "No database found. 
Initializing at version {}", + latest_version + ); + return Ok(StateMachineMetadata { + db_version: latest_version, + last_change_idx: 0, + }); + } + Err(e) => return Err(anyhow::anyhow!("Error opening database: {:?}", e)), + }; + + // Read current metadata + let mut sm_meta = read_sm_meta(&db)?; + drop(db); // Close DB before migrations + + // Find applicable migrations + let migrations = registry.find_migrations(sm_meta.db_version); + + // No migrations needed + if migrations.is_empty() { + info!( + "Database already at version {}. No migrations needed.", + sm_meta.db_version + ); + return Ok(sm_meta); + } + + info!( + "Starting migrations from v{} to v{}", + sm_meta.db_version, latest_version + ); + + // Execute each migration in sequence + for migration in migrations { + let from_version = sm_meta.db_version; + let to_version = migration.version(); + + info!( + "Running migration {}: v{} → v{}", + migration.name(), + from_version, + to_version + ); + + // Each migration prepares the DB as needed + let db = migration + .prepare(&prepare_ctx) + .with_context(|| format!("Preparing DB for migration to v{}", to_version))?; + + // Apply migration in a transaction + let txn = db.transaction(); + + // Create migration context + let migration_ctx = MigrationContext::new(&db, &txn); + + // Apply the migration + migration + .apply(&migration_ctx) + .with_context(|| format!("Applying migration to v{}", to_version))?; + + // Update metadata in the same transaction + sm_meta.db_version = to_version; + write_sm_meta(&db, &txn, &sm_meta)?; + + info!("Committing migration to v{}", to_version); + txn.commit() + .with_context(|| format!("Committing migration to v{}", to_version))?; + + // Close DB after each migration to ensure clean state + drop(db); + } + + info!( + "Completed all migrations. 
DB now at version {}", + sm_meta.db_version + ); + Ok(sm_meta) +} + +/// Read state machine metadata from the database +pub fn read_sm_meta(db: &TransactionDB) -> Result { + let meta = db.get_cf( + &IndexifyObjectsColumns::StateMachineMetadata.cf_db(db), + b"sm_meta", + )?; + match meta { + Some(meta) => Ok(JsonEncoder::decode(&meta)?), + None => Ok(StateMachineMetadata { + db_version: 0, + last_change_idx: 0, + }), + } +} + +/// Write state machine metadata to the database +pub fn write_sm_meta( + db: &TransactionDB, + txn: &Transaction, + sm_meta: &StateMachineMetadata, +) -> Result<()> { + let serialized_meta = JsonEncoder::encode(sm_meta)?; + txn.put_cf( + &IndexifyObjectsColumns::StateMachineMetadata.cf_db(db), + b"sm_meta", + &serialized_meta, + )?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use rocksdb::{ColumnFamilyDescriptor, Options, TransactionDBOptions}; + use strum::IntoEnumIterator; + use tempfile::TempDir; + + use super::*; + use crate::migrations::migration_trait::Migration; + + #[derive(Clone)] + struct MockMigration { + version: u64, + name: &'static str, + } + + impl Migration for MockMigration { + fn version(&self) -> u64 { + self.version + } + + fn name(&self) -> &'static str { + self.name + } + + fn prepare(&self, ctx: &PrepareContext) -> Result { + // Simple mock - just open DB + ctx.open_db() + } + + fn apply(&self, _ctx: &MigrationContext) -> Result<()> { + // No-op for test + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } + } + + #[test] + fn test_migration_new_db() -> Result<()> { + let temp_dir = TempDir::new()?; + let path = temp_dir.path(); + + // Create a mock migration + let mock_migration = MockMigration { + version: 1, + name: "MockMigration", + }; + + // Use the mock migration (e.g., log its name) + info!("Testing with migration: {}", mock_migration.name()); + + // Run migrations on non-existent DB + let sm_meta = run(path)?; + + // Check migration resulted in latest version + assert_eq!( + sm_meta.db_version, + MigrationRegistry::new()?.latest_version() + ); + + Ok(()) + } + + #[test] + fn test_migration_existing_db() -> Result<()> { + let temp_dir = TempDir::new()?; + let path = temp_dir.path(); + + // Create DB with initial metadata + let sm_column_families = IndexifyObjectsColumns::iter() + .map(|cf| ColumnFamilyDescriptor::new(cf.to_string(), Options::default())); + + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + let db = TransactionDB::open_cf_descriptors( + &db_opts, + &TransactionDBOptions::default(), + path, + sm_column_families, + )?; + + // Set initial version to 1 + let txn = db.transaction(); + let initial_meta = StateMachineMetadata { + db_version: 0, + last_change_idx: 0, + }; + write_sm_meta(&db, &txn, &initial_meta)?; + txn.commit()?; + drop(db); + + // Run migrations + let sm_meta = run(path)?; + + // Check migration resulted in latest version + assert_eq!( + sm_meta.db_version, + MigrationRegistry::new()?.latest_version() + ); + + Ok(()) + } +} diff --git a/server/state_store/src/migrations.rs b/server/state_store/src/migrations.rs deleted file mode 100644 index 3a4b9cdc0..000000000 --- a/server/state_store/src/migrations.rs +++ /dev/null @@ -1,1210 +0,0 @@ -use std::path::Path; - -use anyhow::{Context, Result}; -use data_model::{GraphInvocationCtx, StateMachineMetadata}; -use rocksdb::{ - IteratorMode, - Options, - ReadOptions, - Transaction, - TransactionDB, - TransactionDBOptions, - DB, -}; -use tracing::info; - -use crate::{ - 
serializer::{JsonEncode, JsonEncoder}, - state_machine::IndexifyObjectsColumns, -}; - -const SERVER_DB_VERSION: u64 = 8; - -// Note: should never be used with data model types to guarantee it works with -// different versions. -pub fn migrate(path: &Path) -> Result { - let mut db_opts = Options::default(); - // don't create missing column families during migration - db_opts.create_missing_column_families(false); - db_opts.create_if_missing(true); - - // Fetch existing column families - let existing_cfs = match DB::list_cf(&db_opts, &path) { - Ok(cfs) => cfs, - Err(e) if e.kind() == rocksdb::ErrorKind::IOError => { - // No migration needed, just return the default metadata. - info!( - "no state store migration needed, new state at version {}", - SERVER_DB_VERSION - ); - return Ok(StateMachineMetadata { - db_version: SERVER_DB_VERSION, - last_change_idx: 0, - }); - } - Err(e) => return Err(anyhow::anyhow!("listing column families: {}", e)), - }; - - // Open the database with the existing column families - let mut db = TransactionDB::open_cf( - &db_opts, - &TransactionDBOptions::default(), - path, - &existing_cfs, - ) - .map_err(|e| anyhow::anyhow!("failed to open db for migration: {}", e))?; - - let mut sm_meta = read_sm_meta(&db).context("reading current state machine metadata")?; - let current_db_version = sm_meta.db_version; - - if current_db_version == SERVER_DB_VERSION { - info!( - "no state store migration needed, already at version {}", - SERVER_DB_VERSION - ); - return Ok(sm_meta); - } - - // Drop column families before starting migrations transaction - // Dropping column families cannot be done in a transaction and requires - // borrowing with mut. - drop_unused_cfs(&existing_cfs, &mut db).context("dropping column families before migration")?; - - info!( - "starting state store migration from version {} to {}", - current_db_version, SERVER_DB_VERSION - ); - - let txn = db.transaction(); - - // handle empty DB - if sm_meta.db_version == 0 { - sm_meta.db_version = SERVER_DB_VERSION; - } - - // migrations - { - if sm_meta.db_version == 1 { - sm_meta.db_version += 1; - migrate_v1_to_v2(&db, &txn).context("migrating from v1 to v2")?; - } - - if sm_meta.db_version == 2 { - sm_meta.db_version += 1; - migrate_v2_to_v3(&db, &txn).context("migrating from v2 to v3")?; - } - - if sm_meta.db_version == 3 { - sm_meta.db_version += 1; - migrate_v3_to_v4(&db, &txn).context("migrating from v3 to v4")?; - } - - if sm_meta.db_version == 4 { - sm_meta.db_version += 1; - // Bumping for new cf to drop: Executors - } - - if sm_meta.db_version == 5 { - sm_meta.db_version += 1; - migrate_v5_to_v6_migrate_allocations(&db, &txn).context("migrating from v5 to v6")?; - migrate_v5_to_v6_clean_orphaned_tasks(&db, &txn).context("migrating from v5 to v6")?; - } - - if sm_meta.db_version == 6 { - sm_meta.db_version += 1; - migrate_v6_to_v7_reallocate_allocated_tasks(&db, &txn) - .context("migrating from v6 to v7")?; - } - - if sm_meta.db_version == 7 { - sm_meta.db_version += 1; - migrate_v7_to_v8_recompute_invocation_ctx_secondary_index(&db, &txn) - .context("migrating from v7 to v8")?; - } - - // add new migrations before this line and increment SERVER_DB_VERSION - } - - // assert we migrated all the way to the expected server version - if sm_meta.db_version != SERVER_DB_VERSION { - return Err(anyhow::anyhow!( - "migration did not migrate to the expected server version: {} != {}", - sm_meta.db_version, - SERVER_DB_VERSION - )); - } - - // saving db version - write_sm_meta(&db, &txn, &sm_meta)?; - - 
info!("committing migration"); - txn.commit().context("committing migration")?; - info!("completed state store migration"); - - Ok(sm_meta) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v1_to_v2(db: &TransactionDB, txn: &Transaction) -> Result<()> { - let mut num_total_tasks: usize = 0; - let mut num_migrated_tasks: usize = 0; - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(10_194_304); - - // Migrate tasks statuses - // If the status is not set, - // set it to "Pending" if the outcome is not terminal. - // set it to "Completed" if the outcome is terminal. - { - let iter = db.iterator_cf_opt( - IndexifyObjectsColumns::Tasks.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - for kv in iter { - num_total_tasks += 1; - let (key, val_bytes) = kv?; - - let mut task_value: serde_json::Value = serde_json::from_slice(&val_bytes) - .map_err(|e| anyhow::anyhow!("error deserializing Tasks json bytes, {}", e))?; - - let task_obj = task_value.as_object_mut().ok_or(anyhow::anyhow!( - "unexpected task JSON value: {:?}", - String::from_utf8(val_bytes.to_vec()), - ))?; - - let outcome = - task_obj - .get("outcome") - .and_then(|v| v.as_str()) - .ok_or(anyhow::anyhow!( - "unexpected task outcome JSON value: {:?}", - task_obj.get("outcome") - ))?; - - let status_undefined = match task_obj.get("status") { - Some(serde_json::Value::String(status)) => status.is_empty(), - Some(serde_json::Value::Null) => true, - None => true, - val @ _ => { - return Err(anyhow::anyhow!( - "unexpected task status JSON value: {:?}", - val - )); - } - }; - - if status_undefined { - num_migrated_tasks += 1; - if outcome == "Success" || outcome == "Failure" { - task_obj.insert( - "status".to_string(), - serde_json::Value::String("Completed".to_string()), - ); - } else { - task_obj.insert( - "status".to_string(), - serde_json::Value::String("Pending".to_string()), - ); - } - - let task_bytes = serde_json::to_vec(&task_value).map_err(|e| { - anyhow::anyhow!( - "error serializing into json: {}, value: {:?}", - e, - task_value.clone() - ) - })?; - - txn.put_cf(IndexifyObjectsColumns::Tasks.cf_db(&db), &key, &task_bytes)?; - } - } - } - - info!( - "Migrated {}/{} tasks from v1 to v2", - num_migrated_tasks, num_total_tasks - ); - - Ok(()) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v2_to_v3(db: &TransactionDB, txn: &Transaction) -> Result<()> { - let mut num_total_invocation_ctx: usize = 0; - let mut num_migrated_invocation_ctx: usize = 0; - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(10_194_304); - - // Migrate graph invocation ctx date from invocation payload data - // by using the payload created_at - { - let iter = db.iterator_cf_opt( - IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - for kv in iter { - num_total_invocation_ctx += 1; - let (key, val_bytes) = kv?; - let key_str = String::from_utf8_lossy(&key); - - let mut invocation_ctx: serde_json::Value = serde_json::from_slice(&val_bytes) - .map_err(|e| { - anyhow::anyhow!("error deserializing InvocationCtx json bytes, {}", e) - })?; - - let new_invocation_ctx = invocation_ctx.as_object_mut().ok_or_else(|| { - anyhow::anyhow!("unexpected invocation ctx JSON value {}", key_str) - })?; - - let invocation_bytes = db - .get_cf(&IndexifyObjectsColumns::GraphInvocations.cf_db(&db), &key)? 
- .ok_or_else(|| { - anyhow::anyhow!("invocation not found for invocation ctx: {}", key_str) - })?; - - let invocation: serde_json::Value = serde_json::from_slice(&invocation_bytes)?; - - let created_at = invocation - .get("created_at") - .and_then(|v| v.as_u64()) - .ok_or_else(|| { - anyhow::anyhow!("created_at not found in invocation: {}", key_str) - })?; - - new_invocation_ctx.insert( - "created_at".to_string(), - serde_json::Value::from(created_at), - ); - - let new_invocation_ctx_bytes = serde_json::to_vec(&new_invocation_ctx)?; - - txn.put_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - &key, - &new_invocation_ctx_bytes, - )?; - - num_migrated_invocation_ctx += 1; - } - } - - info!( - "Migrated {}/{} invocation context from v2 to v3", - num_migrated_invocation_ctx, num_total_invocation_ctx - ); - - Ok(()) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v3_to_v4(db: &TransactionDB, txn: &Transaction) -> Result<()> { - let mut num_total_invocation_ctx: usize = 0; - let mut num_migrated_invocation_ctx: usize = 0; - - { - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(4_194_304); - - let iter = db.iterator_cf_opt( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - for kv in iter { - num_total_invocation_ctx += 1; - let (_key, value) = kv?; - - let graph_invocation_ctx = JsonEncoder::decode::(&value)?; - - let secondary_index_key = - GraphInvocationCtx::secondary_index_key(&graph_invocation_ctx); - - txn.put_cf( - &IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db), - &secondary_index_key, - &[], - )?; - - num_migrated_invocation_ctx += 1; - } - } - - info!( - "Migrated {}/{} invocation context secondary indexes from v3 to v4", - num_migrated_invocation_ctx, num_total_invocation_ctx - ); - Ok(()) -} - -fn get_string_val(val: &serde_json::Value, key: &str) -> Result { - val.get(key) - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .ok_or(anyhow::anyhow!("missing {} in json value", key)) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v5_to_v6_migrate_allocations( - db: &TransactionDB, - txn: &Transaction, -) -> Result<()> { - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(10_194_304); // 10MB - - let iter = db.iterator_cf_opt( - IndexifyObjectsColumns::Allocations.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - let mut num_migrated_allocations = 0; - let mut num_deleted_allocations = 0; - let mut num_total_allocations = 0; - - for kv in iter { - num_total_allocations += 1; - let (key, val_bytes) = kv?; - let allocation: serde_json::Value = serde_json::from_slice(&val_bytes) - .map_err(|e| anyhow::anyhow!("error deserializing Allocations json bytes, {:#?}", e))?; - - let namespace = get_string_val(&allocation, "namespace")?; - let compute_graph = get_string_val(&allocation, "compute_graph")?; - let invocation_id = get_string_val(&allocation, "invocation_id")?; - let new_allocation_key = format!( - "{}|{}|{}|{}|{}|{}", - namespace, - compute_graph, - invocation_id, - get_string_val(&allocation, "compute_fn")?, - get_string_val(&allocation, "task_id")?, - get_string_val(&allocation, "executor_id")? 
- ); - - // Delete the old allocation using id as key - txn.delete_cf(IndexifyObjectsColumns::Allocations.cf_db(&db), &key)?; - - // Check if the allocation is orphaned by ensuring it has a graph invocation and - // ctx - if db - .get_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - format!("{}|{}|{}", namespace, compute_graph, invocation_id), - )? - .is_some() - { - // Re-put the allocation with the new key - txn.put_cf( - IndexifyObjectsColumns::Allocations.cf_db(&db), - new_allocation_key, - &val_bytes, - )?; - num_migrated_allocations += 1; - } else { - num_deleted_allocations += 1; - } - } - - info!( - "Migrated {} allocations and deleted {} orphaned allocations from {} total allocations", - num_migrated_allocations, num_deleted_allocations, num_total_allocations - ); - - Ok(()) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v5_to_v6_clean_orphaned_tasks( - db: &TransactionDB, - txn: &Transaction, -) -> Result<()> { - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(10_194_304); // 10MB - - let iter = db.iterator_cf_opt( - IndexifyObjectsColumns::Tasks.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - let mut num_deleted_tasks = 0; - let mut num_total_tasks = 0; - - for kv in iter { - num_total_tasks += 1; - let (key, val_bytes) = kv?; - let task: serde_json::Value = serde_json::from_slice(&val_bytes) - .map_err(|e| anyhow::anyhow!("error deserializing Tasks json bytes, {:#?}", e))?; - - let namespace = get_string_val(&task, "namespace")?; - let compute_graph = get_string_val(&task, "compute_graph_name")?; - let invocation_id = get_string_val(&task, "invocation_id")?; - - // Check if the task is orphaned by ensuring it has a graph invocation - if db - .get_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - format!("{}|{}|{}", namespace, compute_graph, invocation_id), - )? 
- .is_none() - { - // Delete the orphaned task - txn.delete_cf(IndexifyObjectsColumns::Tasks.cf_db(&db), &key)?; - num_deleted_tasks += 1; - } - } - - info!( - "Deleted {} orphaned tasks out of {}", - num_deleted_tasks, num_total_tasks - ); - - Ok(()) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v6_to_v7_reallocate_allocated_tasks( - db: &TransactionDB, - txn: &Transaction, -) -> Result<()> { - // Set up read options with reasonable readahead size - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(10_194_304); // 10MB - - // Iterate through all Allocations - let iter = db.iterator_cf_opt( - IndexifyObjectsColumns::Allocations.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - let mut num_total_allocations = 0; - let mut num_deleted_allocations = 0; - let mut num_updated_tasks = 0; - - for kv in iter { - num_total_allocations += 1; - let (key, val_bytes) = kv?; - let allocation: serde_json::Value = serde_json::from_slice(&val_bytes) - .map_err(|e| anyhow::anyhow!("error deserializing Allocations json bytes, {:#?}", e))?; - - // Extract task information from the allocation - let namespace = get_string_val(&allocation, "namespace")?; - let compute_graph = get_string_val(&allocation, "compute_graph")?; - let invocation_id = get_string_val(&allocation, "invocation_id")?; - let compute_fn = get_string_val(&allocation, "compute_fn")?; - let task_id = get_string_val(&allocation, "task_id")?; - - // Construct the task key - let task_key = format!( - "{}|{}|{}|{}|{}", - namespace, compute_graph, invocation_id, compute_fn, task_id - ); - - // Get the task - if let Some(task_bytes) = db.get_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &task_key)? { - let mut task: serde_json::Value = serde_json::from_slice(&task_bytes) - .map_err(|e| anyhow::anyhow!("error deserializing Task json bytes, {:#?}", e))?; - - // Update task status to Pending - if let Some(task_obj) = task.as_object_mut() { - task_obj.insert( - "status".to_string(), - serde_json::Value::String("Pending".to_string()), - ); - - // Update the task in the database - let updated_task_bytes = serde_json::to_vec(&task) - .map_err(|e| anyhow::anyhow!("error serializing task: {:#?}", e))?; - txn.put_cf( - &IndexifyObjectsColumns::Tasks.cf_db(&db), - &task_key, - &updated_task_bytes, - )?; - num_updated_tasks += 1; - } - } - - // Delete the allocation - txn.delete_cf(IndexifyObjectsColumns::Allocations.cf_db(&db), &key)?; - num_deleted_allocations += 1; - } - - info!( - "Dropped {} allocations and updated {} tasks out of {} total allocations", - num_deleted_allocations, num_updated_tasks, num_total_allocations - ); - - Ok(()) -} - -#[tracing::instrument(skip(db, txn))] -pub fn migrate_v7_to_v8_recompute_invocation_ctx_secondary_index( - db: &TransactionDB, - txn: &Transaction, -) -> Result<()> { - let mut num_total_invocation_ctx: usize = 0; - let mut num_migrated_invocation_ctx: usize = 0; - - panic!("TODO"); - - { - let mut read_options = ReadOptions::default(); - read_options.set_readahead_size(4_194_304); - - let iter = db.iterator_cf_opt( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - read_options, - IteratorMode::Start, - ); - - for kv in iter { - num_total_invocation_ctx += 1; - let (_key, value) = kv?; - - let graph_invocation_ctx = JsonEncoder::decode::(&value)?; - - let secondary_index_key = - GraphInvocationCtx::secondary_index_key(&graph_invocation_ctx); - - txn.put_cf( - &IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db), - &secondary_index_key, - 
&[], - )?; - - num_migrated_invocation_ctx += 1; - } - } - - info!( - "Migrated {}/{} invocation context secondary indexes from v3 to v4", - num_migrated_invocation_ctx, num_total_invocation_ctx - ); - Ok(()) -} - -pub fn write_sm_meta( - db: &TransactionDB, - txn: &Transaction, - sm_meta: &StateMachineMetadata, -) -> Result<()> { - let serialized_meta = JsonEncoder::encode(&sm_meta)?; - txn.put_cf( - &IndexifyObjectsColumns::StateMachineMetadata.cf_db(&db), - b"sm_meta", - &serialized_meta, - )?; - Ok(()) -} - -pub fn read_sm_meta(db: &TransactionDB) -> Result { - let meta = db.get_cf( - &IndexifyObjectsColumns::StateMachineMetadata.cf_db(&db), - b"sm_meta", - )?; - match meta { - Some(meta) => Ok(JsonEncoder::decode(&meta)?), - None => Ok(StateMachineMetadata { - db_version: 0, - last_change_idx: 0, - }), - } -} - -#[tracing::instrument(skip(db, existing_cfs))] -pub fn drop_unused_cfs(existing_cfs: &Vec, db: &mut TransactionDB) -> Result<()> { - let cfs_to_drop = vec!["Executors", "UnallocatedTasks", "TaskAllocations"]; - - for cf_name in cfs_to_drop { - if existing_cfs.contains(&cf_name.to_string()) { - info!("Dropping unused {} column family", cf_name); - db.drop_cf(cf_name)?; - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use rocksdb::{Options, TransactionDBOptions}; - use serde_json::json; - use tempfile::TempDir; - - use super::*; - - #[tokio::test] - async fn test_migrate_v1_to_v2() -> Result<()> { - let temp_dir = TempDir::new()?; - let path = temp_dir.path().to_str().unwrap(); - - let sm_column_families = vec![ - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::Tasks.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::StateMachineMetadata.as_ref(), - Options::default(), - ), - ]; - - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - let db = TransactionDB::open_cf_descriptors( - &db_opts, - &TransactionDBOptions::default(), - path, - sm_column_families, - ) - .map_err(|e| anyhow::anyhow!("failed to open db: {}", e))?; - - // Create tasks with different outcomes and no status - let tasks = vec![ - json!({ - "id": "task1", - "namespace": "test_ns", - "compute_fn_name": "test_fn", - "compute_graph_name": "test_graph", - "invocation_id": "test_invocation", - "input_node_output_key": "test_input", - "graph_version": "1", - "outcome": "Success", - "creation_time_ns": 0, - }), - json!({ - "id": "task2", - "namespace": "test_ns", - "compute_fn_name": "test_fn", - "compute_graph_name": "test_graph", - "invocation_id": "test_invocation", - "input_node_output_key": "test_input", - "graph_version": "1", - "outcome": "Failure", - "creation_time_ns": 0, - }), - json!({ - "id": "task3", - "namespace": "test_ns", - "compute_fn_name": "test_fn", - "compute_graph_name": "test_graph", - "invocation_id": "test_invocation", - "input_node_output_key": "test_input", - "graph_version": "1", - "outcome": "Unknown", - "creation_time_ns": 0, - }), - ]; - - for task in tasks { - let task_key = format!( - "{}|{}|{}|{}|{}", - task["namespace"].as_str().unwrap(), - task["compute_graph_name"].as_str().unwrap(), - task["invocation_id"].as_str().unwrap(), - task["compute_fn_name"].as_str().unwrap(), - task["id"].as_str().unwrap() - ); - let task_bytes = serde_json::to_vec(&task)?; - db.put_cf( - &IndexifyObjectsColumns::Tasks.cf_db(&db), - &task_key, - &task_bytes, - )?; - } - - // Perform migration - let txn = db.transaction(); - 
migrate_v1_to_v2(&db, &txn)?; - txn.commit()?; - - // Verify migration - let task1_key = "test_ns|test_graph|test_invocation|test_fn|task1"; - let task1: serde_json::Value = serde_json::from_slice( - &db.get_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &task1_key)? - .unwrap(), - )?; - assert_eq!(task1["status"], "Completed", "task1 {}", task1); - - let task2_key = "test_ns|test_graph|test_invocation|test_fn|task2"; - let task2: serde_json::Value = serde_json::from_slice( - &db.get_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &task2_key)? - .unwrap(), - )?; - assert_eq!(task2["status"], "Completed"); - - let task3_key = "test_ns|test_graph|test_invocation|test_fn|task3"; - let task3: serde_json::Value = serde_json::from_slice( - &db.get_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &task3_key)? - .unwrap(), - )?; - assert_eq!(task3["status"], "Pending"); - - Ok(()) - } - - #[tokio::test] - async fn test_migrate_v2_to_v3() -> Result<()> { - let temp_dir = TempDir::new()?; - let path = temp_dir.path().to_str().unwrap(); - - let sm_column_families = vec![ - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::GraphInvocationCtx.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::GraphInvocations.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::StateMachineMetadata.as_ref(), - Options::default(), - ), - ]; - - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - let db = TransactionDB::open_cf_descriptors( - &db_opts, - &TransactionDBOptions::default(), - path, - sm_column_families, - ) - .map_err(|e| anyhow::anyhow!("failed to open db: {}", e))?; - - // Create invocation payloads and invocation contexts without created_at - let invocations = vec![ - json!({ - "id": "invocation1", - "namespace": "test_ns", - "compute_graph_name": "test_graph", - "payload": { - "path": "path1", - "size": 123, - "sha256_hash": "hash1" - }, - "created_at": 1000, - "encoding": "application/json" - }), - json!({ - "id": "invocation2", - "namespace": "test_ns", - "compute_graph_name": "test_graph", - "payload": { - "path": "path2", - "size": 456, - "sha256_hash": "hash2" - }, - "created_at": 2000, - "encoding": "application/json" - }), - ]; - - let invocation_ctxs = vec![ - json!({ - "namespace": "test_ns", - "compute_graph_name": "test_graph", - "graph_version": "1", - "invocation_id": "invocation1", - "completed": false, - "outcome": "Undefined", - "outstanding_tasks": 0, - "fn_task_analytics": {} - }), - json!({ - "namespace": "test_ns", - "compute_graph_name": "test_graph", - "graph_version": "1", - "invocation_id": "invocation2", - "completed": false, - "outcome": "Undefined", - "outstanding_tasks": 0, - "fn_task_analytics": {} - }), - ]; - - for invocation in invocations { - let key = format!( - "{}|{}|{}", - invocation["namespace"].as_str().unwrap(), - invocation["compute_graph_name"].as_str().unwrap(), - invocation["id"].as_str().unwrap() - ); - let bytes = serde_json::to_vec(&invocation)?; - db.put_cf( - &IndexifyObjectsColumns::GraphInvocations.cf_db(&db), - &key, - &bytes, - )?; - } - - for ctx in invocation_ctxs { - let key = format!( - "{}|{}|{}", - ctx["namespace"].as_str().unwrap(), - ctx["compute_graph_name"].as_str().unwrap(), - ctx["invocation_id"].as_str().unwrap() - ); - let bytes = serde_json::to_vec(&ctx)?; - db.put_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - &key, - &bytes, - )?; - } - - 
// Perform migration - let txn = db.transaction(); - migrate_v2_to_v3(&db, &txn)?; - txn.commit()?; - - // Verify migration - let ctx1_key = "test_ns|test_graph|invocation1"; - let ctx1: serde_json::Value = serde_json::from_slice( - &db.get_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - &ctx1_key, - )? - .unwrap(), - )?; - assert_eq!(ctx1["created_at"], 1000); - - let ctx2_key = "test_ns|test_graph|invocation2"; - let ctx2: serde_json::Value = serde_json::from_slice( - &db.get_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - &ctx2_key, - )? - .unwrap(), - )?; - assert_eq!(ctx2["created_at"], 2000); - - Ok(()) - } - - #[tokio::test] - async fn test_migrate_logic() -> Result<()> { - let temp_dir = TempDir::new()?; - let path = temp_dir.path(); - - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - let db = Arc::new( - TransactionDB::open_cf_descriptors( - &db_opts, - &TransactionDBOptions::default(), - path, - vec![rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::StateMachineMetadata.as_ref(), - Options::default(), - )], - ) - .map_err(|e| anyhow::anyhow!("failed to open db: {}", e))?, - ); - - // Test case where the database is already at the latest version - let sm_meta = StateMachineMetadata { - db_version: SERVER_DB_VERSION, - last_change_idx: 0, - }; - let txn = db.transaction(); - write_sm_meta(&db, &txn, &sm_meta)?; - txn.commit()?; - - drop(db); - - let sm_meta = migrate(path)?; - assert_eq!(sm_meta.db_version, SERVER_DB_VERSION); - - // Test case where the database is empty - let sm_meta = StateMachineMetadata { - db_version: 0, - last_change_idx: 0, - }; - - let db = Arc::new( - TransactionDB::open_cf_descriptors( - &db_opts, - &TransactionDBOptions::default(), - path, - vec![rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::StateMachineMetadata.as_ref(), - Options::default(), - )], - ) - .map_err(|e| anyhow::anyhow!("failed to open db: {}", e))?, - ); - let txn = db.transaction(); - write_sm_meta(&db, &txn, &sm_meta)?; - txn.commit()?; - drop(db); - - let sm_meta = migrate(path)?; - assert_eq!(sm_meta.db_version, SERVER_DB_VERSION); - - Ok(()) - } - - #[tokio::test] - async fn test_migrate_v5_to_v6() -> Result<()> { - let temp_dir = TempDir::new()?; - let path = temp_dir.path().to_str().unwrap(); - - let sm_column_families = vec![ - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::Allocations.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::GraphInvocationCtx.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::StateMachineMetadata.as_ref(), - Options::default(), - ), - ]; - - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - let db = TransactionDB::open_cf_descriptors( - &db_opts, - &TransactionDBOptions::default(), - path, - sm_column_families, - ) - .map_err(|e| anyhow::anyhow!("failed to open db: {}", e))?; - - // Create allocations with different invocation statuses - let allocations = vec![ - json!({ - "id": "allocation1", - "namespace": "test_ns", - "compute_graph": "test_graph", - "invocation_id": "invocation1", - "compute_fn": "test_fn", - "task_id": "task1", - "executor_id": "executor1", - }), - json!({ - "id": "allocation2", - "namespace": "test_ns", - "compute_graph": "test_graph", - "invocation_id": "invocation2", - "compute_fn": "test_fn", - 
"task_id": "task2", - "executor_id": "executor2", - }), - ]; - - for allocation in allocations { - let allocation_key = allocation["id"].as_str().unwrap(); - let allocation_bytes = serde_json::to_vec(&allocation)?; - db.put_cf( - &IndexifyObjectsColumns::Allocations.cf_db(&db), - allocation_key, - &allocation_bytes, - )?; - } - - // Create invocation contexts - let invocation_ctxs = vec![json!({ - "namespace": "test_ns", - "compute_graph_name": "test_graph", - "graph_version": "1", - "invocation_id": "invocation1", - "completed": false, - "outcome": "Undefined", - "outstanding_tasks": 0, - "fn_task_analytics": {} - })]; - - for ctx in invocation_ctxs { - let key = format!( - "{}|{}|{}", - ctx["namespace"].as_str().unwrap(), - ctx["compute_graph_name"].as_str().unwrap(), - ctx["invocation_id"].as_str().unwrap() - ); - let bytes = serde_json::to_vec(&ctx)?; - db.put_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - &key, - &bytes, - )?; - } - - // Perform migration - let txn = db.transaction(); - migrate_v5_to_v6_migrate_allocations(&db, &txn)?; - txn.commit()?; - - let all_allocations = &db - .full_iterator_cf( - &IndexifyObjectsColumns::Allocations.cf_db(&db), - IteratorMode::Start, - ) - .collect::>(); - assert_eq!( - all_allocations.len(), - 1, - "allocations: {:#?}", - all_allocations - ); - - // Verify migration - let allocation1_key = "test_ns|test_graph|invocation1|test_fn|task1|executor1"; - let allocation1: serde_json::Value = serde_json::from_slice( - &db.get_cf( - &IndexifyObjectsColumns::Allocations.cf_db(&db), - allocation1_key, - )? - .unwrap(), - )?; - assert_eq!(allocation1["invocation_id"], "invocation1"); - - Ok(()) - } - - #[tokio::test] - async fn test_migrate_v5_to_v6_clean_orphaned_tasks() -> Result<()> { - let temp_dir = TempDir::new()?; - let path = temp_dir.path().to_str().unwrap(); - - let sm_column_families = vec![ - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::Tasks.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::GraphInvocationCtx.as_ref(), - Options::default(), - ), - rocksdb::ColumnFamilyDescriptor::new( - IndexifyObjectsColumns::StateMachineMetadata.as_ref(), - Options::default(), - ), - ]; - - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - let db = TransactionDB::open_cf_descriptors( - &db_opts, - &TransactionDBOptions::default(), - path, - sm_column_families, - ) - .map_err(|e| anyhow::anyhow!("failed to open db: {}", e))?; - - // Create tasks with different invocation statuses - let tasks = vec![ - json!({ - "id": "task1", - "namespace": "test_ns", - "compute_fn_name": "test_fn", - "compute_graph_name": "test_graph", - "invocation_id": "invocation1", - "input_node_output_key": "test_input", - "graph_version": "1", - "outcome": "Success", - "creation_time_ns": 0, - }), - json!({ - "id": "task2", - "namespace": "test_ns", - "compute_fn_name": "test_fn", - "compute_graph_name": "test_graph", - "invocation_id": "invocation2", - "input_node_output_key": "test_input", - "graph_version": "1", - "outcome": "Failure", - "creation_time_ns": 0, - }), - ]; - - for task in tasks { - let task_key = format!( - "{}|{}|{}|{}|{}", - task["namespace"].as_str().unwrap(), - task["compute_graph_name"].as_str().unwrap(), - task["invocation_id"].as_str().unwrap(), - task["compute_fn_name"].as_str().unwrap(), - task["id"].as_str().unwrap() - ); - let task_bytes = serde_json::to_vec(&task)?; - db.put_cf( - 
&IndexifyObjectsColumns::Tasks.cf_db(&db), - &task_key, - &task_bytes, - )?; - } - - // Create invocation contexts - let invocation_ctxs = vec![json!({ - "namespace": "test_ns", - "compute_graph_name": "test_graph", - "graph_version": "1", - "invocation_id": "invocation1", - "completed": false, - "outcome": "Undefined", - "outstanding_tasks": 0, - "fn_task_analytics": {} - })]; - - for ctx in invocation_ctxs { - let key = format!( - "{}|{}|{}", - ctx["namespace"].as_str().unwrap(), - ctx["compute_graph_name"].as_str().unwrap(), - ctx["invocation_id"].as_str().unwrap() - ); - let bytes = serde_json::to_vec(&ctx)?; - db.put_cf( - &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), - &key, - &bytes, - )?; - } - - // Perform migration - let txn = db.transaction(); - migrate_v5_to_v6_clean_orphaned_tasks(&db, &txn)?; - txn.commit()?; - - // Verify migration - - let all_tasks = &db - .full_iterator_cf( - &IndexifyObjectsColumns::Tasks.cf_db(&db), - IteratorMode::Start, - ) - .collect::>(); - assert_eq!(all_tasks.len(), 1, "tasks: {:#?}", all_tasks); - - let task1_key = "test_ns|test_graph|invocation1|test_fn|task1"; - let task1: serde_json::Value = serde_json::from_slice( - &db.get_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &task1_key)? - .unwrap(), - )?; - assert_eq!(task1["invocation_id"], "invocation1"); - - let task2_key = "test_ns|test_graph|invocation2|test_fn|task2"; - let task2 = db.get_cf(&IndexifyObjectsColumns::Tasks.cf_db(&db), &task2_key)?; - assert!(task2.is_none(), "task2 should be deleted"); - - Ok(()) - } -} diff --git a/server/state_store/src/migrations/contexts.rs b/server/state_store/src/migrations/contexts.rs new file mode 100644 index 000000000..ccca5f5fd --- /dev/null +++ b/server/state_store/src/migrations/contexts.rs @@ -0,0 +1,340 @@ +use std::path::PathBuf; + +use anyhow::{anyhow, Result}; +use rocksdb::{ + ColumnFamily, + IteratorMode, + Options, + ReadOptions, + Transaction, + TransactionDB, + TransactionDBOptions, + DB, +}; +use serde_json::Value; + +use crate::state_machine::IndexifyObjectsColumns; + +/// Context for database preparation phase of migrations +pub struct PrepareContext { + pub path: PathBuf, + pub db_opts: Options, +} + +impl PrepareContext { + pub fn new(path: PathBuf) -> Self { + let mut db_opts = Options::default(); + db_opts.create_if_missing(true); + Self { path, db_opts } + } + + /// Open database with all existing column families + pub fn open_db(&self) -> Result { + let cfs = match DB::list_cf(&self.db_opts, &self.path) { + Ok(cfs) => cfs, + Err(e) => return Err(anyhow!("Failed to list column families: {}", e)), + }; + + TransactionDB::open_cf( + &self.db_opts, + &TransactionDBOptions::default(), + &self.path, + &cfs, + ) + .map_err(|e| anyhow!("Failed to open DB: {}", e)) + } + + /// Open DB with specific column families + pub fn open_db_with_cfs(&self, cfs: &[String]) -> Result { + TransactionDB::open_cf( + &self.db_opts, + &TransactionDBOptions::default(), + &self.path, + cfs, + ) + .map_err(|e| anyhow!("Failed to open DB: {}", e)) + } + + /// Helper to perform column family operations and reopen DB + pub fn reopen_with_cf_operations(&self, operations: F) -> Result + where + F: FnOnce(&mut TransactionDB) -> Result<()>, + { + // Open DB + let mut db = self.open_db()?; + + // Apply operations + operations(&mut db)?; + + // Close DB to finalize CF changes + drop(db); + + // Reopen with updated CFs + self.open_db() + } + + /// Get list of all column families + pub fn list_cfs(&self) -> Result> { + DB::list_cf(&self.db_opts, &self.path) 
+ .map_err(|e| anyhow!("Failed to list column families: {}", e)) + } +} + +/// Context for applying migration logic +pub struct MigrationContext<'a> { + pub db: &'a TransactionDB, + pub txn: &'a Transaction<'a, TransactionDB>, +} + +impl<'a> MigrationContext<'a> { + pub fn new(db: &'a TransactionDB, txn: &'a Transaction<'a, TransactionDB>) -> Self { + Self { db, txn } + } + + /// Get column family handle + pub fn cf(&self, column_family: &'a IndexifyObjectsColumns) -> &'a ColumnFamily { + column_family.cf_db(self.db) + } + + /// Iterate over all entries in a column family + pub fn iterate_cf( + &self, + column_family: &IndexifyObjectsColumns, + mut callback: F, + ) -> Result<()> + where + F: FnMut(&[u8], &[u8]) -> Result<()>, + { + let mut read_options = ReadOptions::default(); + read_options.set_readahead_size(10_194_304); // 10MB + + let iter = + self.db + .iterator_cf_opt(self.cf(column_family), read_options, IteratorMode::Start); + + for kv in iter { + let (key, value) = kv?; + callback(&key, &value)?; + } + + Ok(()) + } + + /// Count entries in a column family + pub fn count_entries(&self, column_family: &IndexifyObjectsColumns) -> Result { + let mut count = 0; + self.iterate_cf(column_family, |_, _| { + count += 1; + Ok(()) + })?; + Ok(count) + } + + /// Parse JSON from bytes + pub fn parse_json(&self, bytes: &[u8]) -> Result { + serde_json::from_slice(bytes).map_err(|e| anyhow!("Error deserializing JSON: {}", e)) + } + + /// Encode JSON to bytes + pub fn encode_json(&self, json: &Value) -> Result> { + serde_json::to_vec(json).map_err(|e| anyhow!("Error serializing JSON: {}", e)) + } + + /// Helper for common field renames in JSON objects + pub fn rename_json_field( + &self, + json: &mut Value, + old_field: &str, + new_field: &str, + ) -> Result { + if let Some(obj) = json.as_object_mut() { + if let Some(value) = obj.remove(old_field) { + obj.insert(new_field.to_string(), value); + return Ok(true); + } + } + Ok(false) + } + + /// Helper to ensure a field exists with a default value + pub fn ensure_json_field( + &self, + json: &mut Value, + field: &str, + default_value: Value, + ) -> Result { + if let Some(obj) = json.as_object_mut() { + if !obj.contains_key(field) { + obj.insert(field.to_string(), default_value); + return Ok(true); + } + } + Ok(false) + } + + /// Helper to update a JSON object and write it back + pub fn update_json( + &self, + column_family: &IndexifyObjectsColumns, + key: &[u8], + updater: F, + ) -> Result + where + F: FnOnce(&mut Value) -> Result, + { + if let Some(value_bytes) = self.db.get_cf(self.cf(column_family), key)? { + let mut json = self.parse_json(&value_bytes)?; + + if updater(&mut json)? 
{ + let updated_bytes = self.encode_json(&json)?; + self.txn + .put_cf(self.cf(column_family), key, &updated_bytes)?; + return Ok(true); + } + } + + Ok(false) + } + + /// Get a string value from a JSON object + pub fn get_string_val(&self, val: &Value, key: &str) -> Result { + val.get(key) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .ok_or_else(|| anyhow!("Missing {} in JSON value", key)) + } + + /// Truncate all entries in a column family + pub fn truncate_cf(&self, column_family: &IndexifyObjectsColumns) -> Result { + let mut count = 0; + + self.iterate_cf(column_family, |key, _| { + self.txn.delete_cf(self.cf(column_family), key)?; + count += 1; + Ok(()) + })?; + + Ok(count) + } +} + +#[cfg(test)] +mod tests { + use rocksdb::ColumnFamilyDescriptor; + use serde_json::json; + use tempfile::TempDir; + + use super::*; + + #[test] + fn test_prepare_context() -> Result<()> { + let temp_dir = TempDir::new()?; + let path = temp_dir.path().to_path_buf(); + + let ctx = PrepareContext::new(path); + + // Test creating DB + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + let cf_names = vec!["test_cf".to_string(), "default".to_string()]; + let cf_descriptors: Vec<_> = cf_names + .iter() + .map(|name| ColumnFamilyDescriptor::new(name, Options::default())) + .collect(); + + TransactionDB::::open_cf_descriptors( + &db_opts, + &TransactionDBOptions::default(), + &ctx.path, + cf_descriptors, + )?; + + // Test listing CFs + let cfs = ctx.list_cfs()?; + assert!(cfs.contains(&"test_cf".to_string())); + assert!(cfs.contains(&"default".to_string())); + + // Test reopen with CF operations + let db = ctx.reopen_with_cf_operations(|db| { + db.drop_cf("test_cf")?; + db.create_cf("new_cf", &Options::default())?; + Ok(()) + })?; + + drop(db); + + let new_cfs = DB::list_cf(&db_opts, &ctx.path)?; + assert!(!new_cfs.contains(&"test_cf".to_string())); + assert!(new_cfs.contains(&"new_cf".to_string())); + + Ok(()) + } + + #[test] + fn test_migration_context() -> Result<()> { + let temp_dir = TempDir::new()?; + let path = temp_dir.path(); + + // Set up test DB with a column family + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + let cf_name = IndexifyObjectsColumns::Tasks.as_ref(); + let cf_descriptors = vec![ + ColumnFamilyDescriptor::new("default", Options::default()), + ColumnFamilyDescriptor::new(cf_name, Options::default()), + ]; + + let db = TransactionDB::open_cf_descriptors( + &db_opts, + &TransactionDBOptions::default(), + path, + cf_descriptors, + )?; + + // Add test data + let test_json = json!({ + "id": "test1", + "old_field": "value", + "preserved": true + }); + + let key = b"test_key"; + let value = serde_json::to_vec(&test_json)?; + + db.put_cf(IndexifyObjectsColumns::Tasks.cf_db(&db), key, &value)?; + + // Create migration context + let txn = db.transaction(); + let ctx = MigrationContext::new(&db, &txn); + + // Test JSON operations + ctx.update_json(&IndexifyObjectsColumns::Tasks, key, |json| { + // Rename field + ctx.rename_json_field(json, "old_field", "new_field")?; + + // Add field with default + ctx.ensure_json_field(json, "added_field", json!(42))?; + + Ok(true) + })?; + + txn.commit()?; + + // Verify changes + let updated_bytes = db + .get_cf(IndexifyObjectsColumns::Tasks.cf_db(&db), key)? 
+ .unwrap(); + let updated_json: Value = serde_json::from_slice(&updated_bytes)?; + + assert_eq!(updated_json["new_field"], "value"); + assert_eq!(updated_json["added_field"], 42); + assert_eq!(updated_json["preserved"], true); + assert!(updated_json.get("old_field").is_none()); + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/migration_trait.rs b/server/state_store/src/migrations/migration_trait.rs new file mode 100644 index 000000000..0d694dcfc --- /dev/null +++ b/server/state_store/src/migrations/migration_trait.rs @@ -0,0 +1,24 @@ +use anyhow::Result; +use rocksdb::TransactionDB; + +use super::contexts::{MigrationContext, PrepareContext}; + +/// Trait defining a database migration +pub trait Migration { + /// The version this migration upgrades TO + fn version(&self) -> u64; + + /// Name for logging purposes + fn name(&self) -> &'static str; + + /// DB preparation - column family operations before transaction + /// Default implementation simply opens the DB with existing column families + fn prepare(&self, ctx: &PrepareContext) -> Result { + ctx.open_db() + } + + /// Apply migration using provided context + fn apply(&self, ctx: &MigrationContext) -> Result<()>; + + fn box_clone(&self) -> Box; +} diff --git a/server/state_store/src/migrations/mod.rs b/server/state_store/src/migrations/mod.rs new file mode 100644 index 000000000..7fc3707ec --- /dev/null +++ b/server/state_store/src/migrations/mod.rs @@ -0,0 +1,16 @@ +pub mod contexts; +pub mod migration_trait; +pub mod registry; +#[cfg(test)] +mod testing; + +// migrations +mod v1_task_status; +mod v2_invocation_ctx_timestamps; +mod v3_invocation_ctx_secondary_index; +mod v4_drop_executors; +mod v5_allocation_keys; +mod v6_clean_orphaned_tasks; +mod v7_reset_allocated_tasks; +mod v8_rebuild_invocation_ctx_secondary_index; +// Add new migrations mod here diff --git a/server/state_store/src/migrations/registry.rs b/server/state_store/src/migrations/registry.rs new file mode 100644 index 000000000..1203bd3d4 --- /dev/null +++ b/server/state_store/src/migrations/registry.rs @@ -0,0 +1,238 @@ +use anyhow::{anyhow, Result}; + +use super::{ + migration_trait::Migration, + v4_drop_executors::V4DropExecutorsMigration, + v5_allocation_keys::V5AllocationKeysMigration, + v6_clean_orphaned_tasks::V6CleanOrphanedTasksMigration, + v7_reset_allocated_tasks::V7ResetAllocatedTasksMigration, + v8_rebuild_invocation_ctx_secondary_index::V8RebuildInvocationCtxSecondaryIndexMigration, +}; +// Import all migration implementations +use super::{ + v1_task_status::V1TaskStatusMigration, + v2_invocation_ctx_timestamps::V2InvocationTimestampsMigration, + v3_invocation_ctx_secondary_index::V3SecondaryIndexesMigration, + // Add new migrations here +}; + +/// Registry for all available migrations +pub struct MigrationRegistry { + migrations: Vec>, +} + +impl MigrationRegistry { + /// Create a new registry with all registered migrations + pub fn new() -> Result { + let mut registry = Self { + migrations: Vec::new(), + }; + + // Register all migrations + registry.register(Box::new(V1TaskStatusMigration {})); + registry.register(Box::new(V2InvocationTimestampsMigration {})); + registry.register(Box::new(V3SecondaryIndexesMigration {})); + registry.register(Box::new(V4DropExecutorsMigration {})); + registry.register(Box::new(V5AllocationKeysMigration {})); + registry.register(Box::new(V6CleanOrphanedTasksMigration {})); + registry.register(Box::new(V7ResetAllocatedTasksMigration {})); + 
registry.register(Box::new(V8RebuildInvocationCtxSecondaryIndexMigration {})); + // Add new migrations here + + // Sort and validate migrations + registry.sort_and_validate()?; + + Ok(registry) + } + + /// Register a new migration + fn register(&mut self, migration: Box) { + self.migrations.push(migration); + } + + /// Sort migrations by version and validate no duplicates + fn sort_and_validate(&mut self) -> Result<()> { + // Sort migrations by version + self.migrations.sort_by_key(|m| m.version()); + + // Validate no duplicate versions + for i in 1..self.migrations.len() { + if self.migrations[i].version() == self.migrations[i - 1].version() { + return Err(anyhow!( + "Duplicate migration version {} found: {} and {}", + self.migrations[i].version(), + self.migrations[i - 1].name(), + self.migrations[i].name() + )); + } + } + Ok(()) + } + + /// Find migrations that should be applied from the current version + pub fn find_migrations(&self, from_version: u64) -> Vec> { + self.migrations + .iter() + .map(|m| m.box_clone()) + .filter(|m| m.version() > from_version) + .collect() + } + + /// Get the latest migration version + pub fn latest_version(&self) -> u64 { + self.migrations + .iter() + .map(|m| m.version()) + .max() + .unwrap_or(0) + } +} + +#[cfg(test)] +mod tests { + use super::{super::contexts::MigrationContext, *}; + + #[derive(Clone)] + struct TestMigration { + version_num: u64, + name_str: &'static str, + } + + impl Migration for TestMigration { + fn version(&self) -> u64 { + self.version_num + } + + fn name(&self) -> &'static str { + self.name_str + } + + fn apply(&self, _ctx: &MigrationContext) -> Result<()> { + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } + } + + #[test] + fn test_registry_sorts_migrations() -> Result<()> { + let mut registry = MigrationRegistry { + migrations: Vec::new(), + }; + + // Add migrations in random order + registry.register(Box::new(TestMigration { + version_num: 3, + name_str: "Migration 3", + })); + registry.register(Box::new(TestMigration { + version_num: 1, + name_str: "Migration 1", + })); + registry.register(Box::new(TestMigration { + version_num: 2, + name_str: "Migration 2", + })); + + registry.sort_and_validate()?; + + // Check migrations are sorted + assert_eq!(registry.migrations[0].version(), 1); + assert_eq!(registry.migrations[1].version(), 2); + assert_eq!(registry.migrations[2].version(), 3); + + Ok(()) + } + + #[test] + fn test_registry_detects_duplicates() { + let mut registry = MigrationRegistry { + migrations: Vec::new(), + }; + + // Add migrations with duplicate versions + registry.register(Box::new(TestMigration { + version_num: 1, + name_str: "Migration A", + })); + registry.register(Box::new(TestMigration { + version_num: 1, + name_str: "Migration B", + })); + + let result = registry.sort_and_validate(); + assert!(result.is_err()); + if let Err(e) = result { + assert_eq!( + e.to_string(), + "Duplicate migration version 1 found: Migration A and Migration B" + ); + } + } + + #[test] + fn test_find_migrations() -> Result<()> { + let mut registry = MigrationRegistry { + migrations: Vec::new(), + }; + + registry.register(Box::new(TestMigration { + version_num: 1, + name_str: "Migration 1", + })); + registry.register(Box::new(TestMigration { + version_num: 2, + name_str: "Migration 2", + })); + registry.register(Box::new(TestMigration { + version_num: 3, + name_str: "Migration 3", + })); + + registry.sort_and_validate()?; + + // Find migrations from version 0 + let migrations = registry.find_migrations(0); + 
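+ // Starting below every registered version returns all migrations, in version order.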
assert_eq!(migrations.len(), 3); + + // Find migrations from version 1 + let migrations = registry.find_migrations(1); + assert_eq!(migrations.len(), 2); + assert_eq!(migrations[0].version(), 2); + assert_eq!(migrations[1].version(), 3); + + // Find migrations from version 3 + let migrations = registry.find_migrations(3); + assert_eq!(migrations.len(), 0); + + Ok(()) + } + + #[test] + fn test_latest_version() { + let mut registry = MigrationRegistry { + migrations: Vec::new(), + }; + + // Empty registry + assert_eq!(registry.latest_version(), 0); + + // Add migrations + registry.register(Box::new(TestMigration { + version_num: 1, + name_str: "Migration 1", + })); + registry.register(Box::new(TestMigration { + version_num: 5, + name_str: "Migration 5", + })); + registry.register(Box::new(TestMigration { + version_num: 3, + name_str: "Migration 3", + })); + + assert_eq!(registry.latest_version(), 5); + } +} diff --git a/server/state_store/src/migrations/testing.rs b/server/state_store/src/migrations/testing.rs new file mode 100644 index 000000000..786e3fa71 --- /dev/null +++ b/server/state_store/src/migrations/testing.rs @@ -0,0 +1,141 @@ +use anyhow::Result; +use rocksdb::{Options, TransactionDB, TransactionDBOptions}; +use tempfile::TempDir; + +use super::{ + contexts::{MigrationContext, PrepareContext}, + migration_trait::Migration, +}; + +/// A more complete test utility that handles custom column families +pub struct MigrationTestBuilder { + column_families: Vec, + db_opts: Options, +} + +impl MigrationTestBuilder { + pub fn new() -> Self { + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + Self { + column_families: vec!["default".to_string()], + db_opts, + } + } + + /// Add column families to create initially + pub fn with_column_family(mut self, cf_name: &str) -> Self { + self.column_families.push(cf_name.to_string()); + self + } + + /// Run the test with the given migration and setup/verify functions + pub fn run_test(self, migration: &M, setup: S, verify: V) -> Result<()> + where + M: Migration, + S: FnOnce(&TransactionDB) -> Result<()>, + V: FnOnce(&TransactionDB) -> Result<()>, + { + // Create temporary database directory + let temp_dir = TempDir::new()?; + let path = temp_dir.path(); + + // Create database with specified column families + let db = TransactionDB::open_cf( + &self.db_opts, + &TransactionDBOptions::default(), + path, + &self.column_families, + )?; + + // Run setup function to populate test data + setup(&db)?; + + // Close database + drop(db); + + // Prepare the database for migration + let prepare_ctx = PrepareContext::new(path.to_path_buf()); + let db = migration.prepare(&prepare_ctx)?; + + // Apply the migration + let txn = db.transaction(); + let migration_ctx = MigrationContext::new(&db, &txn); + + migration.apply(&migration_ctx)?; + txn.commit()?; + + // Run verification + verify(&db)?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::state_machine::IndexifyObjectsColumns; + + #[derive(Clone)] + struct MockMigration { + version_num: u64, + } + + impl Migration for MockMigration { + fn version(&self) -> u64 { + self.version_num + } + + fn name(&self) -> &'static str { + "Mock Migration" + } + + fn prepare(&self, ctx: &PrepareContext) -> Result { + ctx.open_db() + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + // Simple mock implementation that just puts a marker + ctx.txn.put_cf( + ctx.cf(&IndexifyObjectsColumns::StateMachineMetadata), + 
b"migration_test", + format!("v{}", self.version_num).as_bytes(), + )?; + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } + } + + #[test] + fn test_migration_test_builder() -> Result<()> { + let migration = MockMigration { version_num: 43 }; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::StateMachineMetadata.as_ref()) + .run_test( + &migration, + |_db| { + // Setup - nothing needed + Ok(()) + }, + |db| { + // Verify migration was applied + let result = db.get_cf( + IndexifyObjectsColumns::StateMachineMetadata.cf_db(db), + b"migration_test", + )?; + + assert_eq!(result, Some(b"v43".to_vec())); + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v1_task_status.rs b/server/state_store/src/migrations/v1_task_status.rs new file mode 100644 index 000000000..287ae9fd2 --- /dev/null +++ b/server/state_store/src/migrations/v1_task_status.rs @@ -0,0 +1,176 @@ +use anyhow::Result; +use tracing::info; + +use super::{contexts::MigrationContext, migration_trait::Migration}; +use crate::state_machine::IndexifyObjectsColumns; + +#[derive(Clone)] +pub struct V1TaskStatusMigration {} + +impl Migration for V1TaskStatusMigration { + fn version(&self) -> u64 { + 1 + } + + fn name(&self) -> &'static str { + "Add status field to tasks" + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + let mut num_total_tasks: usize = 0; + let mut num_migrated_tasks: usize = 0; + + ctx.iterate_cf(&IndexifyObjectsColumns::Tasks, |key, _value| { + num_total_tasks += 1; + + ctx.update_json(&IndexifyObjectsColumns::Tasks, key, |task_json| { + let task_obj = task_json + .as_object_mut() + .ok_or_else(|| anyhow::anyhow!("unexpected task JSON value"))?; + + let outcome = task_obj + .get("outcome") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("unexpected task outcome JSON value"))?; + + let status_undefined = match task_obj.get("status") { + Some(serde_json::Value::String(status)) => status.is_empty(), + Some(serde_json::Value::Null) => true, + None => true, + _ => false, + }; + + if status_undefined { + num_migrated_tasks += 1; + if outcome == "Success" || outcome == "Failure" { + task_obj.insert( + "status".to_string(), + serde_json::Value::String("Completed".to_string()), + ); + } else { + task_obj.insert( + "status".to_string(), + serde_json::Value::String("Pending".to_string()), + ); + } + + Ok(true) // Task was modified + } else { + Ok(false) // No changes needed + } + })?; + + Ok(()) + })?; + + info!( + "Migrated {}/{} tasks: added status field", + num_migrated_tasks, num_total_tasks + ); + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v1_migration() -> Result<()> { + let migration = V1TaskStatusMigration {}; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::Tasks.as_ref()) + .run_test( + &migration, + |db| { + // Setup: Insert test tasks with different outcomes and no status field + let tasks = vec![ + ( + b"test_ns|test_graph|test_invoc|test_fn|task1".to_vec(), + json!({ + "id": "task1", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "test_invoc", + "compute_fn_name": "test_fn", + "outcome": "Success" + }), + ), + ( + b"test_ns|test_graph|test_invoc|test_fn|task2".to_vec(), + json!({ + "id": "task2", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + 
"invocation_id": "test_invoc", + "compute_fn_name": "test_fn", + "outcome": "Failure" + }), + ), + ( + b"test_ns|test_graph|test_invoc|test_fn|task3".to_vec(), + json!({ + "id": "task3", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "test_invoc", + "compute_fn_name": "test_fn", + "outcome": "Unknown" + }), + ), + ( + b"test_ns|test_graph|test_invoc|test_fn|task4".to_vec(), + json!({ + "id": "task4", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "test_invoc", + "compute_fn_name": "test_fn", + "outcome": "Success", + "status": "AlreadySet" + }), + ), + ]; + + for (key, value) in tasks { + db.put_cf( + IndexifyObjectsColumns::Tasks.cf_db(db), + &key, + serde_json::to_vec(&value)?.as_slice(), + )?; + } + + Ok(()) + }, + |db| { + // Verify: Check that status fields were added properly + let verify_status = |key: &[u8], expected_status: &str| -> Result<()> { + let bytes = db + .get_cf(IndexifyObjectsColumns::Tasks.cf_db(db), key)? + .unwrap(); + let task: serde_json::Value = serde_json::from_slice(&bytes)?; + assert_eq!(task["status"].as_str().unwrap(), expected_status); + Ok(()) + }; + + verify_status(b"test_ns|test_graph|test_invoc|test_fn|task1", "Completed")?; + verify_status(b"test_ns|test_graph|test_invoc|test_fn|task2", "Completed")?; + verify_status(b"test_ns|test_graph|test_invoc|test_fn|task3", "Pending")?; + verify_status(b"test_ns|test_graph|test_invoc|test_fn|task4", "AlreadySet")?; + + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v2_invocation_ctx_timestamps.rs b/server/state_store/src/migrations/v2_invocation_ctx_timestamps.rs new file mode 100644 index 000000000..3d1cce3d2 --- /dev/null +++ b/server/state_store/src/migrations/v2_invocation_ctx_timestamps.rs @@ -0,0 +1,195 @@ +use anyhow::Result; +use tracing::info; + +use super::{contexts::MigrationContext, migration_trait::Migration}; +use crate::state_machine::IndexifyObjectsColumns; + +#[derive(Clone)] +pub struct V2InvocationTimestampsMigration {} + +impl Migration for V2InvocationTimestampsMigration { + fn version(&self) -> u64 { + 2 + } + + fn name(&self) -> &'static str { + "Add timestamps to invocation contexts" + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + let mut num_total_invocation_ctx: usize = 0; + let mut num_migrated_invocation_ctx: usize = 0; + + ctx.iterate_cf( + &IndexifyObjectsColumns::GraphInvocationCtx, + |key, _value| { + num_total_invocation_ctx += 1; + let key_str = String::from_utf8_lossy(key); + + ctx.update_json( + &IndexifyObjectsColumns::GraphInvocationCtx, + key, + |invocation_ctx| { + let invocation_bytes = ctx + .db + .get_cf(&IndexifyObjectsColumns::GraphInvocations.cf_db(ctx.db), key)? 
+ .ok_or_else(|| { + anyhow::anyhow!( + "invocation not found for invocation ctx: {}", + key_str + ) + })?; + + let invocation = ctx.parse_json(&invocation_bytes)?; + + let created_at = invocation + .get("created_at") + .and_then(|v| v.as_u64()) + .ok_or_else(|| { + anyhow::anyhow!("created_at not found in invocation: {}", key_str) + })?; + + let ctx_obj = invocation_ctx.as_object_mut().ok_or_else(|| { + anyhow::anyhow!("unexpected invocation ctx JSON value {}", key_str) + })?; + + ctx_obj.insert( + "created_at".to_string(), + serde_json::Value::Number(serde_json::Number::from(created_at)), + ); + + num_migrated_invocation_ctx += 1; + Ok(true) + }, + )?; + + Ok(()) + }, + )?; + + info!( + "Migrated {}/{} invocation contexts: added timestamps", + num_migrated_invocation_ctx, num_total_invocation_ctx + ); + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v2_migration() -> Result<()> { + let migration = V2InvocationTimestampsMigration {}; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::GraphInvocationCtx.as_ref()) + .with_column_family(IndexifyObjectsColumns::GraphInvocations.as_ref()) + .run_test( + &migration, + |db| { + // Setup: Insert test invocations and contexts + let invocations = vec![ + ( + b"test_ns|test_graph|inv1".to_vec(), + json!({ + "id": "inv1", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "created_at": 1000, + "payload": { + "path": "test_path", + "size": 123 + } + }), + ), + ( + b"test_ns|test_graph|inv2".to_vec(), + json!({ + "id": "inv2", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "created_at": 2000, + "payload": { + "path": "test_path2", + "size": 456 + } + }), + ), + ]; + + let contexts = vec![ + ( + b"test_ns|test_graph|inv1".to_vec(), + json!({ + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv1", + "graph_version": "1", + "completed": false, + "outcome": "Undefined", + "outstanding_tasks": 0 + }), + ), + ( + b"test_ns|test_graph|inv2".to_vec(), + json!({ + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv2", + "graph_version": "1", + "completed": false, + "outcome": "Undefined", + "outstanding_tasks": 0 + }), + ), + ]; + + for (key, value) in invocations { + db.put_cf( + IndexifyObjectsColumns::GraphInvocations.cf_db(db), + &key, + serde_json::to_vec(&value)?.as_slice(), + )?; + } + + for (key, value) in contexts { + db.put_cf( + IndexifyObjectsColumns::GraphInvocationCtx.cf_db(db), + &key, + serde_json::to_vec(&value)?.as_slice(), + )?; + } + + Ok(()) + }, + |db| { + // Verify: Check that timestamps were added to contexts + let verify_timestamp = |key: &[u8], expected_timestamp: u64| -> Result<()> { + let bytes = db + .get_cf(IndexifyObjectsColumns::GraphInvocationCtx.cf_db(db), key)? 
+ .unwrap(); + let ctx_json: serde_json::Value = serde_json::from_slice(&bytes)?; + assert_eq!(ctx_json["created_at"].as_u64().unwrap(), expected_timestamp); + Ok(()) + }; + + verify_timestamp(b"test_ns|test_graph|inv1", 1000)?; + verify_timestamp(b"test_ns|test_graph|inv2", 2000)?; + + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v3_invocation_ctx_secondary_index.rs b/server/state_store/src/migrations/v3_invocation_ctx_secondary_index.rs new file mode 100644 index 000000000..91909855e --- /dev/null +++ b/server/state_store/src/migrations/v3_invocation_ctx_secondary_index.rs @@ -0,0 +1,273 @@ +use anyhow::{Context, Result}; +use tracing::info; + +use super::{ + contexts::{MigrationContext, PrepareContext}, + migration_trait::Migration, +}; +use crate::state_machine::IndexifyObjectsColumns; + +#[derive(Clone)] +pub struct V3SecondaryIndexesMigration {} + +impl Migration for V3SecondaryIndexesMigration { + fn version(&self) -> u64 { + 3 + } + + fn name(&self) -> &'static str { + "Create invocation context secondary indexes" + } + + fn prepare(&self, ctx: &PrepareContext) -> Result { + // Check if secondary index CF already exists, if not create it + let existing_cfs = ctx.list_cfs()?; + + if !existing_cfs.contains( + &IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex + .as_ref() + .to_string(), + ) { + ctx.reopen_with_cf_operations(|db| { + info!("Creating secondary index column family"); + db.create_cf( + IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.as_ref(), + &rocksdb::Options::default(), + )?; + Ok(()) + }) + } else { + info!("Secondary index column family already exists"); + ctx.open_db() + } + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + // First clear any existing secondary indexes + let deleted_count = + ctx.truncate_cf(&IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex)?; + if deleted_count > 0 { + info!("Cleared {} existing secondary index entries", deleted_count); + } + + let mut num_total_invocation_ctx: usize = 0; + let mut num_indexed_invocation_ctx: usize = 0; + + ctx.iterate_cf( + &IndexifyObjectsColumns::GraphInvocationCtx, + |_key, value| { + num_total_invocation_ctx += 1; + + // Parse the invocation context as JSON + let invocation_ctx: serde_json::Value = serde_json::from_slice(value) + .context("Error parsing GraphInvocationCtx as JSON")?; + + // Create secondary index key + let secondary_index_key = create_secondary_index_key(&invocation_ctx)?; + + // Store the secondary index (key -> empty value) + ctx.txn.put_cf( + ctx.cf(&IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex), + &secondary_index_key, + &[], + )?; + + num_indexed_invocation_ctx += 1; + Ok(()) + }, + )?; + + info!( + "Created secondary indexes for {}/{} invocation contexts", + num_indexed_invocation_ctx, num_total_invocation_ctx + ); + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +/// Create a secondary index key from a JSON representation of +/// GraphInvocationCtx +/// Format: namespace|compute_graph_name|created_at_bytes|invocation_id +fn create_secondary_index_key(invocation_ctx: &serde_json::Value) -> Result> { + let namespace = invocation_ctx["namespace"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing namespace in invocation context"))?; + + let compute_graph_name = invocation_ctx["compute_graph_name"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing compute_graph_name in invocation context"))?; + + let created_at = invocation_ctx["created_at"] + .as_u64() + 
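+ // created_at is the numeric timestamp backfilled onto each ctx by the v2 migration.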
.ok_or_else(|| anyhow::anyhow!("Missing or invalid created_at in invocation context"))?; + + let invocation_id = invocation_ctx["invocation_id"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing invocation_id in invocation context"))?; + + let mut key = Vec::new(); + key.extend_from_slice(namespace.as_bytes()); + key.push(b'|'); + key.extend_from_slice(compute_graph_name.as_bytes()); + key.push(b'|'); + key.extend_from_slice(&created_at.to_be_bytes()); + key.push(b'|'); + key.extend_from_slice(invocation_id.as_bytes()); + + Ok(key) +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v3_migration() -> Result<()> { + let migration = V3SecondaryIndexesMigration {}; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::GraphInvocationCtx.as_ref()) + .run_test( + &migration, + |db| { + // Setup: Create test invocation contexts + let contexts = vec![ + json!({ + "namespace": "test_ns", + "compute_graph_name": "graph1", + "graph_version": "1", + "invocation_id": "inv1", + "created_at": 1000, + "completed": false, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + json!({ + "namespace": "test_ns", + "compute_graph_name": "graph1", + "graph_version": "1", + "invocation_id": "inv2", + "created_at": 2000, + "completed": true, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + json!({ + "namespace": "other_ns", + "compute_graph_name": "graph2", + "graph_version": "1", + "invocation_id": "inv3", + "created_at": 3000, + "completed": false, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + ]; + + for ctx_obj in &contexts { + let key = format!( + "{}|{}|{}", + ctx_obj["namespace"].as_str().unwrap(), + ctx_obj["compute_graph_name"].as_str().unwrap(), + ctx_obj["invocation_id"].as_str().unwrap() + ); + let encoded = serde_json::to_vec(ctx_obj).unwrap(); + db.put_cf( + IndexifyObjectsColumns::GraphInvocationCtx.cf_db(db), + key, + &encoded, + )?; + } + + Ok(()) + }, + |db| { + // Verify secondary indexes were created + let contexts = vec![ + json!({ + "namespace": "test_ns", + "compute_graph_name": "graph1", + "graph_version": "1", + "invocation_id": "inv1", + "created_at": 1000, + "completed": false, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + json!({ + "namespace": "test_ns", + "compute_graph_name": "graph1", + "graph_version": "1", + "invocation_id": "inv2", + "created_at": 2000, + "completed": true, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + json!({ + "namespace": "other_ns", + "compute_graph_name": "graph2", + "graph_version": "1", + "invocation_id": "inv3", + "created_at": 3000, + "completed": false, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + ]; + + // Check secondary index CF was created + assert!(db + .cf_handle( + IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.as_ref() + ) + .is_some()); + + // Check secondary indexes were created + for ctx_obj in &contexts { + let secondary_key = create_secondary_index_key(ctx_obj)?; + let exists = db + .get_cf( + IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db), + &secondary_key, + )? 
+ .is_some(); + + assert!(exists, "Secondary index not found for {:?}", ctx_obj); + } + + // Count secondary indexes + let secondary_indexes = db + .iterator_cf( + IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db), + rocksdb::IteratorMode::Start, + ) + .collect::>(); + + assert_eq!( + secondary_indexes.len(), + 3, + "Should have 3 secondary indexes" + ); + + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v4_drop_executors.rs b/server/state_store/src/migrations/v4_drop_executors.rs new file mode 100644 index 000000000..5214c73b5 --- /dev/null +++ b/server/state_store/src/migrations/v4_drop_executors.rs @@ -0,0 +1,121 @@ +use anyhow::Result; +use tracing::info; + +use super::{ + contexts::{MigrationContext, PrepareContext}, + migration_trait::Migration, +}; + +#[derive(Clone)] +/// Migration to remove the deprecated Executors column family +pub struct V4DropExecutorsMigration {} + +impl Migration for V4DropExecutorsMigration { + fn version(&self) -> u64 { + 4 + } + + fn name(&self) -> &'static str { + "Drop Executors column family" + } + + fn prepare(&self, ctx: &PrepareContext) -> Result { + // Check if the Executors CF exists and drop it if needed + let existing_cfs = ctx.list_cfs()?; + + ctx.reopen_with_cf_operations(|db| { + // Drop Executors CF if it exists + if existing_cfs.contains(&"Executors".to_string()) { + info!("Dropping Executors column family"); + db.drop_cf("Executors")?; + } else { + info!("Executors column family doesn't exist, no action needed"); + } + + Ok(()) + }) + } + + fn apply(&self, _ctx: &MigrationContext) -> Result<()> { + // No data migration needed, just log completion + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use rocksdb::{ColumnFamilyDescriptor, Options, TransactionDB, TransactionDBOptions}; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v4_migration_with_executors_cf() -> Result<()> { + let migration = V4DropExecutorsMigration {}; + + // Create DB with custom setup that includes Executors CF + let temp_dir = tempfile::TempDir::new()?; + let path = temp_dir.path(); + + // First create a DB with Executors CF + { + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + let cf_descriptors = vec![ + ColumnFamilyDescriptor::new("default", Options::default()), + ColumnFamilyDescriptor::new("Executors", Options::default()), + ]; + + let _db: TransactionDB = TransactionDB::open_cf_descriptors( + &db_opts, + &TransactionDBOptions::default(), + path, + cf_descriptors, + )?; + // DB is dropped here when it goes out of scope + } + + // Now run migration + let prepare_ctx = PrepareContext::new(path.to_path_buf()); + let db = migration.prepare(&prepare_ctx)?; + + // Verify Executors CF was dropped + let cfs = prepare_ctx.list_cfs()?; + assert!(!cfs.contains(&"Executors".to_string())); + + // Run apply phase (no-op in this case) + let txn = db.transaction(); + let migration_ctx = MigrationContext::new(&db, &txn); + migration.apply(&migration_ctx)?; + txn.commit()?; + + Ok(()) + } + + #[test] + fn test_v4_migration_without_executors_cf() -> Result<()> { + let migration = V4DropExecutorsMigration {}; + + MigrationTestBuilder::new().run_test( + &migration, + |_db| { + // No setup needed - DB doesn't have Executors CF + Ok(()) + }, + |db| { + // Verify migration completes without error + let txn = db.transaction(); + txn.commit()?; + Ok(()) + 
}, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v5_allocation_keys.rs b/server/state_store/src/migrations/v5_allocation_keys.rs new file mode 100644 index 000000000..e51ac8052 --- /dev/null +++ b/server/state_store/src/migrations/v5_allocation_keys.rs @@ -0,0 +1,328 @@ +use anyhow::Result; +use tracing::info; + +use super::{contexts::MigrationContext, migration_trait::Migration}; +use crate::state_machine::IndexifyObjectsColumns; + +/// Migration to reformat allocation keys and clean up orphaned allocations +#[derive(Clone)] +pub struct V5AllocationKeysMigration {} + +impl Migration for V5AllocationKeysMigration { + fn version(&self) -> u64 { + 5 + } + + fn name(&self) -> &'static str { + "Reformat allocation keys and clean orphaned allocations" + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + // First migrate allocations to new key format and clean orphaned ones + self.migrate_allocations(ctx)?; + + // Then clean orphaned tasks + self.clean_orphaned_tasks(ctx)?; + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +impl V5AllocationKeysMigration { + fn migrate_allocations(&self, ctx: &MigrationContext) -> Result<()> { + let mut num_total_allocations: usize = 0; + let mut num_migrated_allocations: usize = 0; + let mut num_deleted_allocations: usize = 0; + + // We need to collect all keys and values first since we'll be modifying the CF + let mut allocations_to_process = Vec::new(); + + ctx.iterate_cf(&IndexifyObjectsColumns::Allocations, |key, value| { + num_total_allocations += 1; + allocations_to_process.push((key.to_vec(), value.to_vec())); + Ok(()) + })?; + + // Process allocations + for (key, val_bytes) in allocations_to_process { + let allocation: serde_json::Value = ctx.parse_json(&val_bytes)?; + + // Extract fields for new key + let namespace = ctx.get_string_val(&allocation, "namespace")?; + let compute_graph = ctx.get_string_val(&allocation, "compute_graph")?; + let invocation_id = ctx.get_string_val(&allocation, "invocation_id")?; + let compute_fn = ctx.get_string_val(&allocation, "compute_fn")?; + let task_id = ctx.get_string_val(&allocation, "task_id")?; + let executor_id = ctx.get_string_val(&allocation, "executor_id")?; + + // Create new allocation key + let new_allocation_key = format!( + "{}|{}|{}|{}|{}|{}", + namespace, compute_graph, invocation_id, compute_fn, task_id, executor_id + ); + + // Delete the old allocation + ctx.txn + .delete_cf(ctx.cf(&IndexifyObjectsColumns::Allocations), &key)?; + + // Check if the allocation is orphaned by ensuring it has a graph invocation ctx + let invocation_ctx_key = format!("{}|{}|{}", namespace, compute_graph, invocation_id); + + if ctx + .db + .get_cf( + ctx.cf(&IndexifyObjectsColumns::GraphInvocationCtx), + invocation_ctx_key.as_bytes(), + )? 
+ .is_some() + { + // Re-insert with new key + ctx.txn.put_cf( + ctx.cf(&IndexifyObjectsColumns::Allocations), + new_allocation_key.as_bytes(), + &val_bytes, + )?; + num_migrated_allocations += 1; + } else { + // Allocation is orphaned, don't re-insert + num_deleted_allocations += 1; + } + } + + info!( + "Migrated {} allocations and deleted {} orphaned allocations from {} total allocations", + num_migrated_allocations, num_deleted_allocations, num_total_allocations + ); + + Ok(()) + } + + fn clean_orphaned_tasks(&self, ctx: &MigrationContext) -> Result<()> { + let mut num_total_tasks: usize = 0; + let mut num_deleted_tasks: usize = 0; + + // Collect tasks to delete + let mut tasks_to_delete = Vec::new(); + + ctx.iterate_cf(&IndexifyObjectsColumns::Tasks, |key, value| { + num_total_tasks += 1; + + let task: serde_json::Value = ctx.parse_json(&value)?; + + let namespace = ctx.get_string_val(&task, "namespace")?; + let compute_graph = ctx.get_string_val(&task, "compute_graph_name")?; + let invocation_id = ctx.get_string_val(&task, "invocation_id")?; + + // Check if the task is orphaned + let invocation_ctx_key = format!("{}|{}|{}", namespace, compute_graph, invocation_id); + + if ctx + .db + .get_cf( + ctx.cf(&IndexifyObjectsColumns::GraphInvocationCtx), + invocation_ctx_key.as_bytes(), + )? + .is_none() + { + // Task is orphaned, mark for deletion + tasks_to_delete.push(key.to_vec()); + } + + Ok(()) + })?; + + // Delete orphaned tasks + for key in tasks_to_delete { + ctx.txn + .delete_cf(ctx.cf(&IndexifyObjectsColumns::Tasks), &key)?; + num_deleted_tasks += 1; + } + + info!( + "Deleted {} orphaned tasks out of {}", + num_deleted_tasks, num_total_tasks + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v5_migration() -> Result<()> { + let migration = V5AllocationKeysMigration {}; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::Allocations.as_ref()) + .with_column_family(IndexifyObjectsColumns::Tasks.as_ref()) + .with_column_family(IndexifyObjectsColumns::GraphInvocationCtx.as_ref()) + .run_test( + &migration, + |db| { + // Setup: Create test allocations and tasks + + // First create invocation contexts to reference + let contexts = vec![ + ( + b"test_ns|test_graph|inv1".to_vec(), + json!({ + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv1", + "graph_version": "1", + "completed": false + }), + ), + // No inv2 - to test orphaned references + ]; + + for (key, value) in &contexts { + db.put_cf( + IndexifyObjectsColumns::GraphInvocationCtx.cf_db(db), + key, + serde_json::to_vec(value)?.as_slice(), + )?; + } + + // Create allocations - some with old-style keys + let allocations = vec![ + ( + b"allocation1".to_vec(), + json!({ + "id": "allocation1", + "namespace": "test_ns", + "compute_graph": "test_graph", + "invocation_id": "inv1", + "compute_fn": "test_fn1", + "task_id": "task1", + "executor_id": "exec1" + }), + ), + ( + b"allocation2".to_vec(), + json!({ + "id": "allocation2", + "namespace": "test_ns", + "compute_graph": "test_graph", + "invocation_id": "inv2", // Orphaned + "compute_fn": "test_fn2", + "task_id": "task2", + "executor_id": "exec2" + }), + ), + ]; + + for (key, value) in &allocations { + db.put_cf( + IndexifyObjectsColumns::Allocations.cf_db(db), + key, + serde_json::to_vec(value)?.as_slice(), + )?; + } + + // Create tasks - some valid, some orphaned + let tasks = vec![ + ( + 
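+ // Valid entry: inv1 has an invocation ctx, so this task must survive the cleanup.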
b"test_ns|test_graph|inv1|test_fn1|task1".to_vec(), + json!({ + "id": "task1", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv1", + "compute_fn_name": "test_fn1" + }), + ), + ( + b"test_ns|test_graph|inv2|test_fn2|task2".to_vec(), // Orphaned + json!({ + "id": "task2", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv2", + "compute_fn_name": "test_fn2" + }), + ), + ]; + + for (key, value) in &tasks { + db.put_cf( + IndexifyObjectsColumns::Tasks.cf_db(db), + key, + serde_json::to_vec(value)?.as_slice(), + )?; + } + + Ok(()) + }, + |db| { + // Verify: Check that allocations were migrated correctly and orphaned items + // removed + + // Old allocation keys should be gone + assert!(db + .get_cf( + IndexifyObjectsColumns::Allocations.cf_db(db), + b"allocation1" + )? + .is_none()); + + assert!(db + .get_cf( + IndexifyObjectsColumns::Allocations.cf_db(db), + b"allocation2" + )? + .is_none()); + + // Valid allocation should be migrated with new key format + let new_key = b"test_ns|test_graph|inv1|test_fn1|task1|exec1"; + let migrated_allocation = + db.get_cf(IndexifyObjectsColumns::Allocations.cf_db(db), new_key)?; + + assert!( + migrated_allocation.is_some(), + "Valid allocation should be migrated with new key" + ); + + // Orphaned allocation should not exist + let orphaned_key = b"test_ns|test_graph|inv2|test_fn2|task2|exec2"; + let orphaned_allocation = + db.get_cf(IndexifyObjectsColumns::Allocations.cf_db(db), orphaned_key)?; + + assert!( + orphaned_allocation.is_none(), + "Orphaned allocation should be deleted" + ); + + // Valid task should still exist + let valid_task = db.get_cf( + IndexifyObjectsColumns::Tasks.cf_db(db), + b"test_ns|test_graph|inv1|test_fn1|task1", + )?; + + assert!(valid_task.is_some(), "Valid task should still exist"); + + // Orphaned task should be deleted + let orphaned_task = db.get_cf( + IndexifyObjectsColumns::Tasks.cf_db(db), + b"test_ns|test_graph|inv2|test_fn2|task2", + )?; + + assert!(orphaned_task.is_none(), "Orphaned task should be deleted"); + + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v6_clean_orphaned_tasks.rs b/server/state_store/src/migrations/v6_clean_orphaned_tasks.rs new file mode 100644 index 000000000..03bd058f2 --- /dev/null +++ b/server/state_store/src/migrations/v6_clean_orphaned_tasks.rs @@ -0,0 +1,183 @@ +use anyhow::Result; +use tracing::info; + +use super::{contexts::MigrationContext, migration_trait::Migration}; +use crate::state_machine::IndexifyObjectsColumns; + +#[derive(Clone)] +pub struct V6CleanOrphanedTasksMigration {} + +impl Migration for V6CleanOrphanedTasksMigration { + fn version(&self) -> u64 { + 6 + } + + fn name(&self) -> &'static str { + "Clean orphaned tasks" + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + let mut num_deleted_tasks = 0; + let mut num_total_tasks = 0; + + // We need to collect keys first since we'll be modifying the collection + let mut orphaned_task_keys = Vec::new(); + + ctx.iterate_cf(&IndexifyObjectsColumns::Tasks, |key, value| { + num_total_tasks += 1; + + let task: serde_json::Value = serde_json::from_slice(&value) + .map_err(|e| anyhow::anyhow!("error deserializing Tasks json bytes, {:#?}", e))?; + + let namespace = ctx.get_string_val(&task, "namespace")?; + let compute_graph = ctx.get_string_val(&task, "compute_graph_name")?; + let invocation_id = ctx.get_string_val(&task, "invocation_id")?; + + // Check if the task is orphaned by ensuring it has a graph invocation + 
let invocation_ctx_key = format!("{}|{}|{}", namespace, compute_graph, invocation_id); + + if ctx + .db + .get_cf( + ctx.cf(&IndexifyObjectsColumns::GraphInvocationCtx), + invocation_ctx_key.as_bytes(), + )? + .is_none() + { + // Mark the task for deletion + orphaned_task_keys.push(key.to_vec()); + } + + Ok(()) + })?; + + // Delete the orphaned tasks + for key in orphaned_task_keys { + ctx.txn + .delete_cf(ctx.cf(&IndexifyObjectsColumns::Tasks), &key)?; + num_deleted_tasks += 1; + } + + info!( + "Deleted {} orphaned tasks out of {}", + num_deleted_tasks, num_total_tasks + ); + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v5_clean_orphaned_tasks() -> Result<()> { + let migration = V6CleanOrphanedTasksMigration {}; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::Tasks.as_ref()) + .with_column_family(IndexifyObjectsColumns::GraphInvocationCtx.as_ref()) + .run_test( + &migration, + |db| { + // Setup: Create tasks with different invocation statuses + let tasks = vec![ + ( + b"test_ns|test_graph|invocation1|test_fn|task1".to_vec(), + json!({ + "id": "task1", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "invocation1", + "compute_fn_name": "test_fn", + "input_node_output_key": "test_input", + "graph_version": "1", + "outcome": "Success", + "creation_time_ns": 0, + }), + ), + ( + b"test_ns|test_graph|invocation2|test_fn|task2".to_vec(), + json!({ + "id": "task2", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "invocation2", // Orphaned + "compute_fn_name": "test_fn", + "input_node_output_key": "test_input", + "graph_version": "1", + "outcome": "Failure", + "creation_time_ns": 0, + }), + ), + ]; + + for (key, value) in &tasks { + db.put_cf( + IndexifyObjectsColumns::Tasks.cf_db(db), + key, + serde_json::to_vec(value)?.as_slice(), + )?; + } + + // Create invocation context only for invocation1 + let invocation_ctx = json!({ + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "graph_version": "1", + "invocation_id": "invocation1", + "completed": false, + "outcome": "Undefined", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }); + + let key = format!( + "{}|{}|{}", + invocation_ctx["namespace"].as_str().unwrap(), + invocation_ctx["compute_graph_name"].as_str().unwrap(), + invocation_ctx["invocation_id"].as_str().unwrap() + ); + + db.put_cf( + IndexifyObjectsColumns::GraphInvocationCtx.cf_db(db), + key, + serde_json::to_vec(&invocation_ctx)?.as_slice(), + )?; + + Ok(()) + }, + |db| { + // Verify: Check that orphaned task was deleted + + // Task1 (not orphaned) should still exist + let task1_key = b"test_ns|test_graph|invocation1|test_fn|task1"; + let task1_exists = db + .get_cf(IndexifyObjectsColumns::Tasks.cf_db(db), task1_key)? + .is_some(); + + assert!(task1_exists, "Task1 should still exist"); + + // Task2 (orphaned) should be deleted + let task2_key = b"test_ns|test_graph|invocation2|test_fn|task2"; + let task2_exists = db + .get_cf(IndexifyObjectsColumns::Tasks.cf_db(db), task2_key)? 
+ .is_some(); + + assert!(!task2_exists, "Task2 should be deleted"); + + Ok(()) + }, + )?; + + Ok(()) + } +} diff --git a/server/state_store/src/migrations/v7_reset_allocated_tasks.rs b/server/state_store/src/migrations/v7_reset_allocated_tasks.rs new file mode 100644 index 000000000..3b7676237 --- /dev/null +++ b/server/state_store/src/migrations/v7_reset_allocated_tasks.rs @@ -0,0 +1,244 @@ +use anyhow::Result; +use tracing::info; + +use super::{contexts::MigrationContext, migration_trait::Migration}; +use crate::state_machine::IndexifyObjectsColumns; + +/// Migration to reset pending tasks and remove allocations +#[derive(Clone)] +pub struct V7ResetAllocatedTasksMigration {} + +impl Migration for V7ResetAllocatedTasksMigration { + fn version(&self) -> u64 { + 7 + } + + fn name(&self) -> &'static str { + "Reset allocated tasks and drop allocations" + } + + fn apply(&self, ctx: &MigrationContext) -> Result<()> { + let mut num_total_allocations: usize = 0; + let mut num_deleted_allocations: usize = 0; + let mut num_updated_tasks: usize = 0; + + // Collect all allocations + let mut allocations_to_process = Vec::new(); + + ctx.iterate_cf(&IndexifyObjectsColumns::Allocations, |key, value| { + num_total_allocations += 1; + allocations_to_process.push((key.to_vec(), value.to_vec())); + Ok(()) + })?; + + // Process each allocation + for (key, val_bytes) in allocations_to_process { + let allocation: serde_json::Value = ctx.parse_json(&val_bytes)?; + + // Extract task information from the allocation + let namespace = ctx.get_string_val(&allocation, "namespace")?; + let compute_graph = ctx.get_string_val(&allocation, "compute_graph")?; + let invocation_id = ctx.get_string_val(&allocation, "invocation_id")?; + let compute_fn = ctx.get_string_val(&allocation, "compute_fn")?; + let task_id = ctx.get_string_val(&allocation, "task_id")?; + + // Construct the task key + let task_key = format!( + "{}|{}|{}|{}|{}", + namespace, compute_graph, invocation_id, compute_fn, task_id + ); + + // Update the task status to Pending + let updated = ctx.update_json( + &IndexifyObjectsColumns::Tasks, + task_key.as_bytes(), + |task| { + if let Some(task_obj) = task.as_object_mut() { + task_obj.insert( + "status".to_string(), + serde_json::Value::String("Pending".to_string()), + ); + num_updated_tasks += 1; + Ok(true) + } else { + Ok(false) + } + }, + )?; + + if !updated { + info!("Task not found for allocation: {}", task_key); + } + + // Delete the allocation + ctx.txn + .delete_cf(ctx.cf(&IndexifyObjectsColumns::Allocations), &key)?; + num_deleted_allocations += 1; + } + + info!( + "Dropped {} allocations and updated {} tasks out of {} total allocations", + num_deleted_allocations, num_updated_tasks, num_total_allocations + ); + + Ok(()) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::migrations::testing::MigrationTestBuilder; + + #[test] + fn test_v6_migration() -> Result<()> { + let migration = V7ResetAllocatedTasksMigration {}; + + MigrationTestBuilder::new() + .with_column_family(IndexifyObjectsColumns::Allocations.as_ref()) + .with_column_family(IndexifyObjectsColumns::Tasks.as_ref()) + .run_test( + &migration, + |db| { + // Setup: Create test allocations and tasks + + // Create tasks with different statuses + let tasks = vec![ + ( + b"test_ns|test_graph|inv1|test_fn1|task1".to_vec(), + json!({ + "id": "task1", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv1", + 
"compute_fn_name": "test_fn1", + "status": "Running" + }), + ), + ( + b"test_ns|test_graph|inv1|test_fn2|task2".to_vec(), + json!({ + "id": "task2", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv1", + "compute_fn_name": "test_fn2", + "status": "Running" + }), + ), + // This task has no allocation, should remain unchanged + ( + b"test_ns|test_graph|inv1|test_fn3|task3".to_vec(), + json!({ + "id": "task3", + "namespace": "test_ns", + "compute_graph_name": "test_graph", + "invocation_id": "inv1", + "compute_fn_name": "test_fn3", + "status": "Running" + }), + ), + ]; + + for (key, value) in &tasks { + db.put_cf( + IndexifyObjectsColumns::Tasks.cf_db(db), + key, + serde_json::to_vec(value)?.as_slice(), + )?; + } + + // Create allocations + let allocations = vec![ + ( + b"test_ns|test_graph|inv1|test_fn1|task1|exec1".to_vec(), + json!({ + "namespace": "test_ns", + "compute_graph": "test_graph", + "invocation_id": "inv1", + "compute_fn": "test_fn1", + "task_id": "task1", + "executor_id": "exec1" + }), + ), + ( + b"test_ns|test_graph|inv1|test_fn2|task2|exec2".to_vec(), + json!({ + "namespace": "test_ns", + "compute_graph": "test_graph", + "invocation_id": "inv1", + "compute_fn": "test_fn2", + "task_id": "task2", + "executor_id": "exec2" + }), + ), + // This allocation has no task, it should be deleted without error + ( + b"test_ns|test_graph|inv1|test_fn4|task4|exec1".to_vec(), + json!({ + "namespace": "test_ns", + "compute_graph": "test_graph", + "invocation_id": "inv1", + "compute_fn": "test_fn4", + "task_id": "task4", + "executor_id": "exec1" + }), + ), + ]; + + for (key, value) in &allocations { + db.put_cf( + IndexifyObjectsColumns::Allocations.cf_db(db), + key, + serde_json::to_vec(value)?.as_slice(), + )?; + } + + Ok(()) + }, + |db| { + // Verify: All allocations should be deleted, tasks should be updated + + // Check no allocations remain + let all_allocations = db + .iterator_cf( + IndexifyObjectsColumns::Allocations.cf_db(db), + rocksdb::IteratorMode::Start, + ) + .collect::>(); + + assert!( + all_allocations.is_empty(), + "All allocations should be deleted" + ); + + // Check task statuses were updated properly + let check_task_status = |key: &[u8], expected_status: &str| -> Result<()> { + let bytes = db + .get_cf(IndexifyObjectsColumns::Tasks.cf_db(db), key)? 
+ .unwrap();
+ let task: serde_json::Value = serde_json::from_slice(&bytes)?;
+ assert_eq!(task["status"].as_str().unwrap(), expected_status);
+ Ok(())
+ };
+
+ // Task1 and Task2 should be Pending
+ check_task_status(b"test_ns|test_graph|inv1|test_fn1|task1", "Pending")?;
+ check_task_status(b"test_ns|test_graph|inv1|test_fn2|task2", "Pending")?;
+
+ // Task3 should still be Running (unchanged)
+ check_task_status(b"test_ns|test_graph|inv1|test_fn3|task3", "Running")?;
+
+ Ok(())
+ },
+ )?;
+
+ Ok(())
+ }
+}
diff --git a/server/state_store/src/migrations/v8_rebuild_invocation_ctx_secondary_index.rs b/server/state_store/src/migrations/v8_rebuild_invocation_ctx_secondary_index.rs
new file mode 100644
index 000000000..47bccb56c
--- /dev/null
+++ b/server/state_store/src/migrations/v8_rebuild_invocation_ctx_secondary_index.rs
@@ -0,0 +1,290 @@
+use anyhow::{Context, Result};
+use tracing::info;
+
+use super::{
+ contexts::{MigrationContext, PrepareContext},
+ migration_trait::Migration,
+};
+use crate::state_machine::IndexifyObjectsColumns;
+
+/// Migration to rebuild the invocation context secondary indexes by dropping
+/// and recreating the column family
+#[derive(Clone)]
+pub struct V8RebuildInvocationCtxSecondaryIndexMigration {}
+
+impl Migration for V8RebuildInvocationCtxSecondaryIndexMigration {
+ fn version(&self) -> u64 {
+ 8
+ }
+
+ fn name(&self) -> &'static str {
+ "Rebuild invocation context secondary indexes"
+ }
+
+ fn prepare(&self, ctx: &PrepareContext) -> Result<rocksdb::TransactionDB> {
+ // Drop and recreate the secondary index column family instead of truncating
+ info!("Rebuilding secondary index column family");
+
+ ctx.reopen_with_cf_operations(|db| {
+ // Drop if exists
+ let cf_name = IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.as_ref();
+ if db.cf_handle(cf_name).is_some() {
+ info!("Dropping secondary index column family");
+ db.drop_cf(cf_name)?;
+ }
+
+ // Create fresh
+ info!("Creating new secondary index column family");
+ db.create_cf(cf_name, &rocksdb::Options::default())?;
+
+ Ok(())
+ })
+ }
+
+ fn apply(&self, ctx: &MigrationContext) -> Result<()> {
+ let mut num_total_invocation_ctx: usize = 0;
+ let mut num_migrated_invocation_ctx: usize = 0;
+
+ ctx.iterate_cf(
+ &IndexifyObjectsColumns::GraphInvocationCtx,
+ |_key, value| {
+ num_total_invocation_ctx += 1;
+
+ // Parse the invocation context as JSON
+ let invocation_ctx: serde_json::Value = serde_json::from_slice(value)
+ .context("Error parsing GraphInvocationCtx as JSON")?;
+
+ // Create secondary index key
+ let secondary_index_key = create_secondary_index_key(&invocation_ctx)?;
+
+ // Store the secondary index (key -> empty value)
+ ctx.txn.put_cf(
+ ctx.cf(&IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex),
+ &secondary_index_key,
+ &[],
+ )?;
+
+ num_migrated_invocation_ctx += 1;
+ Ok(())
+ },
+ )?;
+
+ info!(
+ "Rebuilt secondary indexes for {}/{} invocation contexts",
+ num_migrated_invocation_ctx, num_total_invocation_ctx
+ );
+
+ Ok(())
+ }
+
+ fn box_clone(&self) -> Box<dyn Migration> {
+ Box::new(self.clone())
+ }
+}
+
+/// Create a secondary index key from a JSON representation of
+/// GraphInvocationCtx
+/// Format: namespace|compute_graph_name|created_at_bytes|invocation_id
+fn create_secondary_index_key(invocation_ctx: &serde_json::Value) -> Result<Vec<u8>> {
+ let namespace = invocation_ctx["namespace"]
+ .as_str()
+ .ok_or_else(|| anyhow::anyhow!("Missing namespace in invocation context"))?;
+
+ let compute_graph_name = invocation_ctx["compute_graph_name"]
+ .as_str()
+ .ok_or_else(|| anyhow::anyhow!("Missing compute_graph_name in invocation context"))?;
+
+ let created_at = invocation_ctx["created_at"]
+ .as_u64()
+ .ok_or_else(|| anyhow::anyhow!("Missing or invalid created_at in invocation context"))?;
+
+ let invocation_id = invocation_ctx["invocation_id"]
+ .as_str()
+ .ok_or_else(|| anyhow::anyhow!("Missing invocation_id in invocation context"))?;
+
+ let mut key = Vec::new();
+ key.extend_from_slice(namespace.as_bytes());
+ key.push(b'|');
+ key.extend_from_slice(compute_graph_name.as_bytes());
+ key.push(b'|');
+ key.extend_from_slice(&created_at.to_be_bytes());
+ key.push(b'|');
+ key.extend_from_slice(invocation_id.as_bytes());
+
+ Ok(key)
+}
+
+#[cfg(test)]
+mod tests {
+ use serde_json::json;
+
+ use super::*;
+ use crate::migrations::testing::MigrationTestBuilder;
+
+ #[test]
+ fn test_v8_migration() -> Result<()> {
+ let migration = V8RebuildInvocationCtxSecondaryIndexMigration {};
+
+ MigrationTestBuilder::new()
+ .with_column_family(IndexifyObjectsColumns::GraphInvocationCtx.as_ref())
+ .with_column_family(IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.as_ref())
+ .run_test(
+ &migration,
+ |db| {
+ // Setup: Create invocation contexts and invalid secondary indexes
+
+ // Create contexts
+ let contexts = vec![
+ json!({
+ "namespace": "test_ns",
+ "compute_graph_name": "graph1",
+ "graph_version": "1",
+ "invocation_id": "inv1",
+ "created_at": 1000,
+ "completed": false,
+ "outcome": "Success",
+ "outstanding_tasks": 0,
+ "fn_task_analytics": {}
+ }),
+ json!({
+ "namespace": "test_ns",
+ "compute_graph_name": "graph1",
+ "graph_version": "1",
+ "invocation_id": "inv2",
+ "created_at": 2000,
+ "completed": true,
+ "outcome": "Success",
+ "outstanding_tasks": 0,
+ "fn_task_analytics": {}
+ }),
+ json!({
+ "namespace": "other_ns",
+ "compute_graph_name": "graph2",
+ "graph_version": "1",
+ "invocation_id": "inv3",
+ "created_at": 3000,
+ "completed": false,
+ "outcome": "Success",
+ "outstanding_tasks": 0,
+ "fn_task_analytics": {}
+ }),
+ ];
+
+ for ctx_obj in &contexts {
+ let key = format!(
+ "{}|{}|{}",
+ ctx_obj["namespace"].as_str().unwrap(),
+ ctx_obj["compute_graph_name"].as_str().unwrap(),
+ ctx_obj["invocation_id"].as_str().unwrap()
+ );
+ let encoded = serde_json::to_vec(ctx_obj).unwrap();
+ db.put_cf(
+ IndexifyObjectsColumns::GraphInvocationCtx.cf_db(db),
+ key,
+ &encoded,
+ )?;
+ }
+
+ // Create some invalid secondary indexes (to be replaced)
+ db.put_cf(
+ IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db),
+ b"invalid_index_1",
+ &[],
+ )?;
+
+ db.put_cf(
+ IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db),
+ b"invalid_index_2",
+ &[],
+ )?;
+
+ Ok(())
+ },
+ |db| {
+ // Verify: Invalid secondary indexes should be gone, correct ones created
+
+ // Check invalid indexes are gone
+ assert!(db
+ .get_cf(
+ IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db),
+ b"invalid_index_1"
+ )?
+ .is_none());
+
+ assert!(db
+ .get_cf(
+ IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db),
+ b"invalid_index_2"
+ )?
+ .is_none()); + + // Check correct indexes are present + let contexts = vec![ + json!({ + "namespace": "test_ns", + "compute_graph_name": "graph1", + "graph_version": "1", + "invocation_id": "inv1", + "created_at": 1000, + "completed": false, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + json!({ + "namespace": "test_ns", + "compute_graph_name": "graph1", + "graph_version": "1", + "invocation_id": "inv2", + "created_at": 2000, + "completed": true, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + json!({ + "namespace": "other_ns", + "compute_graph_name": "graph2", + "graph_version": "1", + "invocation_id": "inv3", + "created_at": 3000, + "completed": false, + "outcome": "Success", + "outstanding_tasks": 0, + "fn_task_analytics": {} + }), + ]; + + for ctx_obj in &contexts { + let secondary_key = create_secondary_index_key(ctx_obj)?; + let exists = db + .get_cf( + IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db), + &secondary_key, + )? + .is_some(); + + assert!(exists, "Secondary index not found for {:?}", ctx_obj); + } + + // Check total count of secondary indexes + let secondary_indexes = db + .iterator_cf( + IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(db), + rocksdb::IteratorMode::Start, + ) + .collect::>(); + + assert_eq!( + secondary_indexes.len(), + 3, + "Should have 3 secondary indexes" + ); + + Ok(()) + }, + )?; + + Ok(()) + } +} From e329f7514369176dbbdd3e442e9607df683ce700 Mon Sep 17 00:00:00 2001 From: Benjamin Boudreau Date: Fri, 11 Apr 2025 16:54:07 -0400 Subject: [PATCH 3/5] fix: list invocations pagination --- server/src/http_objects.rs | 3 +- server/src/routes.rs | 10 +- server/state_store/src/scanner.rs | 78 +++++++++--- .../Namespace/IndividualComputeGraphPage.tsx | 118 +++++++++--------- server/ui/src/routes/Namespace/types.ts | 3 +- server/ui/src/utils/loaders.ts | 38 ++++-- 6 files changed, 162 insertions(+), 88 deletions(-) diff --git a/server/src/http_objects.rs b/server/src/http_objects.rs index d129c3580..a8971ca60 100644 --- a/server/src/http_objects.rs +++ b/server/src/http_objects.rs @@ -421,7 +421,8 @@ pub struct CreateNamespaceResponse { #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct GraphInvocations { pub invocations: Vec, - pub cursor: Option, + pub prev_cursor: Option, + pub next_cursor: Option, } #[derive(Debug, Serialize, Deserialize, ToSchema)] diff --git a/server/src/routes.rs b/server/src/routes.rs index 3876feae7..57c28cd92 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -636,7 +636,7 @@ async fn graph_invocations( Some(CursorDirection::Backward) => Some(state_store::scanner::CursorDirection::Backward), None => None, }; - let (invocation_ctxs, cursor) = state + let (invocation_ctxs, prev_cursor, next_cursor) = state .indexify_state .reader() .list_invocations( @@ -651,11 +651,13 @@ async fn graph_invocations( for invocation_ctx in invocation_ctxs { invocations.push(invocation_ctx.into()); } - let cursor = cursor.map(|c| BASE64_STANDARD.encode(c)); + let prev_cursor = prev_cursor.map(|c| BASE64_STANDARD.encode(c)); + let next_cursor = next_cursor.map(|c| BASE64_STANDARD.encode(c)); Ok(Json(GraphInvocations { invocations, - cursor, + prev_cursor, + next_cursor, })) } @@ -971,7 +973,7 @@ async fn find_invocation( Ok(Json(invocation_ctx.into())) } -/// Delete a specific invocation +/// Delete a specific invocation #[utoipa::path( delete, path = 
"/namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}", diff --git a/server/state_store/src/scanner.rs b/server/state_store/src/scanner.rs index ffa0a0cd7..aab01f714 100644 --- a/server/state_store/src/scanner.rs +++ b/server/state_store/src/scanner.rs @@ -17,6 +17,7 @@ use data_model::{ TaskOutputsIngestedEvent, UnprocessedStateChanges, }; +use indexify_utils::get_epoch_time_in_ms; use metrics::Timer; use opentelemetry::KeyValue; use rocksdb::{Direction, IteratorMode, ReadOptions, TransactionDB}; @@ -493,21 +494,21 @@ impl StateReader { cursor: Option<&[u8]>, limit: usize, direction: Option, - ) -> Result<(Vec, Option>)> { + ) -> Result<(Vec, Option>, Option>)> { let kvs = &[KeyValue::new("op", "list_invocations")]; let _timer = Timer::start_with_labels(&self.metrics.state_read, kvs); - let key_prefix = [namespace.as_bytes(), b"|", compute_graph.as_bytes()].concat(); + let key_prefix = [namespace.as_bytes(), b"|", compute_graph.as_bytes(), b"|"].concat(); let direction = direction.unwrap_or(CursorDirection::Forward); let mut read_options = ReadOptions::default(); read_options.set_readahead_size(10_194_304); let mut upper_bound = key_prefix.clone(); - upper_bound.push(0xff); + upper_bound.extend(&get_epoch_time_in_ms().to_be_bytes()); read_options.set_iterate_upper_bound(upper_bound); let mut lower_bound = key_prefix.clone(); - lower_bound.push(0x00); + lower_bound.extend(&(0 as u64).to_be_bytes()); read_options.set_iterate_lower_bound(lower_bound); let mut iter = self.db.raw_iterator_cf_opt( @@ -516,29 +517,67 @@ impl StateReader { ); match cursor { - Some(cursor) => iter.seek(cursor), - None => iter.seek_to_last(), + Some(cursor) => { + match direction { + CursorDirection::Backward => iter.seek(cursor), + CursorDirection::Forward => iter.seek_for_prev(cursor), + } + // Skip the first item (cursor position) + if iter.valid() { + match direction { + CursorDirection::Forward => iter.prev(), + CursorDirection::Backward => iter.next(), + } + } + } + None => match direction { + CursorDirection::Backward => iter.seek_to_first(), // Start at beginning of range + CursorDirection::Forward => iter.seek_to_last(), // Start at end of range + }, } let mut rows = Vec::new(); let mut next_cursor = None; + let mut prev_cursor = None; // Collect results - while iter.valid() { + while iter.valid() && rows.len() < limit { if let Some((key, _v)) = iter.item() { - if rows.len() < limit { - rows.push(key.to_vec()); - } else { - next_cursor = Some(key.to_vec()); - break; - } + rows.push(key.to_vec()); } else { - break; + break; // No valid item found } + // Move the iterator after capturing the current item match direction { - CursorDirection::Backward => iter.next(), CursorDirection::Forward => iter.prev(), + CursorDirection::Backward => iter.next(), + } + } + + // Check if there are more items after our limit + if iter.valid() { + let key = rows.last().cloned(); + match direction { + CursorDirection::Forward => { + next_cursor = key; + } + CursorDirection::Backward => { + prev_cursor = key; + } + } + } + + // Set the previous cursor if we have a valid item + if cursor.is_some() { + let key = rows.first().cloned(); + match direction { + CursorDirection::Forward => { + prev_cursor = key; + } + CursorDirection::Backward => { + next_cursor = key; + } } } @@ -556,6 +595,14 @@ impl StateReader { } } + match direction { + CursorDirection::Forward => {} + CursorDirection::Backward => { + // We keep the ordering the same even if we traverse in the opposite direction + 
diff --git a/server/ui/src/routes/Namespace/IndividualComputeGraphPage.tsx b/server/ui/src/routes/Namespace/IndividualComputeGraphPage.tsx
index 04982cd77..605e85369 100644
--- a/server/ui/src/routes/Namespace/IndividualComputeGraphPage.tsx
+++ b/server/ui/src/routes/Namespace/IndividualComputeGraphPage.tsx
@@ -1,4 +1,12 @@
-import { Box, Breadcrumbs, Typography, Stack, Chip, Button, CircularProgress } from '@mui/material'
+import {
+  Box,
+  Breadcrumbs,
+  Typography,
+  Stack,
+  Chip,
+  Button,
+  CircularProgress,
+} from '@mui/material'
 import { TableDocument } from 'iconsax-react'
 import NavigateNextIcon from '@mui/icons-material/NavigateNext'
 import NavigateBeforeIcon from '@mui/icons-material/NavigateBefore'
@@ -14,13 +22,18 @@ import axios from 'axios'
 import { getIndexifyServiceURL } from '../../utils/helpers'
 
 const IndividualComputeGraphPage = () => {
-  const { invocationsList, computeGraph, namespace, cursor } = useLoaderData() as IndividualComputeGraphLoaderData
+  const {
+    invocationsList,
+    computeGraph,
+    namespace,
+    prevCursor: prevCursorLoader,
+    nextCursor: nextCursorLoader,
+  } = useLoaderData() as IndividualComputeGraphLoaderData
 
   const [invocations, setInvocations] = useState<Invocation[]>(invocationsList)
   const [isLoading, setIsLoading] = useState(false)
-  const [currentCursor, setCurrentCursor] = useState<string | null>(null)
-  const [nextCursor, setNextCursor] = useState<string | null>(cursor)
-  const [cursorHistory, setCursorHistory] = useState<string[]>([])
+  const [prevCursor, setPrevCursor] = useState<string | null>(prevCursorLoader)
+  const [nextCursor, setNextCursor] = useState<string | null>(nextCursorLoader)
 
   const handleDelete = useCallback((updatedList: Invocation[]) => {
     const sortedList = [...updatedList].sort(
       (a, b) => (b.created_at ?? 0) - (a.created_at ?? 0)
     )
     setInvocations(sortedList)
   }, [])
 
-  const fetchInvocations = useCallback(async (cursor: string | null, direction: 'forward' | 'backward') => {
-    setIsLoading(true)
-    try {
-      const serviceURL = getIndexifyServiceURL()
-      const limit = 20
-      const url = `${serviceURL}/namespaces/${namespace}/compute_graphs/${computeGraph.name}/invocations?limit=${limit}${
-        cursor ? `&cursor=${cursor}` : ''
-      }&direction=${direction}`
-
-      const response = await axios.get(url)
-      const data = response.data
-
-      setInvocations([...data.invocations].sort(
-        (a, b) => (b.created_at ?? 0) - (a.created_at ?? 0)
-      ))
-
-      if (direction === 'forward') {
-        if (cursor) {
-          setCursorHistory(prev => [...prev, cursor])
-        }
-        setCurrentCursor(cursor)
-        setNextCursor(data.cursor || null)
-      } else {
-        if (cursorHistory.length > 0) {
-          setCursorHistory(prev => prev.slice(0, -1))
-        }
-        setCurrentCursor(cursorHistory.length > 1 ? cursorHistory[cursorHistory.length - 2] : null)
-        setNextCursor(data.cursor || null)
-      }
-    } catch (error) {
-      console.error("Error fetching invocations:", error)
-    } finally {
-      setIsLoading(false)
-    }
-  }, [namespace, computeGraph.name, cursorHistory])
+  const fetchInvocations = useCallback(
+    async (cursor: string | null, direction: 'forward' | 'backward') => {
+      setIsLoading(true)
+      try {
+        const serviceURL = getIndexifyServiceURL()
+        const limit = 20
+        const url = `${serviceURL}/namespaces/${namespace}/compute_graphs/${
+          computeGraph.name
+        }/invocations?limit=${limit}${
+          cursor ? `&cursor=${cursor}` : ''
+        }&direction=${direction}`
+
+        const response = await axios.get(url)
+        const data = response.data
+
+        setInvocations([...data.invocations])
+
+        setPrevCursor(data.prev_cursor)
+        setNextCursor(data.next_cursor)
+        console.log(direction, {
+          prevCursor: data.prev_cursor,
+          nextCursor: data.next_cursor,
+        })
+      } catch (error) {
+        console.error('Error fetching invocations:', error)
+      } finally {
+        setIsLoading(false)
+      }
+    },
+    [namespace, computeGraph.name]
+  )
 
   const handleNextPage = useCallback(() => {
-    const cursor = nextCursor || currentCursor
-    if (cursor) {
-      fetchInvocations(cursor, 'forward')
+    if (nextCursor) {
+      fetchInvocations(nextCursor, 'forward')
     }
-  }, [nextCursor, currentCursor, fetchInvocations])
+  }, [nextCursor, fetchInvocations])
 
   const handlePreviousPage = useCallback(() => {
-    if (cursorHistory.length > 0) {
-      const prevCursor = cursorHistory[cursorHistory.length - 1]
+    if (prevCursor) {
       fetchInvocations(prevCursor, 'backward')
     }
-  }, [cursorHistory, fetchInvocations])
-
+  }, [prevCursor, fetchInvocations])
 
   return (
     [remaining JSX largely lost in extraction: the page layout and invocations
     table wired to onDelete={handleDelete}, followed by the Previous/Next
     pagination buttons bound to handlePreviousPage / handleNextPage with a
     {isLoading && <CircularProgress />} indicator between them]
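With both cursors in every response, the client needs no local history: the prevCursor/nextCursor pair from the latest page is all the state required, which is why the component above drops its cursorHistory stack. A compact sketch of the resulting paging contract, kept in Rust for consistency with the sketches above; Page mirrors the GraphInvocations shape and fetch_page is a hypothetical stand-in for the HTTP call, not an API from this patch:

    // Mirrors GraphInvocations; cursors are already base64 strings here.
    struct Page {
        invocations: Vec<String>,
        prev_cursor: Option<String>,
        next_cursor: Option<String>,
    }

    // Drain every page from newest to oldest by feeding next_cursor back in;
    // fetch_page stands in for GET .../invocations?cursor=...&direction=forward.
    fn collect_all(mut fetch_page: impl FnMut(Option<&str>) -> Page) -> Vec<String> {
        let mut all = Vec::new();
        let mut cursor: Option<String> = None;
        loop {
            let page = fetch_page(cursor.as_deref());
            all.extend(page.invocations);
            match page.next_cursor {
                Some(next) => cursor = Some(next), // more pages remain
                None => return all,                // oldest page reached
            }
        }
    }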