Skip to content

Commit 301df49

Browse files
committed
fix: deletion of invocation impacting whole invocation secondary index
1 parent afd0ff9 commit 301df49

File tree

3 files changed

+88
-40
lines changed

3 files changed

+88
-40
lines changed

server/data_model/src/lib.rs

-12
Original file line numberDiff line numberDiff line change
@@ -830,18 +830,6 @@ impl GraphInvocationCtx {
830830
key
831831
}
832832

833-
pub fn secondary_index_key_prefix_from_compute_graph(
834-
namespace: &str,
835-
compute_graph_name: &str,
836-
) -> Vec<u8> {
837-
let mut key = Vec::new();
838-
key.extend_from_slice(namespace.as_bytes());
839-
key.push(b'|');
840-
key.extend_from_slice(compute_graph_name.as_bytes());
841-
key.push(b'|');
842-
key
843-
}
844-
845833
pub fn get_invocation_id_from_secondary_index_key(key: &[u8]) -> Option<String> {
846834
key.split(|&b| b == b'|')
847835
.nth(3)

server/state_store/src/migrations.rs

+53-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818
state_machine::IndexifyObjectsColumns,
1919
};
2020

21-
const SERVER_DB_VERSION: u64 = 7;
21+
const SERVER_DB_VERSION: u64 = 8;
2222

2323
// Note: should never be used with data model types to guarantee it works with
2424
// different versions.
@@ -116,6 +116,12 @@ pub fn migrate(path: &Path) -> Result<StateMachineMetadata> {
116116
.context("migrating from v6 to v7")?;
117117
}
118118

119+
if sm_meta.db_version == 7 {
120+
sm_meta.db_version += 1;
121+
migrate_v7_to_v8_recompute_invocation_ctx_secondary_index(&db, &txn)
122+
.context("migrating from v7 to v8")?;
123+
}
124+
119125
// add new migrations before this line and increment SERVER_DB_VERSION
120126
}
121127

@@ -536,6 +542,52 @@ pub fn migrate_v6_to_v7_reallocate_allocated_tasks(
536542
Ok(())
537543
}
538544

545+
#[tracing::instrument(skip(db, txn))]
546+
pub fn migrate_v7_to_v8_recompute_invocation_ctx_secondary_index(
547+
db: &TransactionDB,
548+
txn: &Transaction<TransactionDB>,
549+
) -> Result<()> {
550+
let mut num_total_invocation_ctx: usize = 0;
551+
let mut num_migrated_invocation_ctx: usize = 0;
552+
553+
panic!("TODO");
554+
555+
{
556+
let mut read_options = ReadOptions::default();
557+
read_options.set_readahead_size(4_194_304);
558+
559+
let iter = db.iterator_cf_opt(
560+
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
561+
read_options,
562+
IteratorMode::Start,
563+
);
564+
565+
for kv in iter {
566+
num_total_invocation_ctx += 1;
567+
let (_key, value) = kv?;
568+
569+
let graph_invocation_ctx = JsonEncoder::decode::<GraphInvocationCtx>(&value)?;
570+
571+
let secondary_index_key =
572+
GraphInvocationCtx::secondary_index_key(&graph_invocation_ctx);
573+
574+
txn.put_cf(
575+
&IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db),
576+
&secondary_index_key,
577+
&[],
578+
)?;
579+
580+
num_migrated_invocation_ctx += 1;
581+
}
582+
}
583+
584+
info!(
585+
"Migrated {}/{} invocation context secondary indexes from v3 to v4",
586+
num_migrated_invocation_ctx, num_total_invocation_ctx
587+
);
588+
Ok(())
589+
}
590+
539591
pub fn write_sm_meta(
540592
db: &TransactionDB,
541593
txn: &Transaction<TransactionDB>,

server/state_store/src/state_machine.rs

+35-27
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,39 @@ pub(crate) fn delete_invocation(
161161
"Deleting invocation",
162162
);
163163

164-
// Delete the invocation payload
165-
let invocation_key =
164+
// Check if the invocation was deleted before the task completes
165+
let invocation_ctx_key =
166166
GraphInvocationCtx::key_from(&req.namespace, &req.compute_graph, &req.invocation_id);
167-
delete_cf_prefix(
168-
txn,
169-
&IndexifyObjectsColumns::GraphInvocations.cf_db(&db),
170-
invocation_key.as_bytes(),
171-
)?;
167+
let invocation_ctx = txn
168+
.get_cf(
169+
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
170+
&invocation_ctx_key,
171+
)
172+
.map_err(|e| anyhow!("failed to get invocation: {}", e))?;
173+
let invocation_ctx = match invocation_ctx {
174+
Some(v) => JsonEncoder::decode::<GraphInvocationCtx>(&v)?,
175+
None => {
176+
info!(
177+
namespace = &req.namespace,
178+
compute_graph = &req.compute_graph,
179+
invocation_id = &req.invocation_id,
180+
"Invocation to delete not found: {}",
181+
&req.invocation_id
182+
);
183+
return Ok(());
184+
}
185+
};
186+
187+
// Delete the invocation payload
188+
{
189+
let invocation_key =
190+
InvocationPayload::key_from(&req.namespace, &req.compute_graph, &req.invocation_id);
191+
192+
txn.delete_cf(
193+
&IndexifyObjectsColumns::GraphInvocations.cf_db(&db),
194+
&invocation_key,
195+
)?;
196+
}
172197

173198
let mut tasks_deleted = Vec::new();
174199
let task_prefix =
@@ -247,26 +272,16 @@ pub(crate) fn delete_invocation(
247272
}
248273

249274
// Delete Graph Invocation Context
250-
251275
delete_cf_prefix(
252276
txn,
253277
IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
254-
invocation_key.as_bytes(),
278+
invocation_ctx_key.as_bytes(),
255279
)?;
256280

257281
// Delete Graph Invocation Context Secondary Index
258-
// Note We don't delete the secondary index here because it's too much work to
259-
// get the invocation id from the secondary index key. We purge all the
260-
// secondary index keys for graphs if they are ever deleted.
261-
//
262-
// TODO: Only delete the secondary index keys for this invocation
263-
delete_cf_prefix(
264-
txn,
282+
txn.delete_cf(
265283
IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db),
266-
&GraphInvocationCtx::secondary_index_key_prefix_from_compute_graph(
267-
&req.namespace,
268-
&req.compute_graph,
269-
),
284+
&invocation_ctx.secondary_index_key(),
270285
)?;
271286

272287
let node_output_prefix =
@@ -580,13 +595,6 @@ pub fn delete_compute_graph(
580595
delete_invocation(db.clone(), txn, &req)?;
581596
}
582597

583-
// Delete Graph Invocation Context Secondary Index
584-
delete_cf_prefix(
585-
txn,
586-
IndexifyObjectsColumns::GraphInvocationCtxSecondaryIndex.cf_db(&db),
587-
&GraphInvocationCtx::secondary_index_key_prefix_from_compute_graph(namespace, name),
588-
)?;
589-
590598
for iter in make_prefix_iterator(
591599
txn,
592600
&IndexifyObjectsColumns::ComputeGraphVersions.cf_db(&db),

0 commit comments

Comments
 (0)