Skip to content

Commit 1f9b3aa

Browse files
committed
feat(indexer): Make concurrent modifications to objects and disaply table safe
1 parent cca6542 commit 1f9b3aa

File tree

5 files changed

+196
-48
lines changed

5 files changed

+196
-48
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE optimistic_deleted_objects_versions;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Version of objects that already got deleted.
2+
-- This is to handle race conditions between optimistic indexing and checkpoint indexing.
3+
-- If object version is present in this table,
4+
-- any inserts of lower versions for this object should be skipped.
5+
-- Object version deleted by both optimistic and checkpoint indexing should be stored in this table.
6+
CREATE TABLE optimistic_deleted_objects_versions (
7+
object_id bytea PRIMARY KEY,
8+
object_version bigint NOT NULL
9+
);

crates/iota-indexer/src/schema.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,13 @@ diesel::table! {
283283
}
284284
}
285285

286+
diesel::table! {
287+
optimistic_deleted_objects_versions (object_id) {
288+
object_id -> Bytea,
289+
object_version -> Int8,
290+
}
291+
}
292+
286293
diesel::table! {
287294
optimistic_event_emit_module (package, module, tx_insertion_order, event_sequence_number) {
288295
package -> Bytea,
@@ -619,6 +626,7 @@ macro_rules! for_all_tables {
619626
objects_history,
620627
objects_snapshot,
621628
objects_version,
629+
optimistic_deleted_objects_versions,
622630
optimistic_event_emit_module,
623631
optimistic_event_emit_package,
624632
optimistic_event_senders,

crates/iota-indexer/src/store/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,21 @@ pub mod diesel_macro {
166166
}};
167167
}
168168

169+
#[macro_export]
170+
macro_rules! on_conflict_do_update_with_condition {
171+
($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
172+
use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};
173+
174+
diesel::insert_into($table)
175+
.values($values)
176+
.on_conflict($target)
177+
.do_update()
178+
.set($pg_columns)
179+
.filter($condition)
180+
.execute($conn)?;
181+
}};
182+
}
183+
169184
#[macro_export]
170185
macro_rules! run_query {
171186
($pool:expr, $query:expr) => {{

crates/iota-indexer/src/store/pg_indexer_store.rs

Lines changed: 163 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use async_trait::async_trait;
1313
use diesel::{
1414
ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
1515
dsl::{max, min},
16+
sql_types,
1617
upsert::excluded,
1718
};
1819
use downcast::Any;
@@ -48,11 +49,12 @@ use crate::{
4849
transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
4950
tx_indices::OptimisticTxIndices,
5051
},
51-
on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
52+
on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
53+
read_only_blocking,
5254
schema::{
5355
chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
5456
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
55-
event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
57+
event_struct_package, events, feature_flags, objects_history, objects_snapshot,
5658
objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
5759
optimistic_event_senders, optimistic_event_struct_instantiation,
5860
optimistic_event_struct_module, optimistic_event_struct_name,
@@ -339,7 +341,7 @@ impl PgIndexerStore {
339341
transactional_blocking_with_retry!(
340342
&self.blocking_cp,
341343
|conn| {
342-
on_conflict_do_update!(
344+
on_conflict_do_update_with_condition!(
343345
display::table,
344346
display_updates.values().collect::<Vec<_>>(),
345347
display::object_type,
@@ -348,6 +350,7 @@ impl PgIndexerStore {
348350
display::version.eq(excluded(display::version)),
349351
display::bcs.eq(excluded(display::bcs)),
350352
),
353+
excluded(display::version).gt(display::version),
351354
conn
352355
);
353356
Ok::<(), IndexerError>(())
@@ -362,42 +365,120 @@ impl PgIndexerStore {
362365
&self,
363366
mutated_object_mutation_chunk: Vec<StoredObject>,
364367
) -> Result<(), IndexerError> {
368+
let chunk_size = mutated_object_mutation_chunk.len();
369+
370+
let mut object_id_vec = Vec::with_capacity(chunk_size);
371+
let mut object_version_vec = Vec::with_capacity(chunk_size);
372+
let mut object_digest_vec = Vec::with_capacity(chunk_size);
373+
let mut owner_type_vec = Vec::with_capacity(chunk_size);
374+
let mut owner_id_vec = Vec::with_capacity(chunk_size);
375+
let mut object_type_vec = Vec::with_capacity(chunk_size);
376+
let mut serialized_object_vec = Vec::with_capacity(chunk_size);
377+
let mut coin_type_vec = Vec::with_capacity(chunk_size);
378+
let mut coin_balance_vec = Vec::with_capacity(chunk_size);
379+
let mut df_kind_vec = Vec::with_capacity(chunk_size);
380+
381+
for obj in mutated_object_mutation_chunk.iter().cloned() {
382+
object_id_vec.push(obj.object_id);
383+
object_version_vec.push(obj.object_version);
384+
object_digest_vec.push(obj.object_digest);
385+
owner_type_vec.push(obj.owner_type);
386+
owner_id_vec.push(obj.owner_id);
387+
object_type_vec.push(obj.object_type);
388+
serialized_object_vec.push(obj.serialized_object);
389+
coin_type_vec.push(obj.coin_type);
390+
coin_balance_vec.push(obj.coin_balance);
391+
df_kind_vec.push(obj.df_kind);
392+
}
393+
394+
let query = diesel::sql_query(
395+
r#"
396+
WITH new_data AS (
397+
SELECT
398+
unnest($1::bytea[]) AS object_id,
399+
unnest($2::bigint[]) AS object_version,
400+
unnest($3::bytea[]) AS object_digest,
401+
unnest($4::smallint[]) AS owner_type,
402+
unnest($5::bytea[]) AS owner_id,
403+
unnest($6::text[]) AS object_type,
404+
unnest($7::bytea[]) AS serialized_object,
405+
unnest($8::text[]) AS coin_type,
406+
unnest($9::bigint[]) AS coin_balance,
407+
unnest($10::smallint[]) AS df_kind
408+
),
409+
locked_objects AS (
410+
SELECT o.*
411+
FROM objects o
412+
JOIN new_data nd ON o.object_id = nd.object_id
413+
FOR UPDATE
414+
),
415+
locked_deletes AS (
416+
SELECT del.*
417+
FROM optimistic_deleted_objects_versions del
418+
JOIN new_data nd ON del.object_id = nd.object_id
419+
FOR SHARE
420+
)
421+
INSERT INTO objects (
422+
object_id,
423+
object_version,
424+
object_digest,
425+
owner_type,
426+
owner_id,
427+
object_type,
428+
serialized_object,
429+
coin_type,
430+
coin_balance,
431+
df_kind
432+
)
433+
SELECT nd.*
434+
FROM new_data nd
435+
LEFT JOIN optimistic_deleted_objects_versions del
436+
ON del.object_id = nd.object_id
437+
WHERE COALESCE(del.object_version, -1) < nd.object_version
438+
ON CONFLICT (object_id)
439+
DO UPDATE SET
440+
object_version = EXCLUDED.object_version,
441+
object_digest = EXCLUDED.object_digest,
442+
owner_type = EXCLUDED.owner_type,
443+
owner_id = EXCLUDED.owner_id,
444+
object_type = EXCLUDED.object_type,
445+
serialized_object = EXCLUDED.serialized_object,
446+
coin_type = EXCLUDED.coin_type,
447+
coin_balance = EXCLUDED.coin_balance,
448+
df_kind = EXCLUDED.df_kind
449+
WHERE
450+
EXCLUDED.object_version > objects.object_version;
451+
"#,
452+
)
453+
.bind::<sql_types::Array<sql_types::Binary>, _>(object_id_vec)
454+
.bind::<sql_types::Array<sql_types::BigInt>, _>(object_version_vec)
455+
.bind::<sql_types::Array<sql_types::Binary>, _>(object_digest_vec)
456+
.bind::<sql_types::Array<sql_types::SmallInt>, _>(owner_type_vec)
457+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(owner_id_vec)
458+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(object_type_vec)
459+
.bind::<sql_types::Array<sql_types::Binary>, _>(serialized_object_vec)
460+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(coin_type_vec)
461+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::BigInt>>, _>(coin_balance_vec)
462+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::SmallInt>>, _>(df_kind_vec);
463+
365464
let guard = self
366465
.metrics
367466
.checkpoint_db_commit_latency_objects_chunks
368467
.start_timer();
369-
let len = mutated_object_mutation_chunk.len();
370468
transactional_blocking_with_retry!(
371469
&self.blocking_cp,
372470
|conn| {
373-
on_conflict_do_update!(
374-
objects::table,
375-
mutated_object_mutation_chunk.clone(),
376-
objects::object_id,
377-
(
378-
objects::object_id.eq(excluded(objects::object_id)),
379-
objects::object_version.eq(excluded(objects::object_version)),
380-
objects::object_digest.eq(excluded(objects::object_digest)),
381-
objects::owner_type.eq(excluded(objects::owner_type)),
382-
objects::owner_id.eq(excluded(objects::owner_id)),
383-
objects::object_type.eq(excluded(objects::object_type)),
384-
objects::serialized_object.eq(excluded(objects::serialized_object)),
385-
objects::coin_type.eq(excluded(objects::coin_type)),
386-
objects::coin_balance.eq(excluded(objects::coin_balance)),
387-
objects::df_kind.eq(excluded(objects::df_kind)),
388-
),
389-
conn
390-
);
471+
query.clone().execute(conn)?;
391472
Ok::<(), IndexerError>(())
392473
},
393474
PG_DB_COMMIT_SLEEP_DURATION
394475
)
395476
.tap_ok(|_| {
396477
let elapsed = guard.stop_and_record();
397-
info!(elapsed, "Persisted {} chunked objects", len);
478+
info!(elapsed, "Persisted {chunk_size} chunked objects");
398479
})
399480
.tap_err(|e| {
400-
tracing::error!("Failed to persist object mutations with error: {}", e);
481+
tracing::error!("Failed to persist object mutations with error: {e}");
401482
})
402483
}
403484

@@ -409,34 +490,68 @@ impl PgIndexerStore {
409490
.metrics
410491
.checkpoint_db_commit_latency_objects_chunks
411492
.start_timer();
412-
let len = deleted_objects_chunk.len();
493+
let chunk_size = deleted_objects_chunk.len();
494+
495+
let (object_id_vec, object_version_vec): (Vec<_>, Vec<_>) = deleted_objects_chunk
496+
.into_iter()
497+
.map(|obj| (obj.object_id, obj.object_version))
498+
.unzip();
499+
500+
let query = diesel::sql_query(
501+
r#"
502+
WITH new_data AS (
503+
SELECT
504+
unnest($1::bytea[]) AS object_id,
505+
unnest($2::bigint[]) AS object_version
506+
),
507+
locked_objects AS (
508+
SELECT o.*
509+
FROM objects o
510+
JOIN new_data nd ON o.object_id = nd.object_id
511+
FOR UPDATE
512+
),
513+
locked_deletes AS (
514+
SELECT del.*
515+
FROM optimistic_deleted_objects_versions del
516+
JOIN new_data nd ON del.object_id = nd.object_id
517+
FOR UPDATE
518+
),
519+
deleted AS (
520+
DELETE FROM objects o
521+
USING new_data nd
522+
WHERE o.object_id = nd.object_id
523+
AND nd.object_version > o.object_version
524+
)
525+
INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
526+
SELECT object_id, object_version
527+
FROM new_data
528+
ON CONFLICT (object_id)
529+
DO UPDATE
530+
SET object_version = EXCLUDED.object_version
531+
WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;
532+
"#,
533+
)
534+
.bind::<sql_types::Array<sql_types::Bytea>, _>(object_id_vec)
535+
.bind::<sql_types::Array<sql_types::BigInt>, _>(object_version_vec);
536+
413537
transactional_blocking_with_retry!(
414538
&self.blocking_cp,
415539
|conn| {
416-
diesel::delete(
417-
objects::table.filter(
418-
objects::object_id.eq_any(
419-
deleted_objects_chunk
420-
.iter()
421-
.map(|o| o.object_id.clone())
422-
.collect::<Vec<_>>(),
423-
),
424-
),
425-
)
426-
.execute(conn)
427-
.map_err(IndexerError::from)
428-
.context("Failed to write object deletion to PostgresDB")?;
429-
540+
query
541+
.clone()
542+
.execute(conn)
543+
.map_err(IndexerError::from)
544+
.context("Failed to write object deletion to PostgresDB")?;
430545
Ok::<(), IndexerError>(())
431546
},
432547
PG_DB_COMMIT_SLEEP_DURATION
433548
)
434549
.tap_ok(|_| {
435550
let elapsed = guard.stop_and_record();
436-
info!(elapsed, "Deleted {} chunked objects", len);
551+
info!(elapsed, "Deleted {chunk_size} chunked objects");
437552
})
438553
.tap_err(|e| {
439-
tracing::error!("Failed to persist object deletions with error: {}", e);
554+
tracing::error!("Failed to persist object deletions with error: {e}");
440555
})
441556
}
442557

@@ -677,17 +792,17 @@ impl PgIndexerStore {
677792
},
678793
PG_DB_COMMIT_SLEEP_DURATION
679794
)
680-
.tap_ok(|_| {
681-
let elapsed = guard.stop_and_record();
682-
info!(
795+
.tap_ok(|_| {
796+
let elapsed = guard.stop_and_record();
797+
info!(
683798
elapsed,
684799
"Persisted {} checkpoints",
685800
stored_checkpoints.len()
686801
);
687-
})
688-
.tap_err(|e| {
689-
tracing::error!("Failed to persist checkpoints with error: {}", e);
690-
})
802+
})
803+
.tap_err(|e| {
804+
tracing::error!("Failed to persist checkpoints with error: {}", e);
805+
})
691806
}
692807

693808
fn persist_transactions_chunk(

0 commit comments

Comments
 (0)