
Commit 41f1e28

Use Vec::with_capacity, insert missing object fields, avoid unnecessary cloning, use SERIALIZABLE isolation level for object updates
1 parent 1f9b3aa commit 41f1e28
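The core of the change is wrapping each object write in a PostgreSQL SERIALIZABLE transaction and retrying it with exponential backoff, via the new serializable_transactional_blocking_with_retry! macro added in mod.rs below. A minimal stand-alone sketch of that pattern, assuming diesel 2's transaction builder and the backoff crate (both are what the macro itself uses); the function name write_with_retry and the "SELECT 1" query are placeholders, not the commit's code:

use diesel::{PgConnection, RunQueryDsl};

// Sketch only: retry a SERIALIZABLE read-write transaction with exponential
// backoff, the same shape the new macro wraps.
fn write_with_retry(conn: &mut PgConnection) -> Result<(), diesel::result::Error> {
    let backoff = backoff::ExponentialBackoff::default();
    backoff::retry(backoff, || {
        conn.build_transaction()
            .read_write()
            .serializable()
            .run(|conn| {
                // Writes go here; a serialization conflict aborts the transaction
                // and surfaces as an error that the retry loop treats as transient.
                diesel::sql_query("SELECT 1").execute(conn)?;
                Ok(())
            })
            .map_err(|e| backoff::Error::Transient {
                err: e,
                retry_after: None,
            })
    })
    .map_err(|e| match e {
        backoff::Error::Transient { err, .. } | backoff::Error::Permanent(err) => err,
    })
}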


2 files changed: +154 -117 lines changed


crates/iota-indexer/src/store/mod.rs

Lines changed: 40 additions & 0 deletions
@@ -95,6 +95,46 @@ pub mod diesel_macro {
     }};
 }
 
+#[macro_export]
+macro_rules! serializable_transactional_blocking_with_retry {
+    ($pool:expr, $query:expr, $max_elapsed:expr) => {{
+        use $crate::{
+            db::{PoolConnection, get_pool_connection},
+            errors::IndexerError,
+        };
+        let mut backoff = backoff::ExponentialBackoff::default();
+        backoff.max_elapsed_time = Some($max_elapsed);
+        let result = match backoff::retry(backoff, || {
+            let mut pool_conn =
+                get_pool_connection($pool).map_err(|e| backoff::Error::Transient {
+                    err: IndexerError::PostgresWrite(e.to_string()),
+                    retry_after: None,
+                })?;
+            pool_conn
+                .as_any_mut()
+                .downcast_mut::<PoolConnection>()
+                .unwrap()
+                .build_transaction()
+                .read_write()
+                .serializable()
+                .run($query)
+                .map_err(|e| {
+                    tracing::error!("Error with persisting data into DB: {:?}, retrying...", e);
+                    backoff::Error::Transient {
+                        err: IndexerError::PostgresWrite(e.to_string()),
+                        retry_after: None,
+                    }
+                })
+        }) {
+            Ok(v) => Ok(v),
+            Err(backoff::Error::Transient { err, .. }) => Err(err),
+            Err(backoff::Error::Permanent(err)) => Err(err),
+        };
+
+        result
+    }};
+}
+
 #[macro_export]
 macro_rules! spawn_read_only_blocking {
     ($pool:expr, $query:expr, $repeatable_read:expr) => {{
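For reference, a hypothetical call site for the new macro inside a method returning Result<(), IndexerError> (a sketch, not part of this commit); the blocking_cp pool field, the IndexerError type, and the PG_DB_COMMIT_SLEEP_DURATION constant are the names used in pg_indexer_store.rs below, and the query is a placeholder:

// Sketch: run a write inside a SERIALIZABLE read-write transaction, retrying
// with exponential backoff for up to PG_DB_COMMIT_SLEEP_DURATION.
serializable_transactional_blocking_with_retry!(
    &self.blocking_cp,
    |conn| {
        diesel::sql_query("SELECT 1").execute(conn)?;
        Ok::<(), IndexerError>(())
    },
    PG_DB_COMMIT_SLEEP_DURATION
)?;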

crates/iota-indexer/src/store/pg_indexer_store.rs

Lines changed: 114 additions & 117 deletions
@@ -66,7 +66,7 @@ use crate::{
         tx_changed_objects, tx_digests, tx_input_objects, tx_insertion_order, tx_kinds,
         tx_recipients, tx_senders,
     },
-    transactional_blocking_with_retry,
+    serializable_transactional_blocking_with_retry, transactional_blocking_with_retry,
     types::{
         EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
         IndexedPackage, IndexedTransaction, TxIndex,
@@ -377,8 +377,11 @@ impl PgIndexerStore {
         let mut coin_type_vec = Vec::with_capacity(chunk_size);
         let mut coin_balance_vec = Vec::with_capacity(chunk_size);
         let mut df_kind_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_package_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_module_vec = Vec::with_capacity(chunk_size);
+        let mut object_type_name_vec = Vec::with_capacity(chunk_size);
 
-        for obj in mutated_object_mutation_chunk.iter().cloned() {
+        for obj in mutated_object_mutation_chunk {
             object_id_vec.push(obj.object_id);
             object_version_vec.push(obj.object_version);
             object_digest_vec.push(obj.object_digest);
@@ -389,86 +392,94 @@ impl PgIndexerStore {
             coin_type_vec.push(obj.coin_type);
             coin_balance_vec.push(obj.coin_balance);
             df_kind_vec.push(obj.df_kind);
+            object_type_package_vec.push(obj.object_type_package);
+            object_type_module_vec.push(obj.object_type_module);
+            object_type_name_vec.push(obj.object_type_name);
         }
 
-        let query = diesel::sql_query(
-            r#"
-            WITH new_data AS (
-                SELECT
-                    unnest($1::bytea[]) AS object_id,
-                    unnest($2::bigint[]) AS object_version,
-                    unnest($3::bytea[]) AS object_digest,
-                    unnest($4::smallint[]) AS owner_type,
-                    unnest($5::bytea[]) AS owner_id,
-                    unnest($6::text[]) AS object_type,
-                    unnest($7::bytea[]) AS serialized_object,
-                    unnest($8::text[]) AS coin_type,
-                    unnest($9::bigint[]) AS coin_balance,
-                    unnest($10::smallint[]) AS df_kind
-            ),
-            locked_objects AS (
-                SELECT o.*
-                FROM objects o
-                JOIN new_data nd ON o.object_id = nd.object_id
-                FOR UPDATE
-            ),
-            locked_deletes AS (
-                SELECT del.*
-                FROM optimistic_deleted_objects_versions del
-                JOIN new_data nd ON del.object_id = nd.object_id
-                FOR SHARE
-            )
-            INSERT INTO objects (
-                object_id,
-                object_version,
-                object_digest,
-                owner_type,
-                owner_id,
-                object_type,
-                serialized_object,
-                coin_type,
-                coin_balance,
-                df_kind
-            )
-            SELECT nd.*
-            FROM new_data nd
-            LEFT JOIN optimistic_deleted_objects_versions del
-                ON del.object_id = nd.object_id
-            WHERE COALESCE(del.object_version, -1) < nd.object_version
-            ON CONFLICT (object_id)
-            DO UPDATE SET
-                object_version = EXCLUDED.object_version,
-                object_digest = EXCLUDED.object_digest,
-                owner_type = EXCLUDED.owner_type,
-                owner_id = EXCLUDED.owner_id,
-                object_type = EXCLUDED.object_type,
-                serialized_object = EXCLUDED.serialized_object,
-                coin_type = EXCLUDED.coin_type,
-                coin_balance = EXCLUDED.coin_balance,
-                df_kind = EXCLUDED.df_kind
-            WHERE
-                EXCLUDED.object_version > objects.object_version;
-            "#,
-        )
-        .bind::<sql_types::Array<sql_types::Binary>, _>(object_id_vec)
-        .bind::<sql_types::Array<sql_types::BigInt>, _>(object_version_vec)
-        .bind::<sql_types::Array<sql_types::Binary>, _>(object_digest_vec)
-        .bind::<sql_types::Array<sql_types::SmallInt>, _>(owner_type_vec)
-        .bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(owner_id_vec)
-        .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(object_type_vec)
-        .bind::<sql_types::Array<sql_types::Binary>, _>(serialized_object_vec)
-        .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(coin_type_vec)
-        .bind::<sql_types::Array<sql_types::Nullable<sql_types::BigInt>>, _>(coin_balance_vec)
-        .bind::<sql_types::Array<sql_types::Nullable<sql_types::SmallInt>>, _>(df_kind_vec);
-
         let guard = self
             .metrics
             .checkpoint_db_commit_latency_objects_chunks
             .start_timer();
-        transactional_blocking_with_retry!(
+
+        serializable_transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
-                query.clone().execute(conn)?;
+                diesel::sql_query(
+                    r#"
+                    WITH new_data AS (
+                        SELECT
+                            unnest($1::bytea[]) AS object_id,
+                            unnest($2::bigint[]) AS object_version,
+                            unnest($3::bytea[]) AS object_digest,
+                            unnest($4::smallint[]) AS owner_type,
+                            unnest($5::bytea[]) AS owner_id,
+                            unnest($6::text[]) AS object_type,
+                            unnest($7::bytea[]) AS serialized_object,
+                            unnest($8::text[]) AS coin_type,
+                            unnest($9::bigint[]) AS coin_balance,
+                            unnest($10::smallint[]) AS df_kind,
+                            unnest($11::bytea[]) AS object_type_package,
+                            unnest($12::text[]) AS object_type_module,
+                            unnest($13::text[]) AS object_type_name
+                    )
+                    INSERT INTO objects (
+                        object_id,
+                        object_version,
+                        object_digest,
+                        owner_type,
+                        owner_id,
+                        object_type,
+                        serialized_object,
+                        coin_type,
+                        coin_balance,
+                        df_kind,
+                        object_type_package,
+                        object_type_module,
+                        object_type_name
+                    )
+                    SELECT nd.*
+                    FROM new_data nd
+                    LEFT JOIN optimistic_deleted_objects_versions del
+                        ON del.object_id = nd.object_id
+                    WHERE COALESCE(del.object_version, -1) < nd.object_version
+                    ON CONFLICT (object_id)
+                    DO UPDATE SET
+                        object_version = EXCLUDED.object_version,
+                        object_digest = EXCLUDED.object_digest,
+                        owner_type = EXCLUDED.owner_type,
+                        owner_id = EXCLUDED.owner_id,
+                        object_type = EXCLUDED.object_type,
+                        serialized_object = EXCLUDED.serialized_object,
+                        coin_type = EXCLUDED.coin_type,
+                        coin_balance = EXCLUDED.coin_balance,
+                        df_kind = EXCLUDED.df_kind
+                    WHERE
+                        EXCLUDED.object_version > objects.object_version;
+                    "#,
+                )
+                .bind::<sql_types::Array<sql_types::Binary>, _>(&object_id_vec)
+                .bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
+                .bind::<sql_types::Array<sql_types::Binary>, _>(&object_digest_vec)
+                .bind::<sql_types::Array<sql_types::SmallInt>, _>(&owner_type_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(&owner_id_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(&object_type_vec)
+                .bind::<sql_types::Array<sql_types::Binary>, _>(&serialized_object_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(&coin_type_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::BigInt>>, _>(
+                    &coin_balance_vec,
+                )
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::SmallInt>>, _>(&df_kind_vec)
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Binary>>, _>(
+                    &object_type_package_vec,
+                )
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(
+                    &object_type_module_vec,
+                )
+                .bind::<sql_types::Array<sql_types::Nullable<sql_types::Text>>, _>(
+                    &object_type_name_vec,
+                )
+                .execute(conn)?;
                 Ok::<(), IndexerError>(())
             },
             PG_DB_COMMIT_SLEEP_DURATION
@@ -493,55 +504,41 @@ impl PgIndexerStore {
         let chunk_size = deleted_objects_chunk.len();
 
         let (object_id_vec, object_version_vec): (Vec<_>, Vec<_>) = deleted_objects_chunk
+            .clone()
             .into_iter()
             .map(|obj| (obj.object_id, obj.object_version))
             .unzip();
 
-        let query = diesel::sql_query(
-            r#"
-            WITH new_data AS (
-                SELECT
-                    unnest($1::bytea[]) AS object_id,
-                    unnest($2::bigint[]) AS object_version
-            ),
-            locked_objects AS (
-                SELECT o.*
-                FROM objects o
-                JOIN new_data nd ON o.object_id = nd.object_id
-                FOR UPDATE
-            ),
-            locked_deletes AS (
-                SELECT del.*
-                FROM optimistic_deleted_objects_versions del
-                JOIN new_data nd ON del.object_id = nd.object_id
-                FOR UPDATE
-            ),
-            deleted AS (
-                DELETE FROM objects o
-                USING new_data nd
-                WHERE o.object_id = nd.object_id
-                AND nd.object_version > o.object_version
-            )
-            INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
-            SELECT object_id, object_version
-            FROM new_data
-            ON CONFLICT (object_id)
-            DO UPDATE
-            SET object_version = EXCLUDED.object_version
-            WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;
-            "#,
-        )
-        .bind::<sql_types::Array<sql_types::Bytea>, _>(object_id_vec)
-        .bind::<sql_types::Array<sql_types::BigInt>, _>(object_version_vec);
-
-        transactional_blocking_with_retry!(
+        serializable_transactional_blocking_with_retry!(
            &self.blocking_cp,
            |conn| {
-                query
-                    .clone()
-                    .execute(conn)
-                    .map_err(IndexerError::from)
-                    .context("Failed to write object deletion to PostgresDB")?;
+                diesel::sql_query(
+                    r#"
+                    WITH new_data AS (
+                        SELECT
+                            unnest($1::bytea[]) AS object_id,
+                            unnest($2::bigint[]) AS object_version
+                    ),
+                    deleted AS (
+                        DELETE FROM objects o
+                        USING new_data nd
+                        WHERE o.object_id = nd.object_id
+                        AND nd.object_version > o.object_version
+                    )
+                    INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
+                    SELECT object_id, object_version
+                    FROM new_data
+                    ON CONFLICT (object_id)
+                    DO UPDATE
+                    SET object_version = EXCLUDED.object_version
+                    WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;
+                    "#,
+                )
+                .bind::<sql_types::Array<sql_types::Bytea>, _>(&object_id_vec)
+                .bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
+                .execute(conn)
+                .map_err(IndexerError::from)
+                .context("Failed to write object deletion to PostgresDB")?;
                 Ok::<(), IndexerError>(())
             },
             PG_DB_COMMIT_SLEEP_DURATION
