-
Notifications
You must be signed in to change notification settings - Fork 35
feat(indexer): Make concurrent modifications to objects
and display
table safe
#6377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 4 Skipped Deployments
|
a2690fa
to
1f9b3aa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm 🐙, a minor nit around cloning the query.
let mut coin_balance_vec = Vec::with_capacity(chunk_size); | ||
let mut df_kind_vec = Vec::with_capacity(chunk_size); | ||
|
||
for obj in mutated_object_mutation_chunk.iter().cloned() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for obj in mutated_object_mutation_chunk.iter().cloned() { | |
for obj in mutated_object_mutation_chunk { |
query | ||
.clone() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I'm wondering if cloning here is ok and does not introduce more memory consumption. Even tough if the transactional_blocking_with_retry
succeeds at the first try we still have two copies of the query
, I suppose the vectors are not small and potentially could increase the usage of the RAM. If cloning is not an concert then you can safely ignore my comment.
I was able to avoid cloning by moving the query inside the transactional_blocking_with_retry!
and using references for the vectors.
transactional_blocking_with_retry!(
&self.blocking_cp,
|conn| {
diesel::sql_query(
r#"
WITH new_data AS (
SELECT
unnest($1::bytea[]) AS object_id,
unnest($2::bigint[]) AS object_version
),
locked_objects AS (
SELECT o.*
FROM objects o
JOIN new_data nd ON o.object_id = nd.object_id
FOR UPDATE
),
locked_deletes AS (
SELECT del.*
FROM optimistic_deleted_objects_versions del
JOIN new_data nd ON del.object_id = nd.object_id
FOR UPDATE
),
deleted AS (
DELETE FROM objects o
USING new_data nd
WHERE o.object_id = nd.object_id
AND nd.object_version > o.object_version
)
INSERT INTO optimistic_deleted_objects_versions (object_id, object_version)
SELECT object_id, object_version
FROM new_data
ON CONFLICT (object_id)
DO UPDATE
SET object_version = EXCLUDED.object_version
WHERE EXCLUDED.object_version > optimistic_deleted_objects_versions.object_version;
"#,
)
.bind::<sql_types::Array<sql_types::Bytea>, _>(&object_id_vec)
.bind::<sql_types::Array<sql_types::BigInt>, _>(&object_version_vec)
.execute(conn)
.map_err(IndexerError::from)
.context("Failed to write object deletion to PostgresDB")?;
Ok::<(), IndexerError>(())
},
same approach was applied for the persist_object_mutation_chunk
method.
Please see the failing CI |
…ry cloning, use SERIALIZABLE isolation level for object updates
41f1e28
to
ee7dfe9
Compare
Description of change
Make concurrent modifications to
objects
anddisplay
table safe.It's a prerequisite for optimistic indexing, which will run concurrent to standard checkpoint indexing and may introduce race conditions.
Links to any relevant issues
fixes #6320
Type of change
How the change has been tested
cargo ci-clippy
cargo nextest run -p iota-indexer --features pg_integration --test-threads 1
Tested on local branch with optimistic indexing enabled, which results in concurrent updates of
objects
anddisplay
tables, the following scenarios were tested:No inconsistencies were spotted in the data returned via api when running read queries inbetween the modification requests.
Change checklist
Tick the boxes that are relevant to your changes, and delete any items that are not.