@@ -568,8 +568,7 @@ async fn upload_transaction_graph_results<'a>(
568568) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
569569 // Get computation results
570570 let graph_results = tx_graph. get_results ( ) ;
571- let mut handles_to_update = tx_graph. get_intermediate_handles ( ) ;
572- handles_to_update. append ( unneeded_handles) ;
571+ let mut handles_to_update = vec ! [ ] ;
573572
574573 // Traverse computations that have been scheduled and
575574 // upload their results/errors.
@@ -641,24 +640,25 @@ async fn upload_transaction_graph_results<'a>(
641640 }
642641 }
643642 }
644- let mut s = tracer. start_with_context ( "insert_ct_into_db" , loop_ctx) ;
645- s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
646- s. set_attributes (
647- cts_to_insert
648- . iter ( )
649- . map ( |( _, ( h, ( _, ( _, _) ) ) ) | KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) ) ) ,
650- ) ;
651- s. set_attributes (
652- cts_to_insert
653- . iter ( )
654- . map ( |( _, ( _, ( _, ( _, db_type) ) ) ) | KeyValue :: new ( "ciphertext_type" , * db_type as i64 ) ) ,
655- ) ;
656- #[ allow( clippy:: type_complexity) ]
657- let ( tenant_ids, ( handles, ( ciphertexts, ( ciphertext_versions, ciphertext_types) ) ) ) : (
658- Vec < _ > ,
659- ( Vec < _ > , ( Vec < _ > , ( Vec < _ > , Vec < _ > ) ) ) ,
660- ) = cts_to_insert. into_iter ( ) . unzip ( ) ;
661- let _ = query ! (
643+ if !cts_to_insert. is_empty ( ) {
644+ let mut s = tracer. start_with_context ( "insert_ct_into_db" , loop_ctx) ;
645+ s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
646+ s. set_attributes (
647+ cts_to_insert. iter ( ) . map ( |( _, ( h, ( _, ( _, _) ) ) ) | {
648+ KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) )
649+ } ) ,
650+ ) ;
651+ s. set_attributes (
652+ cts_to_insert. iter ( ) . map ( |( _, ( _, ( _, ( _, db_type) ) ) ) | {
653+ KeyValue :: new ( "ciphertext_type" , * db_type as i64 )
654+ } ) ,
655+ ) ;
656+ #[ allow( clippy:: type_complexity) ]
657+ let ( tenant_ids, ( handles, ( ciphertexts, ( ciphertext_versions, ciphertext_types) ) ) ) : (
658+ Vec < _ > ,
659+ ( Vec < _ > , ( Vec < _ > , ( Vec < _ > , Vec < _ > ) ) ) ,
660+ ) = cts_to_insert. into_iter ( ) . unzip ( ) ;
661+ let _ = query ! (
662662 "
663663 INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
664664 SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[])
@@ -670,24 +670,24 @@ async fn upload_transaction_graph_results<'a>(
670670 error ! ( target: "tfhe_worker" , { tenant_id = * tenant_id, error = %err } , "error while inserting new ciphertexts" ) ;
671671 err
672672 } ) ?;
673- // Notify all workers that new ciphertext is inserted
674- // For now, it's only the SnS workers that are listening for these events
675- let _ = sqlx:: query!( "SELECT pg_notify($1, '')" , EVENT_CIPHERTEXT_COMPUTED )
676- . execute ( trx. as_mut ( ) )
677- . await ?;
678- s. end ( ) ;
679-
680- let mut s = tracer. start_with_context ( "update_computation" , loop_ctx) ;
681- s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
682- s. set_attributes (
683- handles_to_update
684- . iter ( )
685- . map ( |( h, _) | KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) ) ) ,
686- ) ;
687-
688- let ( handles_vec, txn_ids_vec) : ( Vec < _ > , Vec < _ > ) = handles_to_update. into_iter ( ) . unzip ( ) ;
673+ // Notify all workers that new ciphertext is inserted
674+ // For now, it's only the SnS workers that are listening for these events
675+ let _ = sqlx:: query!( "SELECT pg_notify($1, '')" , EVENT_CIPHERTEXT_COMPUTED )
676+ . execute ( trx. as_mut ( ) )
677+ . await ?;
678+ s. end ( ) ;
679+ }
689680
690- let _ = query ! (
681+ if !handles_to_update. is_empty ( ) {
682+ let mut s = tracer. start_with_context ( "update_computation" , loop_ctx) ;
683+ s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
684+ s. set_attributes (
685+ handles_to_update
686+ . iter ( )
687+ . map ( |( h, _) | KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) ) ) ,
688+ ) ;
689+ let ( handles_vec, txn_ids_vec) : ( Vec < _ > , Vec < _ > ) = handles_to_update. into_iter ( ) . unzip ( ) ;
690+ let _ = query ! (
691691 "
692692 UPDATE computations
693693 SET is_completed = true, completed_at = CURRENT_TIMESTAMP
@@ -706,9 +706,8 @@ async fn upload_transaction_graph_results<'a>(
706706 error ! ( target: "tfhe_worker" , { tenant_id = * tenant_id, error = %err } , "error while updating computations as completed" ) ;
707707 err
708708 } ) ?;
709-
710- s. end ( ) ;
711-
709+ s. end ( ) ;
710+ }
712711 Ok ( ( ) )
713712}
714713
0 commit comments