@@ -8,13 +8,14 @@ use itertools::Itertools;
88use lazy_static:: lazy_static;
99use opentelemetry:: trace:: { Span , TraceContextExt , Tracer } ;
1010use opentelemetry:: KeyValue ;
11- use prometheus:: { register_int_counter, IntCounter } ;
11+ use prometheus:: { register_histogram , register_int_counter, Histogram , IntCounter } ;
1212use scheduler:: dfg:: types:: { DFGTxInput , SchedulerError } ;
1313use scheduler:: dfg:: { build_component_nodes, ComponentNode , DFComponentGraph , DFGOp } ;
1414use scheduler:: dfg:: { scheduler:: Scheduler , types:: DFGTaskInput } ;
1515use sqlx:: types:: Uuid ;
1616use sqlx:: Postgres ;
1717use sqlx:: { postgres:: PgListener , query, Acquire } ;
18+ use std:: time:: SystemTime ;
1819use std:: {
1920 collections:: { BTreeSet , HashMap } ,
2021 num:: NonZeroUsize ,
@@ -55,12 +56,20 @@ lazy_static! {
5556 "work items successfully processed and stored in the database"
5657 )
5758 . unwrap( ) ;
59+ static ref WORK_ITEMS_QUERY_HISTOGRAM : Histogram = register_histogram!(
60+ "coprocessor_tfhe_worker_query_work_items_seconds" ,
61+ "Histogram of time spent querying work items in tfhe-worker" ,
62+ vec![ 0.001 , 0.005 , 0.01 , 0.05 , 0.1 , 0.25 , 0.5 , 0.75 , 1.0 , 2.0 , 5.0 , 10.0 ]
63+ )
64+ . unwrap( ) ;
5865}
5966
6067pub async fn run_tfhe_worker (
6168 args : crate :: daemon_cli:: Args ,
6269 health_check : crate :: health_check:: HealthCheck ,
6370) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
71+ // Determine worker ID to use for the lifetime of this process
72+ // In case of a failure in tfhe_worker_cycle, the same id must be reused to quickly unlock any held locks
6473 let worker_id = args. worker_id . unwrap_or ( Uuid :: new_v4 ( ) ) ;
6574 info ! ( target: "tfhe_worker" , worker_id = %worker_id, "Starting tfhe-worker service" ) ;
6675 loop {
@@ -92,10 +101,11 @@ async fn tfhe_worker_cycle(
92101 let mut listener = PgListener :: connect_with ( & pool) . await ?;
93102 listener. listen ( "work_available" ) . await ?;
94103
95- let mut deps_chain_mngr = dependence_chain:: LockMngr :: new ( worker_id, pool. clone ( ) ) ;
104+ let mut dcid_mngr =
105+ dependence_chain:: LockMngr :: new_with_ttl ( worker_id, pool. clone ( ) , args. dcid_ttl_sec ) ;
96106
97107 // Release all owned locks on startup to avoid stale locks
98- deps_chain_mngr . release_all_owned_locks ( ) . await ?;
108+ dcid_mngr . release_all_owned_locks ( ) . await ?;
99109
100110 #[ cfg( feature = "bench" ) ]
101111 populate_cache_with_tenant_keys ( vec ! [ 1i32 ] , & pool, & tenant_key_cache) . await ?;
@@ -131,18 +141,26 @@ async fn tfhe_worker_cycle(
131141 args,
132142 & health_check,
133143 & mut trx,
134- & mut deps_chain_mngr ,
144+ & mut dcid_mngr ,
135145 & tracer,
136146 & loop_ctx,
137147 )
138148 . await ?;
139149 if transactions. is_empty ( ) {
140- deps_chain_mngr . release_current_lock ( ) . await ?;
150+ dcid_mngr . release_current_lock ( ) . await ?;
141151
142152 // Lock another dependence chain if available and
143153 // continue processing without waiting for notification
144- let ( lock, _) = deps_chain_mngr. acquire_next_lock ( ) . await ?;
145- immedially_poll_more_work = lock. is_some ( ) ;
154+ let mut s = tracer. start_with_context ( "query_dependence_chain" , & loop_ctx) ;
155+
156+ let ( dependence_chain_id, _) = dcid_mngr. acquire_next_lock ( ) . await ?;
157+ immedially_poll_more_work = dependence_chain_id. is_some ( ) ;
158+
159+ s. set_attribute ( KeyValue :: new (
160+ "dependence_chain_id" ,
161+ format ! ( "{:?}" , dependence_chain_id. as_ref( ) . map( hex:: encode) ) ,
162+ ) ) ;
163+ s. end ( ) ;
146164
147165 continue ;
148166 } else {
@@ -161,7 +179,12 @@ async fn tfhe_worker_cycle(
161179
162180 // Execute transactions segregated by tenant
163181 for ( tenant_id, ref mut tenant_txs) in transactions. iter_mut ( ) {
164- deps_chain_mngr. extend_current_lock ( ) . await ?;
182+ if dcid_mngr. extend_current_lock ( ) . await ?. is_none ( ) {
183+ // Best-effort attempt to extend the lock so that other replicas do not try to lock the same DCID.
184+ // In the worst case, the lock has already expired and the extension returns None.
185+ // Processing still continues, because the worker has already secured exclusive access to the txn computations in the Computations table.
186+ warn ! ( target: "tfhe_worker" , tenant_id = %tenant_id, "Lost dcid lock while processing transactions" ) ;
187+ }
165188
166189 let mut tx_graph = build_transaction_graph_and_execute (
167190 tenant_id,
@@ -178,7 +201,7 @@ async fn tfhe_worker_cycle(
178201 & mut tx_graph,
179202 & mut unneeded_handles,
180203 & mut trx,
181- & mut deps_chain_mngr ,
204+ & mut dcid_mngr ,
182205 & tracer,
183206 & loop_ctx,
184207 )
@@ -324,8 +347,7 @@ async fn query_for_work<'a>(
324347 ( Vec < ( i32 , Vec < ComponentNode > ) > , Vec < ( Handle , Handle ) > ) ,
325348 Box < dyn std:: error:: Error + Send + Sync > ,
326349> {
327- // This query locks our work items so other worker doesn't select them.
328- let mut s = tracer. start_with_context ( "query_work_items" , loop_ctx) ;
350+ let mut s = tracer. start_with_context ( "query_dependence_chain" , loop_ctx) ;
329351
330352 // Lock dependence chain
331353 let ( dependence_chain_id, locking_reason) = match deps_chain_mngr. extend_current_lock ( ) . await ? {
@@ -343,7 +365,11 @@ async fn query_for_work<'a>(
343365 "dependence_chain_id" ,
344366 format ! ( "{:?}" , dependence_chain_id. as_ref( ) . map( hex:: encode) ) ,
345367 ) ) ;
368+ s. end ( ) ;
346369
370+ // This query locks our work items so that other workers don't select them.
371+ let mut s = tracer. start_with_context ( "query_work_items" , loop_ctx) ;
372+ let started_at = SystemTime :: now ( ) ;
347373 let the_work = query ! (
348374 "
349375WITH selected_computations AS (
@@ -398,6 +424,8 @@ FOR UPDATE SKIP LOCKED ",
398424 error ! ( target: "tfhe_worker" , { error = %err } , "error while querying work items" ) ;
399425 err
400426 } ) ?;
427+
428+ WORK_ITEMS_QUERY_HISTOGRAM . observe ( started_at. elapsed ( ) . unwrap_or_default ( ) . as_secs_f64 ( ) ) ;
401429 s. set_attribute ( KeyValue :: new ( "count" , the_work. len ( ) as i64 ) ) ;
402430 s. end ( ) ;
403431 health_check. update_db_access ( ) ;
0 commit comments