@@ -145,7 +145,7 @@ async fn tfhe_worker_cycle(
145145 s. end ( ) ;
146146
147147 // Query for transactions to execute, and if relevant the associated keys
148- let ( mut transactions, mut unneeded_handles , has_more_work) = query_for_work (
148+ let ( mut transactions, _ , has_more_work) = query_for_work (
149149 args,
150150 & health_check,
151151 & mut trx,
@@ -215,7 +215,6 @@ async fn tfhe_worker_cycle(
215215 upload_transaction_graph_results (
216216 tenant_id,
217217 & mut tx_graph,
218- & mut unneeded_handles,
219218 & mut trx,
220219 & mut dcid_mngr,
221220 & tracer,
@@ -560,16 +559,14 @@ async fn build_transaction_graph_and_execute<'a>(
560559async fn upload_transaction_graph_results < ' a > (
561560 tenant_id : & i32 ,
562561 tx_graph : & mut DFComponentGraph ,
563- unneeded_handles : & mut Vec < ( Handle , Handle ) > ,
564562 trx : & mut sqlx:: Transaction < ' a , Postgres > ,
565563 deps_mngr : & mut dependence_chain:: LockMngr ,
566564 tracer : & opentelemetry:: global:: BoxedTracer ,
567565 loop_ctx : & opentelemetry:: Context ,
568566) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
569567 // Get computation results
570568 let graph_results = tx_graph. get_results ( ) ;
571- let mut handles_to_update = tx_graph. get_intermediate_handles ( ) ;
572- handles_to_update. append ( unneeded_handles) ;
569+ let mut handles_to_update = vec ! [ ] ;
573570
574571 // Traverse computations that have been scheduled and
575572 // upload their results/errors.
@@ -641,24 +638,25 @@ async fn upload_transaction_graph_results<'a>(
641638 }
642639 }
643640 }
644- let mut s = tracer. start_with_context ( "insert_ct_into_db" , loop_ctx) ;
645- s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
646- s. set_attributes (
647- cts_to_insert
648- . iter ( )
649- . map ( |( _, ( h, ( _, ( _, _) ) ) ) | KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) ) ) ,
650- ) ;
651- s. set_attributes (
652- cts_to_insert
653- . iter ( )
654- . map ( |( _, ( _, ( _, ( _, db_type) ) ) ) | KeyValue :: new ( "ciphertext_type" , * db_type as i64 ) ) ,
655- ) ;
656- #[ allow( clippy:: type_complexity) ]
657- let ( tenant_ids, ( handles, ( ciphertexts, ( ciphertext_versions, ciphertext_types) ) ) ) : (
658- Vec < _ > ,
659- ( Vec < _ > , ( Vec < _ > , ( Vec < _ > , Vec < _ > ) ) ) ,
660- ) = cts_to_insert. into_iter ( ) . unzip ( ) ;
661- let _ = query ! (
641+ if !cts_to_insert. is_empty ( ) {
642+ let mut s = tracer. start_with_context ( "insert_ct_into_db" , loop_ctx) ;
643+ s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
644+ s. set_attributes (
645+ cts_to_insert. iter ( ) . map ( |( _, ( h, ( _, ( _, _) ) ) ) | {
646+ KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) )
647+ } ) ,
648+ ) ;
649+ s. set_attributes (
650+ cts_to_insert. iter ( ) . map ( |( _, ( _, ( _, ( _, db_type) ) ) ) | {
651+ KeyValue :: new ( "ciphertext_type" , * db_type as i64 )
652+ } ) ,
653+ ) ;
654+ #[ allow( clippy:: type_complexity) ]
655+ let ( tenant_ids, ( handles, ( ciphertexts, ( ciphertext_versions, ciphertext_types) ) ) ) : (
656+ Vec < _ > ,
657+ ( Vec < _ > , ( Vec < _ > , ( Vec < _ > , Vec < _ > ) ) ) ,
658+ ) = cts_to_insert. into_iter ( ) . unzip ( ) ;
659+ let _ = query ! (
662660 "
663661 INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
664662 SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[])
@@ -670,24 +668,24 @@ async fn upload_transaction_graph_results<'a>(
670668 error ! ( target: "tfhe_worker" , { tenant_id = * tenant_id, error = %err } , "error while inserting new ciphertexts" ) ;
671669 err
672670 } ) ?;
673- // Notify all workers that new ciphertext is inserted
674- // For now, it's only the SnS workers that are listening for these events
675- let _ = sqlx:: query!( "SELECT pg_notify($1, '')" , EVENT_CIPHERTEXT_COMPUTED )
676- . execute ( trx. as_mut ( ) )
677- . await ?;
678- s. end ( ) ;
679-
680- let mut s = tracer. start_with_context ( "update_computation" , loop_ctx) ;
681- s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
682- s. set_attributes (
683- handles_to_update
684- . iter ( )
685- . map ( |( h, _) | KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) ) ) ,
686- ) ;
687-
688- let ( handles_vec, txn_ids_vec) : ( Vec < _ > , Vec < _ > ) = handles_to_update. into_iter ( ) . unzip ( ) ;
671+ // Notify all workers that new ciphertext is inserted
672+ // For now, it's only the SnS workers that are listening for these events
673+ let _ = sqlx:: query!( "SELECT pg_notify($1, '')" , EVENT_CIPHERTEXT_COMPUTED )
674+ . execute ( trx. as_mut ( ) )
675+ . await ?;
676+ s. end ( ) ;
677+ }
689678
690- let _ = query ! (
679+ if !handles_to_update. is_empty ( ) {
680+ let mut s = tracer. start_with_context ( "update_computation" , loop_ctx) ;
681+ s. set_attribute ( KeyValue :: new ( "tenant_id" , * tenant_id as i64 ) ) ;
682+ s. set_attributes (
683+ handles_to_update
684+ . iter ( )
685+ . map ( |( h, _) | KeyValue :: new ( "handle" , format ! ( "0x{}" , hex:: encode( h) ) ) ) ,
686+ ) ;
687+ let ( handles_vec, txn_ids_vec) : ( Vec < _ > , Vec < _ > ) = handles_to_update. into_iter ( ) . unzip ( ) ;
688+ let _ = query ! (
691689 "
692690 UPDATE computations
693691 SET is_completed = true, completed_at = CURRENT_TIMESTAMP
@@ -706,9 +704,8 @@ async fn upload_transaction_graph_results<'a>(
706704 error ! ( target: "tfhe_worker" , { tenant_id = * tenant_id, error = %err } , "error while updating computations as completed" ) ;
707705 err
708706 } ) ?;
709-
710- s. end ( ) ;
711-
707+ s. end ( ) ;
708+ }
712709 Ok ( ( ) )
713710}
714711
0 commit comments