@@ -13,6 +13,7 @@ use async_trait::async_trait;
1313use diesel:: {
1414 ExpressionMethods , OptionalExtension , QueryDsl , RunQueryDsl ,
1515 dsl:: { max, min} ,
16+ sql_types,
1617 upsert:: excluded,
1718} ;
1819use downcast:: Any ;
@@ -48,11 +49,12 @@ use crate::{
4849 transactions:: { OptimisticTransaction , StoredTransaction , TxInsertionOrder } ,
4950 tx_indices:: OptimisticTxIndices ,
5051 } ,
51- on_conflict_do_update, on_conflict_do_update_wf, persist_chunk_into_table, read_only_blocking,
52+ on_conflict_do_update, on_conflict_do_update_with_condition, persist_chunk_into_table,
53+ read_only_blocking,
5254 schema:: {
5355 chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
5456 event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
55- event_struct_package, events, feature_flags, objects , objects_history, objects_snapshot,
57+ event_struct_package, events, feature_flags, objects_history, objects_snapshot,
5658 objects_version, optimistic_event_emit_module, optimistic_event_emit_package,
5759 optimistic_event_senders, optimistic_event_struct_instantiation,
5860 optimistic_event_struct_module, optimistic_event_struct_name,
@@ -339,7 +341,7 @@ impl PgIndexerStore {
339341 transactional_blocking_with_retry ! (
340342 & self . blocking_cp,
341343 |conn| {
342- on_conflict_do_update !(
344+ on_conflict_do_update_with_condition !(
343345 display:: table,
344346 display_updates. values( ) . collect:: <Vec <_>>( ) ,
345347 display:: object_type,
@@ -348,6 +350,7 @@ impl PgIndexerStore {
348350 display:: version. eq( excluded( display:: version) ) ,
349351 display:: bcs. eq( excluded( display:: bcs) ) ,
350352 ) ,
353+ excluded( display:: version) . gt( display:: version) ,
351354 conn
352355 ) ;
353356 Ok :: <( ) , IndexerError >( ( ) )
@@ -362,6 +365,100 @@ impl PgIndexerStore {
362365 & self ,
363366 mutated_object_mutation_chunk : Vec < StoredObject > ,
364367 ) -> Result < ( ) , IndexerError > {
368+ let mut object_id_vec = Vec :: new ( ) ;
369+ let mut object_version_vec = Vec :: new ( ) ;
370+ let mut object_digest_vec = Vec :: new ( ) ;
371+ let mut owner_type_vec = Vec :: new ( ) ;
372+ let mut owner_id_vec = Vec :: new ( ) ;
373+ let mut object_type_vec = Vec :: new ( ) ;
374+ let mut serialized_object_vec = Vec :: new ( ) ;
375+ let mut coin_type_vec = Vec :: new ( ) ;
376+ let mut coin_balance_vec = Vec :: new ( ) ;
377+ let mut df_kind_vec = Vec :: new ( ) ;
378+
379+ for obj in mutated_object_mutation_chunk. iter ( ) . cloned ( ) {
380+ object_id_vec. push ( obj. object_id ) ;
381+ object_version_vec. push ( obj. object_version ) ;
382+ object_digest_vec. push ( obj. object_digest ) ;
383+ owner_type_vec. push ( obj. owner_type ) ;
384+ owner_id_vec. push ( obj. owner_id ) ;
385+ object_type_vec. push ( obj. object_type ) ;
386+ serialized_object_vec. push ( obj. serialized_object ) ;
387+ coin_type_vec. push ( obj. coin_type ) ;
388+ coin_balance_vec. push ( obj. coin_balance ) ;
389+ df_kind_vec. push ( obj. df_kind ) ;
390+ }
391+
392+ let query = diesel:: sql_query (
393+ r#"
394+ WITH new_data AS (
395+ SELECT
396+ unnest($1::bytea[]) AS object_id,
397+ unnest($2::bigint[]) AS object_version,
398+ unnest($3::bytea[]) AS object_digest,
399+ unnest($4::smallint[]) AS owner_type,
400+ unnest($5::bytea[] ) AS owner_id,
401+ unnest($6::text[] ) AS object_type,
402+ unnest($7::bytea[]) AS serialized_object,
403+ unnest($8::text[] ) AS coin_type,
404+ unnest($9::bigint[]) AS coin_balance,
405+ unnest($10::smallint[]) AS df_kind
406+ ),
407+ locked_objects AS (
408+ SELECT o.*
409+ FROM objects o
410+ JOIN new_data nd ON o.object_id = nd.object_id
411+ FOR UPDATE
412+ ),
413+ locked_deletes AS (
414+ SELECT del.*
415+ FROM deleted_objects_versions del
416+ JOIN new_data nd ON del.object_id = nd.object_id
417+ FOR SHARE
418+ )
419+ INSERT INTO objects (
420+ object_id,
421+ object_version,
422+ object_digest,
423+ owner_type,
424+ owner_id,
425+ object_type,
426+ serialized_object,
427+ coin_type,
428+ coin_balance,
429+ df_kind
430+ )
431+ SELECT nd.*
432+ FROM new_data nd
433+ LEFT JOIN deleted_objects_versions del
434+ ON del.object_id = nd.object_id
435+ WHERE COALESCE(del.object_version, -1) < nd.object_version
436+ ON CONFLICT (object_id)
437+ DO UPDATE SET
438+ object_version = EXCLUDED.object_version,
439+ object_digest = EXCLUDED.object_digest,
440+ owner_type = EXCLUDED.owner_type,
441+ owner_id = EXCLUDED.owner_id,
442+ object_type = EXCLUDED.object_type,
443+ serialized_object = EXCLUDED.serialized_object,
444+ coin_type = EXCLUDED.coin_type,
445+ coin_balance = EXCLUDED.coin_balance,
446+ df_kind = EXCLUDED.df_kind
447+ WHERE
448+ EXCLUDED.object_version > objects.object_version;
449+ "# ,
450+ )
451+ . bind :: < sql_types:: Array < sql_types:: Binary > , _ > ( object_id_vec)
452+ . bind :: < sql_types:: Array < sql_types:: BigInt > , _ > ( object_version_vec)
453+ . bind :: < sql_types:: Array < sql_types:: Binary > , _ > ( object_digest_vec)
454+ . bind :: < sql_types:: Array < sql_types:: SmallInt > , _ > ( owner_type_vec)
455+ . bind :: < sql_types:: Array < sql_types:: Nullable < sql_types:: Binary > > , _ > ( owner_id_vec)
456+ . bind :: < sql_types:: Array < sql_types:: Nullable < sql_types:: Text > > , _ > ( object_type_vec)
457+ . bind :: < sql_types:: Array < sql_types:: Binary > , _ > ( serialized_object_vec)
458+ . bind :: < sql_types:: Array < sql_types:: Nullable < sql_types:: Text > > , _ > ( coin_type_vec)
459+ . bind :: < sql_types:: Array < sql_types:: Nullable < sql_types:: BigInt > > , _ > ( coin_balance_vec)
460+ . bind :: < sql_types:: Array < sql_types:: Nullable < sql_types:: SmallInt > > , _ > ( df_kind_vec) ;
461+
365462 let guard = self
366463 . metrics
367464 . checkpoint_db_commit_latency_objects_chunks
@@ -370,24 +467,7 @@ impl PgIndexerStore {
370467 transactional_blocking_with_retry ! (
371468 & self . blocking_cp,
372469 |conn| {
373- on_conflict_do_update_wf!(
374- objects:: table,
375- mutated_object_mutation_chunk. clone( ) ,
376- objects:: object_id,
377- (
378- objects:: object_id. eq( excluded( objects:: object_id) ) ,
379- objects:: object_version. eq( excluded( objects:: object_version) ) ,
380- objects:: object_digest. eq( excluded( objects:: object_digest) ) ,
381- objects:: owner_type. eq( excluded( objects:: owner_type) ) ,
382- objects:: owner_id. eq( excluded( objects:: owner_id) ) ,
383- objects:: object_type. eq( excluded( objects:: object_type) ) ,
384- objects:: serialized_object. eq( excluded( objects:: serialized_object) ) ,
385- objects:: coin_type. eq( excluded( objects:: coin_type) ) ,
386- objects:: coin_balance. eq( excluded( objects:: coin_balance) ) ,
387- objects:: df_kind. eq( excluded( objects:: df_kind) ) ,
388- ) ,
389- conn
390- ) ;
470+ query. clone( ) . execute( conn) ?;
391471 Ok :: <( ) , IndexerError >( ( ) )
392472 } ,
393473 PG_DB_COMMIT_SLEEP_DURATION
@@ -410,23 +490,53 @@ impl PgIndexerStore {
410490 . checkpoint_db_commit_latency_objects_chunks
411491 . start_timer ( ) ;
412492 let len = deleted_objects_chunk. len ( ) ;
493+
494+ let ( object_id_vec, object_version_vec) : ( Vec < _ > , Vec < _ > ) = deleted_objects_chunk
495+ . into_iter ( )
496+ . map ( |obj| ( obj. object_id , obj. object_version ) )
497+ . unzip ( ) ;
498+
499+ let query = diesel:: sql_query (
500+ r#"
501+ WITH new_data AS (
502+ SELECT
503+ unnest($1::bytea[]) AS object_id,
504+ unnest($2::bigint[]) AS object_version
505+ ),
506+ locked_objects AS (
507+ SELECT o.*
508+ FROM objects o
509+ JOIN new_data nd ON o.object_id = nd.object_id
510+ FOR UPDATE
511+ ),
512+ locked_deletes AS (
513+ SELECT del.*
514+ FROM deleted_objects_versions del
515+ JOIN new_data nd ON del.object_id = nd.object_id
516+ FOR UPDATE
517+ ),
518+ deleted AS (
519+ DELETE FROM objects o
520+ USING new_data nd
521+ WHERE o.object_id = nd.object_id
522+ AND nd.object_version > o.object_version
523+ )
524+ INSERT INTO deleted_objects_versions (object_id, object_version)
525+ SELECT object_id, object_version
526+ FROM new_data
527+ ON CONFLICT (object_id)
528+ DO UPDATE
529+ SET object_version = EXCLUDED.object_version
530+ WHERE EXCLUDED.object_version > deleted_objects_versions.object_version;
531+ "# ,
532+ )
533+ . bind :: < sql_types:: Array < sql_types:: Bytea > , _ > ( object_id_vec)
534+ . bind :: < sql_types:: Array < sql_types:: BigInt > , _ > ( object_version_vec) ;
535+
413536 transactional_blocking_with_retry ! (
414537 & self . blocking_cp,
415538 |conn| {
416- diesel:: delete(
417- objects:: table. filter(
418- objects:: object_id. eq_any(
419- deleted_objects_chunk
420- . iter( )
421- . map( |o| o. object_id. clone( ) )
422- . collect:: <Vec <_>>( ) ,
423- ) ,
424- ) ,
425- )
426- . execute( conn)
427- . map_err( IndexerError :: from)
428- . context( "Failed to write object deletion to PostgresDB" ) ?;
429-
539+ query. clone( ) . execute( conn) ?;
430540 Ok :: <( ) , IndexerError >( ( ) )
431541 } ,
432542 PG_DB_COMMIT_SLEEP_DURATION
@@ -677,17 +787,17 @@ impl PgIndexerStore {
677787 } ,
678788 PG_DB_COMMIT_SLEEP_DURATION
679789 )
680- . tap_ok ( |_| {
681- let elapsed = guard. stop_and_record ( ) ;
682- info ! (
790+ . tap_ok ( |_| {
791+ let elapsed = guard. stop_and_record ( ) ;
792+ info ! (
683793 elapsed,
684794 "Persisted {} checkpoints" ,
685795 stored_checkpoints. len( )
686796 ) ;
687- } )
688- . tap_err ( |e| {
689- tracing:: error!( "Failed to persist checkpoints with error: {}" , e) ;
690- } )
797+ } )
798+ . tap_err ( |e| {
799+ tracing:: error!( "Failed to persist checkpoints with error: {}" , e) ;
800+ } )
691801 }
692802
693803 fn persist_transactions_chunk (
0 commit comments