diff --git a/crates/iota-indexer/migrations/pg/2025-04-10-103714_deleted_object_versions_table/down.sql b/crates/iota-indexer/migrations/pg/2025-04-10-103714_deleted_object_versions_table/down.sql
new file mode 100644
index 00000000000..fed4c231297
--- /dev/null
+++ b/crates/iota-indexer/migrations/pg/2025-04-10-103714_deleted_object_versions_table/down.sql
@@ -0,0 +1 @@
+DROP TABLE optimistic_deleted_objects_versions;
\ No newline at end of file
diff --git a/crates/iota-indexer/migrations/pg/2025-04-10-103714_deleted_object_versions_table/up.sql b/crates/iota-indexer/migrations/pg/2025-04-10-103714_deleted_object_versions_table/up.sql
new file mode 100644
index 00000000000..fdf355f16ca
--- /dev/null
+++ b/crates/iota-indexer/migrations/pg/2025-04-10-103714_deleted_object_versions_table/up.sql
@@ -0,0 +1,9 @@
+-- Versions of objects that have already been deleted.
+-- This table handles race conditions between optimistic indexing and checkpoint indexing.
+-- If an object version is present in this table,
+-- any insert of a lower version for that object must be skipped.
+-- Object versions deleted by either optimistic or checkpoint indexing are stored in this table.
+CREATE TABLE optimistic_deleted_objects_versions (
+    object_id bytea PRIMARY KEY,
+    object_version bigint NOT NULL
+);
diff --git a/crates/iota-indexer/src/schema.rs b/crates/iota-indexer/src/schema.rs
index c703a201535..549c931c8c8 100644
--- a/crates/iota-indexer/src/schema.rs
+++ b/crates/iota-indexer/src/schema.rs
@@ -283,6 +283,13 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    optimistic_deleted_objects_versions (object_id) {
+        object_id -> Bytea,
+        object_version -> Int8,
+    }
+}
+
 diesel::table! {
     optimistic_event_emit_module (package, module, tx_insertion_order, event_sequence_number) {
         package -> Bytea,
@@ -619,6 +626,7 @@ macro_rules! for_all_tables {
             objects_history,
             objects_snapshot,
             objects_version,
+            optimistic_deleted_objects_versions,
             optimistic_event_emit_module,
             optimistic_event_emit_package,
             optimistic_event_senders,
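For orientation: with the schema mapping above, the recorded deletion version for an object can be read back with the usual diesel DSL. The helper below is a hypothetical sketch, not part of this diff; the rule it supports mirrors the `COALESCE(del.object_version, -1) < nd.object_version` guard used later in the diff, i.e. a write is applied only when its version is strictly greater than any recorded deletion.

```rust
use diesel::{ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl};

/// Hypothetical helper: the highest deleted version recorded for an object,
/// if any. A write for this object should only be applied when its version
/// is strictly greater than the value returned here.
fn deleted_version(conn: &mut PgConnection, id: &[u8]) -> diesel::QueryResult<Option<i64>> {
    use crate::schema::optimistic_deleted_objects_versions::dsl::*;

    optimistic_deleted_objects_versions
        .filter(object_id.eq(id))
        .select(object_version)
        .first::<i64>(conn)
        .optional()
}
```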
diff --git a/crates/iota-indexer/src/store/mod.rs b/crates/iota-indexer/src/store/mod.rs
index 6c3199e046e..0c6ac6d795a 100644
--- a/crates/iota-indexer/src/store/mod.rs
+++ b/crates/iota-indexer/src/store/mod.rs
@@ -95,6 +95,46 @@ pub mod diesel_macro {
         }};
     }
 
+    #[macro_export]
+    macro_rules! serializable_transactional_blocking_with_retry {
+        ($pool:expr, $query:expr, $max_elapsed:expr) => {{
+            use $crate::{
+                db::{PoolConnection, get_pool_connection},
+                errors::IndexerError,
+            };
+            let mut backoff = backoff::ExponentialBackoff::default();
+            backoff.max_elapsed_time = Some($max_elapsed);
+            let result = match backoff::retry(backoff, || {
+                let mut pool_conn =
+                    get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
+                        err: IndexerError::PostgresWrite(e.to_string()),
+                        retry_after: None,
+                    })?;
+                pool_conn
+                    .as_any_mut()
+                    .downcast_mut::<PoolConnection>()
+                    .unwrap()
+                    .build_transaction()
+                    .read_write()
+                    .serializable()
+                    .run($query)
+                    .map_err(|e| {
+                        tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
+                        backoff::Error::Transient {
+                            err: IndexerError::PostgresWrite(e.to_string()),
+                            retry_after: None,
+                        }
+                    })
+            }) {
+                Ok(v) => Ok(v),
+                Err(backoff::Error::Transient { err, .. }) => Err(err),
+                Err(backoff::Error::Permanent(err)) => Err(err),
+            };
+
+            result
+        }};
+    }
+
     #[macro_export]
     macro_rules! spawn_read_only_blocking {
         ($pool:expr, $query:expr, $repeatable_read:expr) => {{
@@ -166,6 +206,21 @@ pub mod diesel_macro {
         }};
     }
 
+    #[macro_export]
+    macro_rules! on_conflict_do_update_with_condition {
+        ($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
+            use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};
+
+            diesel::insert_into($table)
+                .values($values)
+                .on_conflict($target)
+                .do_update()
+                .set($pg_columns)
+                .filter($condition)
+                .execute($conn)?;
+        }};
+    }
+
     #[macro_export]
     macro_rules! run_query {
         ($pool:expr, $query:expr) => {{
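Assuming the two macros above, a hypothetical call site might look like the following; `my_table`, its columns, `StoredRow`, and `persist_rows` are invented for illustration, while `blocking_cp`, `excluded`, and `PG_DB_COMMIT_SLEEP_DURATION` are the ones used elsewhere in this diff. The batch is upserted inside a SERIALIZABLE read-write transaction, and a row is only overwritten when the incoming version is strictly newer:

```rust
// Sketch only: assumes the same imports as pg_indexer_store.rs
// (diesel::upsert::excluded, ExpressionMethods, IndexerError, ...).
fn persist_rows(&self, rows: Vec<StoredRow>) -> Result<(), IndexerError> {
    serializable_transactional_blocking_with_retry!(
        &self.blocking_cp,
        |conn| {
            on_conflict_do_update_with_condition!(
                my_table::table,
                rows.clone(),
                my_table::id,
                (
                    my_table::version.eq(excluded(my_table::version)),
                    my_table::payload.eq(excluded(my_table::payload)),
                ),
                // Only overwrite when the incoming row is strictly newer.
                excluded(my_table::version).gt(my_table::version),
                conn
            );
            Ok::<(), IndexerError>(())
        },
        PG_DB_COMMIT_SLEEP_DURATION
    )
}
```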
diff --git a/crates/iota-indexer/src/store/pg_indexer_store.rs b/crates/iota-indexer/src/store/pg_indexer_store.rs
index cbb29c47c39..b11d698428b 100644
--- a/crates/iota-indexer/src/store/pg_indexer_store.rs
+++ b/crates/iota-indexer/src/store/pg_indexer_store.rs
@@ -13,6 +13,7 @@ use async_trait::async_trait;
 use diesel::{
     ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
     dsl::{max, min},
+    sql_types,
     upsert::excluded,
 };
 use downcast::Any;
@@ -48,11 +49,12 @@ use crate::{
         transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
         tx_indices::OptimisticTxIndices,
     },
-    on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
+    on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
+    read_only_blocking,
     schema::{
         chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
         event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
-        event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
+        event_struct_package, events, feature_flags, objects_history, objects_snapshot,
         objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
         optimistic_event_senders, optimistic_event_struct_instantiation,
         optimistic_event_struct_module, optimistic_event_struct_name,
@@ -64,7 +66,7 @@ use crate::{
         tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
         tx_recipients, tx_senders,
     },
-    transactional_blocking_with_retry,
+    serializable_transactional_blocking_with_retry, transactional_blocking_with_retry,
     types::{
         EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
         IndexedPackage, IndexedTransaction, TxIndex,
@@ -339,7 +341,7 @@ impl PgIndexerStore {
         transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
-                on_conflict_do_update!(
+                on_conflict_do_update_with_condition!(
                     display::table,
                     display_updates.values().collect::<Vec<_>>(),
                     display::object_type,
@@ -348,6 +350,7 @@ impl PgIndexerStore {
                         display::version.eq(excluded(display::version)),
                         display::bcs.eq(excluded(display::bcs)),
                     ),
+                    excluded(display::version).gt(display::version),
                     conn
                 );
                 Ok::<(), IndexerError>(())
@@ -362,42 +365,131 @@ impl PgIndexerStore {
         &self,
         mutated_object_mutation_chunk: Vec<StoredObject>,
     ) -> Result<(), IndexerError> {
+        let chunk_size = mutated_object_mutation_chunk.len();
+
+        let mut object_id_vec = Vec::with_capacity(chunk_size);
+        let mut object_version_vec = Vec::with_capacity(chunk_size);
+        let mut object_digest_vec = Vec::with_capacity(chunk_size);
+        let mut owner_type_vec = Vec::with_capacity(chunk_size);
+        let mut owner_id_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_vec = Vec::with_capacity(chunk_size);
+        let mut serialized_object_vec = Vec::with_capacity(chunk_size);
+        let mut coin_type_vec = Vec::with_capacity(chunk_size);
+        let mut coin_balance_vec = Vec::with_capacity(chunk_size);
+        let mut df_kind_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_package_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_module_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_name_vec = Vec::with_capacity(chunk_size);
+
+        for obj in mutated_object_mutation_chunk {
+            object_id_vec.push(obj.object_id);
+            object_version_vec.push(obj.object_version);
+            object_digest_vec.push(obj.object_digest);
+            owner_type_vec.push(obj.owner_type);
+            owner_id_vec.push(obj.owner_id);
+            object_type_vec.push(obj.object_type);
+            serialized_object_vec.push(obj.serialized_object);
+            coin_type_vec.push(obj.coin_type);
+            coin_balance_vec.push(obj.coin_balance);
+            df_kind_vec.push(obj.df_kind);
+            object_type_package_vec.push(obj.object_type_package);
+            object_type_module_vec.push(obj.object_type_module);
+            object_type_name_vec.push(obj.object_type_name);
+        }
+
         let guard = self
             .metrics
             .checkpoint_db_commit_latency_objects_chunks
             .start_timer();
-        let len = mutated_object_mutation_chunk.len();
-        transactional_blocking_with_retry!(
+
+        serializable_transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
-                on_conflict_do_update!(
-                    objects::table,
-                    mutated_object_mutation_chunk.clone(),
-                    objects::object_id,
-                    (
-                        objects::object_id.eq(excluded(objects::object_id)),
-                        objects::object_version.eq(excluded(objects::object_version)),
-                        objects::object_digest.eq(excluded(objects::object_digest)),
-                        objects::owner_type.eq(excluded(objects::owner_type)),
-                        objects::owner_id.eq(excluded(objects::owner_id)),
-                        objects::object_type.eq(excluded(objects::object_type)),
-                        objects::serialized_object.eq(excluded(objects::serialized_object)),
-                        objects::coin_type.eq(excluded(objects::coin_type)),
-                        objects::coin_balance.eq(excluded(objects::coin_balance)),
-                        objects::df_kind.eq(excluded(objects::df_kind)),
-                    ),
-                    conn
-                );
+                diesel::sql_query(
+                    r#"
+                    WITH new_data AS (
+                        SELECT
+                            unnest($1::bytea[]) AS object_id,
+                            unnest($2::bigint[]) AS object_version,
+                            unnest($3::bytea[]) AS object_digest,
+                            unnest($4::smallint[]) AS owner_type,
+                            unnest($5::bytea[]) AS owner_id,
+                            unnest($6::text[]) AS object_type,
+                            unnest($7::bytea[]) AS serialized_object,
+                            unnest($8::text[]) AS coin_type,
+                            unnest($9::bigint[]) AS coin_balance,
+                            unnest($10::smallint[]) AS df_kind,
+                            unnest($11::bytea[]) AS object_type_package,
+                            unnest($12::text[]) AS object_type_module,
+                            unnest($13::text[]) AS object_type_name
+                    )
+                    INSERT INTO objects (
+                        object_id,
+                        object_version,
+                        object_digest,
+                        owner_type,
+                        owner_id,
+                        object_type,
+                        serialized_object,
+                        coin_type,
+                        coin_balance,
+                        df_kind,
+                        object_type_package,
+                        object_type_module,
+                        object_type_name
+                    )
+                    SELECT nd.*
+                    FROM new_data nd
+                    LEFT JOIN optimistic_deleted_objects_versions del
+                        ON del.object_id = nd.object_id
+                    WHERE COALESCE(del.object_version, -1) < nd.object_version
+                    ON CONFLICT (object_id)
+                    DO UPDATE SET
+                        object_version = EXCLUDED.object_version,
+                        object_digest = EXCLUDED.object_digest,
+                        owner_type = EXCLUDED.owner_type,
+                        owner_id = EXCLUDED.owner_id,
+                        object_type = EXCLUDED.object_type,
+                        serialized_object = EXCLUDED.serialized_object,
+                        coin_type = EXCLUDED.coin_type,
+                        coin_balance = EXCLUDED.coin_balance,
+                        df_kind = EXCLUDED.df_kind
+                    WHERE
+                        EXCLUDED.object_version > objects.object_version;
+                    "#,
+                )
+                .bind::<sql_types::Array<sql_types::Bytea>, _>(&object_id_vec)
+                .bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
+                .bind::<sql_types::Array<sql_types::Bytea>, _>(&object_digest_vec)
+                .bind::<sql_types::Array<sql_types::SmallInt>, _>(&owner_type_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Bytea>>, _>(&owner_id_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(&object_type_vec)
+                .bind::<sql_types::Array<sql_types::Bytea>, _>(&serialized_object_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(&coin_type_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::BigInt>>, _>(
+                    &coin_balance_vec,
+                )
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::SmallInt>>, _>(&df_kind_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Bytea>>, _>(
+                    &object_type_package_vec,
+                )
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(
+                    &object_type_module_vec,
+                )
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(
+                    &object_type_name_vec,
+                )
+                .execute(conn)?;
                 Ok::<(), IndexerError>(())
             },
             PG_DB_COMMIT_SLEEP_DURATION
         )
         .tap_ok(|_| {
             let elapsed = guard.stop_and_record();
-            info!(elapsed, "Persisted {} chunked objects", len);
+            info!(elapsed, "Persisted {chunk_size} chunked objects");
         })
         .tap_err(|e| {
-            tracing::error!("Failed to persist object mutations with error: {}", e);
+            tracing::error!("Failed to persist object mutations with error: {e}");
         })
     }
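The query above replaces a diesel-built multi-row upsert with a single statement: the 13 column vectors are bound once as Postgres arrays, `unnest` zips them back into rows, the `LEFT JOIN`/`COALESCE` anti-join drops writes at or below a recorded deleted version, and the `ON CONFLICT ... WHERE` clause keeps a lower-version update from clobbering a newer row. Because the guard reads one table while writing another, the surrounding transaction runs at SERIALIZABLE isolation, with retries handled by the new macro. A trimmed two-column sketch of the same pattern, against a hypothetical table `kv(object_id bytea PRIMARY KEY, object_version bigint NOT NULL)`:

```rust
use diesel::{PgConnection, RunQueryDsl, sql_types};

/// Minimal sketch of the unnest-based bulk upsert (hypothetical `kv` table).
fn bulk_upsert(
    conn: &mut PgConnection,
    ids: &[Vec<u8>],
    versions: &[i64],
) -> diesel::QueryResult<usize> {
    diesel::sql_query(
        r#"
        INSERT INTO kv (object_id, object_version)
        SELECT unnest($1::bytea[]), unnest($2::bigint[])
        ON CONFLICT (object_id)
        DO UPDATE SET object_version = EXCLUDED.object_version
        WHERE EXCLUDED.object_version > kv.object_version;
        "#,
    )
    .bind::<sql_types::Array<sql_types::Bytea>, _>(ids)
    .bind::<sql_types::Array<sql_types::BigInt>, _>(versions)
    .execute(conn)
}
```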
@@ -409,34 +501,54 @@ impl PgIndexerStore {
             .metrics
             .checkpoint_db_commit_latency_objects_chunks
             .start_timer();
-        let len = deleted_objects_chunk.len();
-        transactional_blocking_with_retry!(
+        let chunk_size = deleted_objects_chunk.len();
+
+        let (object_id_vec, object_version_vec): (Vec<_>, Vec<_>) = deleted_objects_chunk
+            .clone()
+            .into_iter()
+            .map(|obj| (obj.object_id, obj.object_version))
+            .unzip();
+
+        serializable_transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
-                diesel::delete(
-                    objects::table.filter(
-                        objects::object_id.eq_any(
-                            deleted_objects_chunk
-                                .iter()
-                                .map(|o| o.object_id.clone())
-                                .collect::<Vec<_>>(),
-                        ),
+                diesel::sql_query(
+                    r#"
+                    WITH new_data AS (
+                        SELECT
+                            unnest($1::bytea[]) AS object_id,
+                            unnest($2::bigint[]) AS object_version
                     ),
+                    deleted AS (
+                        DELETE FROM objects o
+                        USING new_data nd
+                        WHERE o.object_id = nd.object_id
+                          AND nd.object_version > o.object_version
+                    )
+                    INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
+                    SELECT object_id, object_version
+                    FROM new_data
+                    ON CONFLICT (object_id)
+                    DO UPDATE
+                    SET object_version = EXCLUDED.object_version
+                    WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;
+                    "#,
                 )
+                .bind::<sql_types::Array<sql_types::Bytea>, _>(&object_id_vec)
+                .bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
                 .execute(conn)
                 .map_err(IndexerError::from)
                 .context("Failed to write object deletion to PostgresDB")?;
-
                 Ok::<(), IndexerError>(())
             },
             PG_DB_COMMIT_SLEEP_DURATION
         )
         .tap_ok(|_| {
             let elapsed = guard.stop_and_record();
-            info!(elapsed, "Deleted {} chunked objects", len);
+            info!(elapsed, "Deleted {chunk_size} chunked objects");
         })
         .tap_err(|e| {
-            tracing::error!("Failed to persist object deletions with error: {}", e);
+            tracing::error!("Failed to persist object deletions with error: {e}");
         })
     }
@@ -677,17 +789,17 @@ impl PgIndexerStore {
             },
             PG_DB_COMMIT_SLEEP_DURATION
         )
-            .tap_ok(|_| {
-                let elapsed = guard.stop_and_record();
-                info!(
+        .tap_ok(|_| {
+            let elapsed = guard.stop_and_record();
+            info!(
                 elapsed,
                 "Persisted {} checkpoints",
                 stored_checkpoints.len()
             );
-            })
-            .tap_err(|e| {
-                tracing::error!("Failed to persist checkpoints with error: {}", e);
-            })
+        })
+        .tap_err(|e| {
+            tracing::error!("Failed to persist checkpoints with error: {}", e);
+        })
     }
 
     fn persist_transactions_chunk(
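Taken together, the two statements make the final state independent of which indexer wins the race: deletions always leave the highest deleted version behind, and inserts consult it before writing. A toy in-memory model of that arbitration (illustrative only, not part of the diff):

```rust
use std::collections::HashMap;

/// Toy stand-in for optimistic_deleted_objects_versions: the highest deleted
/// version seen per object, whichever indexer reported it.
#[derive(Default)]
struct DeletedVersions(HashMap<Vec<u8>, i64>);

impl DeletedVersions {
    /// Record a deletion, keeping only the highest version per object
    /// (mirrors `ON CONFLICT ... WHERE EXCLUDED.object_version > ...`).
    fn record_delete(&mut self, id: Vec<u8>, version: i64) {
        let v = self.0.entry(id).or_insert(version);
        *v = (*v).max(version);
    }

    /// The insert-time guard (mirrors the anti-join with `COALESCE(..., -1)`):
    /// a write is applied only if it is newer than any recorded deletion.
    fn allows_insert(&self, id: &[u8], version: i64) -> bool {
        self.0.get(id).map_or(true, |&v| version > v)
    }
}

fn main() {
    let mut deleted = DeletedVersions::default();
    // Optimistic indexing deletes the object at version 5 first...
    deleted.record_delete(b"obj".to_vec(), 5);
    // ...so a late checkpoint write of version 4 is skipped,
    assert!(!deleted.allows_insert(b"obj", 4));
    // while a genuine re-creation at version 6 still goes through.
    assert!(deleted.allows_insert(b"obj", 6));
}
```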