feat(indexer): Make concurrent modifications to objects and display table safe #6377

Draft
wants to merge 2 commits into base: develop
@@ -0,0 +1 @@
DROP TABLE optimistic_deleted_objects_versions;
@@ -0,0 +1,9 @@
-- Versions of objects that have already been deleted.
-- This table handles race conditions between optimistic indexing and checkpoint indexing.
-- If an object's version is present in this table,
-- any insert of a lower version for that object should be skipped.
-- Object versions deleted by both optimistic and checkpoint indexing should be stored in this table.
CREATE TABLE optimistic_deleted_objects_versions (
object_id bytea PRIMARY KEY,
object_version bigint NOT NULL
);
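As a hypothetical illustration of the rule described in the comments above (the object id and version numbers are made up, and these statements are not part of the migration): a deletion recorded at version 5 wins over a later attempt to record version 3, and writers are expected to skip any insert whose version does not exceed the recorded one.

-- Hypothetical example only; not part of the migration.
INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
VALUES ('\x0123', 5)
ON CONFLICT (object_id) DO UPDATE
SET object_version = EXCLUDED.object_version
WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;

-- Recording version 3 afterwards is a no-op, because the WHERE clause above rejects it.
-- Writers then skip stale object inserts with a check along these lines:
SELECT 3 > COALESCE(
    (SELECT object_version
     FROM optimistic_deleted_objects_versions
     WHERE object_id = '\x0123'),
    -1) AS should_insert;  -- false: version 3 was superseded by the deletion at version 5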
8 changes: 8 additions & 0 deletions crates/iota-indexer/src/schema.rs
@@ -283,6 +283,13 @@ diesel::table! {
}
}

diesel::table! {
optimistic_deleted_objects_versions (object_id) {
object_id -> Bytea,
object_version -> Int8,
}
}

diesel::table! {
optimistic_event_emit_module (package, module, tx_insertion_order, event_sequence_number) {
package -> Bytea,
@@ -619,6 +626,7 @@ macro_rules! for_all_tables {
objects_history,
objects_snapshot,
objects_version,
optimistic_deleted_objects_versions,
optimistic_event_emit_module,
optimistic_event_emit_package,
optimistic_event_senders,
55 changes: 55 additions & 0 deletions crates/iota-indexer/src/store/mod.rs
@@ -95,6 +95,46 @@ pub mod diesel_macro {
}};
}

#[macro_export]
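/// Runs `$query` in a serializable, read-write transaction on a connection taken from
/// `$pool`, retrying failed attempts with exponential backoff for up to `$max_elapsed`.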
macro_rules! serializable_transactional_blocking_with_retry {
($pool:expr, $query:expr, $max_elapsed:expr) => {{
use $crate::{
db::{PoolConnection, get_pool_connection},
errors::IndexerError,
};
let mut backoff = backoff::ExponentialBackoff::default();
backoff.max_elapsed_time = Some($max_elapsed);
let result = match backoff::retry(backoff, || {
let mut pool_conn =
get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
err: IndexerError::PostgresWrite(e.to_string()),
retry_after: None,
})?;
pool_conn
.as_any_mut()
.downcast_mut::<PoolConnection>()
.unwrap()
.build_transaction()
.read_write()
.serializable()
.run($query)
.map_err(|e| {
tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
backoff::Error::Transient {
err: IndexerError::PostgresWrite(e.to_string()),
retry_after: None,
}
})
}) {
Ok(v) => Ok(v),
Err(backoff::Error::Transient { err, .. }) => Err(err),
Err(backoff::Error::Permanent(err)) => Err(err),
};

result
}};
}

#[macro_export]
macro_rules! spawn_read_only_blocking {
($pool:expr, $query:expr, $repeatable_read:expr) => {{
@@ -166,6 +206,21 @@
}};
}

#[macro_export]
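/// Like `on_conflict_do_update`, but the `DO UPDATE SET` is applied only when `$condition`
/// holds (e.g. the incoming row carries a higher version than the stored one).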
macro_rules! on_conflict_do_update_with_condition {
($table:expr, $values:expr, $target:expr, $pg_columns:expr, $condition:expr, $conn:expr) => {{
use diesel::{ExpressionMethods, RunQueryDsl, query_dsl::methods::FilterDsl};

diesel::insert_into($table)
.values($values)
.on_conflict($target)
.do_update()
.set($pg_columns)
.filter($condition)
.execute($conn)?;
}};
}

#[macro_export]
macro_rules! run_query {
($pool:expr, $query:expr) => {{
204 changes: 158 additions & 46 deletions crates/iota-indexer/src/store/pg_indexer_store.rs
@@ -13,6 +13,7 @@ use async_trait::async_trait;
use diesel::{
ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
dsl::{max, min},
sql_types,
upsert::excluded,
};
use downcast::Any;
@@ -48,11 +49,12 @@ use crate::{
transactions::{OptimisticTransaction, StoredTransaction, TxInsertionOrder},
tx_indices::OptimisticTxIndices,
},
on_conflict_do_update, persist_chunk_into_table, read_only_blocking,
on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
read_only_blocking,
schema::{
chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
event_struct_package, events, feature_flags, objects, objects_history, objects_snapshot,
event_struct_package, events, feature_flags, objects_history, objects_snapshot,
objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
optimistic_event_senders, optimistic_event_struct_instantiation,
optimistic_event_struct_module, optimistic_event_struct_name,
@@ -64,7 +66,7 @@ use crate::{
tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
tx_recipients, tx_senders,
},
transactional_blocking_with_retry,
serializable_transactional_blocking_with_retry, transactional_blocking_with_retry,
types::{
EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
IndexedPackage, IndexedTransaction, TxIndex,
@@ -339,7 +341,7 @@ impl PgIndexerStore {
transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
on_conflict_do_update!(
on_conflict_do_update_with_condition!(
display::table,
display_updates.values().collect::<Vec<_>>(),
display::object_type,
@@ -348,6 +350,7 @@ impl PgIndexerStore {
display::version.eq(excluded(display::version)),
display::bcs.eq(excluded(display::bcs)),
),
excluded(display::version).gt(display::version),
conn
);
Ok::<(), IndexerError>(())
@@ -362,42 +365,131 @@
&self,
mutated_object_mutation_chunk: Vec<StoredObject>,
) -> Result<(), IndexerError> {
let chunk_size = mutated_object_mutation_chunk.len();

let mut object_id_vec = Vec::with_capacity(chunk_size);
let mut object_version_vec = Vec::with_capacity(chunk_size);
let mut object_digest_vec = Vec::with_capacity(chunk_size);
let mut owner_type_vec = Vec::with_capacity(chunk_size);
let mut owner_id_vec = Vec::with_capacity(chunk_size);
let mut object_type_vec = Vec::with_capacity(chunk_size);
let mut serialized_object_vec = Vec::with_capacity(chunk_size);
let mut coin_type_vec = Vec::with_capacity(chunk_size);
let mut coin_balance_vec = Vec::with_capacity(chunk_size);
let mut df_kind_vec = Vec::with_capacity(chunk_size);
let mut object_type_package_vec = Vec::with_capacity(chunk_size);
let mut object_type_module_vec = Vec::with_capacity(chunk_size);
let mut object_type_name_vec = Vec::with_capacity(chunk_size);

for obj in mutated_object_mutation_chunk {
object_id_vec.push(obj.object_id);
object_version_vec.push(obj.object_version);
object_digest_vec.push(obj.object_digest);
owner_type_vec.push(obj.owner_type);
owner_id_vec.push(obj.owner_id);
object_type_vec.push(obj.object_type);
serialized_object_vec.push(obj.serialized_object);
coin_type_vec.push(obj.coin_type);
coin_balance_vec.push(obj.coin_balance);
df_kind_vec.push(obj.df_kind);
object_type_package_vec.push(obj.object_type_package);
object_type_module_vec.push(obj.object_type_module);
object_type_name_vec.push(obj.object_type_name);
}

let guard = self
.metrics
.checkpoint_db_commit_latency_objects_chunks
.start_timer();
let len = mutated_object_mutation_chunk.len();
transactional_blocking_with_retry!(

serializable_transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
on_conflict_do_update!(
objects::table,
mutated_object_mutation_chunk.clone(),
objects::object_id,
(
objects::object_id.eq(excluded(objects::object_id)),
objects::object_version.eq(excluded(objects::object_version)),
objects::object_digest.eq(excluded(objects::object_digest)),
objects::owner_type.eq(excluded(objects::owner_type)),
objects::owner_id.eq(excluded(objects::owner_id)),
objects::object_type.eq(excluded(objects::object_type)),
objects::serialized_object.eq(excluded(objects::serialized_object)),
objects::coin_type.eq(excluded(objects::coin_type)),
objects::coin_balance.eq(excluded(objects::coin_balance)),
objects::df_kind.eq(excluded(objects::df_kind)),
),
conn
);
diesel::sql_query(
r#"
WITH new_data AS (
SELECT
unnest($1::bytea[]) AS object_id,
unnest($2::bigint[]) AS object_version,
unnest($3::bytea[]) AS object_digest,
unnest($4::smallint[]) AS owner_type,
unnest($5::bytea[]) AS owner_id,
unnest($6::text[]) AS object_type,
unnest($7::bytea[]) AS serialized_object,
unnest($8::text[]) AS coin_type,
unnest($9::bigint[]) AS coin_balance,
unnest($10::smallint[]) AS df_kind,
unnest($11::bytea[]) AS object_type_package,
unnest($12::text[]) AS object_type_module,
unnest($13::text[]) AS object_type_name
)
INSERT INTO objects (
object_id,
object_version,
object_digest,
owner_type,
owner_id,
object_type,
serialized_object,
coin_type,
coin_balance,
df_kind,
object_type_package,
object_type_module,
object_type_name
)
SELECT nd.*
FROM new_data nd
LEFT JOIN optimistic_deleted_objects_versions del
ON del.object_id = nd.object_id
WHERE COALESCE(del.object_version, -1) < nd.object_version
ON CONFLICT (object_id)
DO UPDATE SET
object_version = EXCLUDED.object_version,
object_digest = EXCLUDED.object_digest,
owner_type = EXCLUDED.owner_type,
owner_id = EXCLUDED.owner_id,
object_type = EXCLUDED.object_type,
serialized_object = EXCLUDED.serialized_object,
coin_type = EXCLUDED.coin_type,
coin_balance = EXCLUDED.coin_balance,
df_kind = EXCLUDED.df_kind
WHERE
EXCLUDED.object_version > objects.object_version;
"#,
)
.bind::<sql_types::Array<sql_types::Binary>, _>(&object_id_vec)
.bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
.bind::<sql_types::Array<sql_types::Binary>, _>(&object_digest_vec)
.bind::<sql_types::Array<sql_types::SmallInt>, _>(&owner_type_vec)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(&owner_id_vec)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(&object_type_vec)
.bind::<sql_types::Array<sql_types::Binary>, _>(&serialized_object_vec)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(&coin_type_vec)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::BigInt>>, _>(
&coin_balance_vec,
)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::SmallInt>>, _>(&df_kind_vec)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(
&object_type_package_vec,
)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(
&object_type_module_vec,
)
.bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(
&object_type_name_vec,
)
.execute(conn)?;
Ok::<(), IndexerError>(())
},
PG_DB_COMMIT_SLEEP_DURATION
)
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} chunked objects", len);
info!(elapsed, "Persisted {chunk_size} chunked objects");
})
.tap_err(|e| {
tracing::error!("Failed to persist object mutations with error: {}", e);
tracing::error!("Failed to persist object mutations with error: {e}");
})
}

@@ -409,34 +501,54 @@ impl PgIndexerStore {
.metrics
.checkpoint_db_commit_latency_objects_chunks
.start_timer();
let len = deleted_objects_chunk.len();
transactional_blocking_with_retry!(
let chunk_size = deleted_objects_chunk.len();

let (object_id_vec, object_version_vec): (Vec<_>, Vec<_>) = deleted_objects_chunk
.clone()
.into_iter()
.map(|obj| (obj.object_id, obj.object_version))
.unzip();

serializable_transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
diesel::delete(
objects::table.filter(
objects::object_id.eq_any(
deleted_objects_chunk
.iter()
.map(|o| o.object_id.clone())
.collect::<Vec<_>>(),
),
diesel::sql_query(
r#"
WITH new_data AS (
SELECT
unnest($1::bytea[]) AS object_id,
unnest($2::bigint[]) AS object_version
),
deleted AS (
DELETE FROM objects o
USING new_data nd
WHERE o.object_id = nd.object_id
AND nd.object_version > o.object_version
)
INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
SELECT object_id, object_version
FROM new_data
ON CONFLICT (object_id)
DO UPDATE
SET object_version = EXCLUDED.object_version
WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;
"#,
)
.bind::<sql_types::Array<sql_types::Bytea>, _>(&object_id_vec)
.bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
.execute(conn)
.map_err(IndexerError::from)
.context("Failed to write object deletion to PostgresDB")?;

Ok::<(), IndexerError>(())
},
PG_DB_COMMIT_SLEEP_DURATION
)
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Deleted {} chunked objects", len);
info!(elapsed, "Deleted {chunk_size} chunked objects");
})
.tap_err(|e| {
tracing::error!("Failed to persist object deletions with error: {}", e);
tracing::error!("Failed to persist object deletions with error: {e}");
})
}

@@ -677,17 +789,17 @@
},
PG_DB_COMMIT_SLEEP_DURATION
)
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(
elapsed,
"Persisted {} checkpoints",
stored_checkpoints.len()
);
})
.tap_err(|e| {
tracing::error!("Failed to persist checkpoints with error: {}", e);
})
})
.tap_err(|e| {
tracing::error!("Failed to persist checkpoints with error: {}", e);
})
}

fn persist_transactions_chunk(