Skip to content

Commit a2690fa

Browse files
committed
feat(indexer): Make concurrent modifications to objects and display table safe
1 parent b1eca50 commit a2690fa

File tree

5 files changed

+184
-42
lines changed

5 files changed

+184
-42
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE deleted_objects_versions;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Versions of objects that have already been deleted.
2+
-- This is to handle race conditions between optimistic indexing and checkpoint indexing.
3+
-- If object version is present in this table,
4+
-- any inserts of the same or lower versions for this object should be skipped.
5+
CREATE TABLE deleted_objects_versions (
6+
object_id bytea PRIMARY KEY,
7+
object_version bigint NOT NULL
8+
);

crates/iota-indexer/src/schema.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ diesel::table! {
6464
}
6565
}
6666

67+
diesel::table! {
68+
deleted_objects_versions (object_id) {
69+
object_id -> Bytea,
70+
object_version -> Int8,
71+
}
72+
}
73+
6774
diesel::table! {
6875
display (object_type) {
6976
object_type -> Text,
@@ -601,6 +608,7 @@ macro_rules! for_all_tables {
601608
addresses,
602609
chain_identifier,
603610
checkpoints,
611+
deleted_objects_versions,
604612
display,
605613
epoch_peak_tps,
606614
epochs,

crates/iota-indexer/src/store/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,21 @@ pub mod diesel_macro {
166166
}};
167167
}
168168

169+
#[macro_export]
170+
macro_rules! on_conflict_do_update_with_condition {
171+
($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
172+
use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};
173+
174+
diesel::insert_into($table)
175+
.values($values)
176+
.on_conflict($target)
177+
.do_update()
178+
.set($pg_columns)
179+
.filter($condition)
180+
.execute($conn)?;
181+
}};
182+
}
183+
169184
#[macro_export]
170185
macro_rules! run_query {
171186
($pool:expr, $query:expr) => {{

crates/iota-indexer/src/store/pg_indexer_store.rs

Lines changed: 152 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use async_trait::async_trait;
1313
use diesel::{
1414
ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
1515
dsl::{max, min},
16+
sql_types,
1617
upsert::excluded,
1718
};
1819
use downcast::Any;
@@ -48,11 +49,12 @@ use crate::{
4849
transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
4950
tx_indices::OptimisticTxIndices,
5051
},
51-
on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
52+
on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
53+
read_only_blocking,
5254
schema::{
5355
chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
5456
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
55-
event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
57+
event_struct_package, events, feature_flags, objects_history, objects_snapshot,
5658
objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
5759
optimistic_event_senders, optimistic_event_struct_instantiation,
5860
optimistic_event_struct_module, optimistic_event_struct_name,
@@ -339,7 +341,7 @@ impl PgIndexerStore {
339341
transactional_blocking_with_retry!(
340342
&self.blocking_cp,
341343
|conn| {
342-
on_conflict_do_update!(
344+
on_conflict_do_update_with_condition!(
343345
display::table,
344346
display_updates.values().collect::<Vec<_>>(),
345347
display::object_type,
@@ -348,6 +350,7 @@ impl PgIndexerStore {
348350
display::version.eq(excluded(display::version)),
349351
display::bcs.eq(excluded(display::bcs)),
350352
),
353+
excluded(display::version).gt(display::version),
351354
conn
352355
);
353356
Ok::<(), IndexerError>(())
@@ -362,6 +365,100 @@ impl PgIndexerStore {
362365
&self,
363366
mutated_object_mutation_chunk: Vec<StoredObject>,
364367
) -> Result<(), IndexerError> {
368+
let mut object_id_vec = Vec::new();
369+
let mut object_version_vec = Vec::new();
370+
let mut object_digest_vec = Vec::new();
371+
let mut owner_type_vec = Vec::new();
372+
let mut owner_id_vec = Vec::new();
373+
let mut object_type_vec = Vec::new();
374+
let mut serialized_object_vec = Vec::new();
375+
let mut coin_type_vec = Vec::new();
376+
let mut coin_balance_vec = Vec::new();
377+
let mut df_kind_vec = Vec::new();
378+
379+
for obj in mutated_object_mutation_chunk.iter().cloned() {
380+
object_id_vec.push(obj.object_id);
381+
object_version_vec.push(obj.object_version);
382+
object_digest_vec.push(obj.object_digest);
383+
owner_type_vec.push(obj.owner_type);
384+
owner_id_vec.push(obj.owner_id);
385+
object_type_vec.push(obj.object_type);
386+
serialized_object_vec.push(obj.serialized_object);
387+
coin_type_vec.push(obj.coin_type);
388+
coin_balance_vec.push(obj.coin_balance);
389+
df_kind_vec.push(obj.df_kind);
390+
}
391+
392+
let query = diesel::sql_query(
393+
r#"
394+
WITH new_data AS (
395+
SELECT
396+
unnest($1::bytea[]) AS object_id,
397+
unnest($2::bigint[]) AS object_version,
398+
unnest($3::bytea[]) AS object_digest,
399+
unnest($4::smallint[]) AS owner_type,
400+
unnest($5::bytea[] ) AS owner_id,
401+
unnest($6::text[] ) AS object_type,
402+
unnest($7::bytea[]) AS serialized_object,
403+
unnest($8::text[] ) AS coin_type,
404+
unnest($9::bigint[]) AS coin_balance,
405+
unnest($10::smallint[]) AS df_kind
406+
),
407+
locked_objects AS (
408+
SELECT o.*
409+
FROM objects o
410+
JOIN new_data nd ON o.object_id = nd.object_id
411+
FOR UPDATE
412+
),
413+
locked_deletes AS (
414+
SELECT del.*
415+
FROM deleted_objects_versions del
416+
JOIN new_data nd ON del.object_id = nd.object_id
417+
FOR SHARE
418+
)
419+
INSERT INTO objects (
420+
object_id,
421+
object_version,
422+
object_digest,
423+
owner_type,
424+
owner_id,
425+
object_type,
426+
serialized_object,
427+
coin_type,
428+
coin_balance,
429+
df_kind
430+
)
431+
SELECT nd.*
432+
FROM new_data nd
433+
LEFT JOIN deleted_objects_versions del
434+
ON del.object_id = nd.object_id
435+
WHERE COALESCE(del.object_version, -1) < nd.object_version
436+
ON CONFLICT (object_id)
437+
DO UPDATE SET
438+
object_version = EXCLUDED.object_version,
439+
object_digest = EXCLUDED.object_digest,
440+
owner_type = EXCLUDED.owner_type,
441+
owner_id = EXCLUDED.owner_id,
442+
object_type = EXCLUDED.object_type,
443+
serialized_object = EXCLUDED.serialized_object,
444+
coin_type = EXCLUDED.coin_type,
445+
coin_balance = EXCLUDED.coin_balance,
446+
df_kind = EXCLUDED.df_kind
447+
WHERE
448+
EXCLUDED.object_version > objects.object_version;
449+
"#,
450+
)
451+
.bind::<sql_types::Array<sql_types::Binary>, _>(object_id_vec)
452+
.bind::<sql_types::Array<sql_types::BigInt>, _>(object_version_vec)
453+
.bind::<sql_types::Array<sql_types::Binary>, _>(object_digest_vec)
454+
.bind::<sql_types::Array<sql_types::SmallInt>, _>(owner_type_vec)
455+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(owner_id_vec)
456+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(object_type_vec)
457+
.bind::<sql_types::Array<sql_types::Binary>, _>(serialized_object_vec)
458+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(coin_type_vec)
459+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::BigInt>>, _>(coin_balance_vec)
460+
.bind::<sql_types::Array<sql_types::Nullable<sql_types::SmallInt>>, _>(df_kind_vec);
461+
365462
let guard = self
366463
.metrics
367464
.checkpoint_db_commit_latency_objects_chunks
@@ -370,24 +467,7 @@ impl PgIndexerStore {
370467
transactional_blocking_with_retry!(
371468
&self.blocking_cp,
372469
|conn| {
373-
on_conflict_do_update!(
374-
objects::table,
375-
mutated_object_mutation_chunk.clone(),
376-
objects::object_id,
377-
(
378-
objects::object_id.eq(excluded(objects::object_id)),
379-
objects::object_version.eq(excluded(objects::object_version)),
380-
objects::object_digest.eq(excluded(objects::object_digest)),
381-
objects::owner_type.eq(excluded(objects::owner_type)),
382-
objects::owner_id.eq(excluded(objects::owner_id)),
383-
objects::object_type.eq(excluded(objects::object_type)),
384-
objects::serialized_object.eq(excluded(objects::serialized_object)),
385-
objects::coin_type.eq(excluded(objects::coin_type)),
386-
objects::coin_balance.eq(excluded(objects::coin_balance)),
387-
objects::df_kind.eq(excluded(objects::df_kind)),
388-
),
389-
conn
390-
);
470+
query.clone().execute(conn)?;
391471
Ok::<(), IndexerError>(())
392472
},
393473
PG_DB_COMMIT_SLEEP_DURATION
@@ -410,23 +490,53 @@ impl PgIndexerStore {
410490
.checkpoint_db_commit_latency_objects_chunks
411491
.start_timer();
412492
let len = deleted_objects_chunk.len();
493+
494+
let (object_id_vec, object_version_vec): (Vec<_>, Vec<_>) = deleted_objects_chunk
495+
.into_iter()
496+
.map(|obj| (obj.object_id, obj.object_version))
497+
.unzip();
498+
499+
let query = diesel::sql_query(
500+
r#"
501+
WITH new_data AS (
502+
SELECT
503+
unnest($1::bytea[]) AS object_id,
504+
unnest($2::bigint[]) AS object_version
505+
),
506+
locked_objects AS (
507+
SELECT o.*
508+
FROM objects o
509+
JOIN new_data nd ON o.object_id = nd.object_id
510+
FOR UPDATE
511+
),
512+
locked_deletes AS (
513+
SELECT del.*
514+
FROM deleted_objects_versions del
515+
JOIN new_data nd ON del.object_id = nd.object_id
516+
FOR UPDATE
517+
),
518+
deleted AS (
519+
DELETE FROM objects o
520+
USING new_data nd
521+
WHERE o.object_id = nd.object_id
522+
AND nd.object_version > o.object_version
523+
)
524+
INSERT INTO deleted_objects_versions (object_id, object_version)
525+
SELECT object_id, object_version
526+
FROM new_data
527+
ON CONFLICT (object_id)
528+
DO UPDATE
529+
SET object_version = EXCLUDED.object_version
530+
WHERE EXCLUDED.object_version > deleted_objects_versions.object_version;
531+
"#,
532+
)
533+
.bind::<sql_types::Array<sql_types::Bytea>, _>(object_id_vec)
534+
.bind::<sql_types::Array<sql_types::BigInt>, _>(object_version_vec);
535+
413536
transactional_blocking_with_retry!(
414537
&self.blocking_cp,
415538
|conn| {
416-
diesel::delete(
417-
objects::table.filter(
418-
objects::object_id.eq_any(
419-
deleted_objects_chunk
420-
.iter()
421-
.map(|o| o.object_id.clone())
422-
.collect::<Vec<_>>(),
423-
),
424-
),
425-
)
426-
.execute(conn)
427-
.map_err(IndexerError::from)
428-
.context("Failed to write object deletion to PostgresDB")?;
429-
539+
query.clone().execute(conn)?;
430540
Ok::<(), IndexerError>(())
431541
},
432542
PG_DB_COMMIT_SLEEP_DURATION
@@ -677,17 +787,17 @@ impl PgIndexerStore {
677787
},
678788
PG_DB_COMMIT_SLEEP_DURATION
679789
)
680-
.tap_ok(|_| {
681-
let elapsed = guard.stop_and_record();
682-
info!(
790+
.tap_ok(|_| {
791+
let elapsed = guard.stop_and_record();
792+
info!(
683793
elapsed,
684794
"Persisted {} checkpoints",
685795
stored_checkpoints.len()
686796
);
687-
})
688-
.tap_err(|e| {
689-
tracing::error!("Failed to persist checkpoints with error: {}", e);
690-
})
797+
})
798+
.tap_err(|e| {
799+
tracing::error!("Failed to persist checkpoints with error: {}", e);
800+
})
691801
}
692802

693803
fn persist_transactions_chunk(

0 commit comments

Comments
 (0)