1- use crate :: { Config , ExecutionError , HandleItem , S3Config } ;
1+ use crate :: { Config , ExecutionError , HandleItem , S3Config , UploadJob } ;
22use aws_config:: retry:: RetryConfig ;
33use aws_config:: timeout:: TimeoutConfig ;
44use aws_config:: BehaviorVersion ;
@@ -32,8 +32,8 @@ pub const EVENT_CIPHERTEXTS_UPLOADED: &str = "event_ciphertexts_uploaded";
3232/// Process the S3 uploads
3333pub ( crate ) async fn process_s3_uploads (
3434 conf : & Config ,
35- mut tasks : mpsc:: Receiver < HandleItem > ,
36- tasks_tx : mpsc:: Sender < HandleItem > ,
35+ mut jobs : mpsc:: Receiver < UploadJob > ,
36+ jobs_tx : mpsc:: Sender < UploadJob > ,
3737 token : CancellationToken ,
3838) -> Result < ( ) , ExecutionError > {
3939 // Client construction is expensive due to connection thread pool initialization, and should
@@ -56,7 +56,7 @@ pub(crate) async fn process_s3_uploads(
5656 let token = token. clone ( ) ;
5757 let pool = pool. clone ( ) ;
5858 async move {
59- do_resubmits_loop ( client, pool, & conf, tasks_tx , token, is_ready)
59+ do_resubmits_loop ( client, pool, & conf, jobs_tx , token, is_ready)
6060 . await
6161 . unwrap_or_else ( |err| {
6262 error ! ( "Failed to spawn do_resubmits_loop: {}" , err) ;
@@ -71,41 +71,50 @@ pub(crate) async fn process_s3_uploads(
7171
7272 loop {
7373 select ! {
74- task = tasks . recv( ) => {
75- let task = match task {
76- Some ( task ) => task ,
74+ job = jobs . recv( ) => {
75+ let job = match job {
76+ Some ( job ) => job ,
7777 None => return Ok ( ( ) ) ,
7878 } ;
79- debug!( "Received task, handle: {}" , hex:: encode( & task. handle) ) ;
80-
81- let mut trx_lock = pool. begin( ) . await ?;
82- task. enqueue_upload_task( & mut trx_lock) . await ?;
83-
84- if let Err ( err) = sqlx:: query!(
85- "SELECT * FROM ciphertext_digest
86- WHERE handle = $2 AND tenant_id = $1 AND
87- (ciphertext128 IS NULL OR ciphertext IS NULL)
88- FOR UPDATE SKIP LOCKED" ,
89- task. tenant_id,
90- task. handle,
91- )
92- . fetch_one( trx_lock. as_mut( ) )
93- . await {
94- error!( "Failed to lock pending uploads {}, handle: {}" , err, compact_hex( & task. handle) ) ;
95- trx_lock. rollback( ) . await ?;
96- continue ;
97- }
9879
9980 if !is_ready. load( Ordering :: Acquire ) {
10081 // If the S3 setup is not ready, we need to wait for its ready status
10182 // before we can continue spawning uploading job
10283 info!( "Upload task skipped, S3 connection still not ready" ) ;
103-
104- // Ensure that the uploading task is enqueued in the DB
105- trx_lock. commit( ) . await ?;
10684 continue ;
10785 }
10886
87+ let mut trx = pool. begin( ) . await ?;
88+
89+ let item = match job {
90+ UploadJob :: Normal ( item) => item,
91+ UploadJob :: DatabaseLock ( item) => {
92+ if let Err ( err) = sqlx:: query!(
93+ "SELECT * FROM ciphertext_digest
94+ WHERE handle = $2 AND tenant_id = $1 AND
95+ (ciphertext128 IS NULL OR ciphertext IS NULL)
96+ FOR UPDATE SKIP LOCKED" ,
97+ item. tenant_id,
98+ item. handle,
99+ )
100+ . fetch_one( trx. as_mut( ) )
101+ . await
102+ {
103+ warn!(
104+ "Failed to lock pending uploads {}, handle: {}" ,
105+ err,
106+ compact_hex( & item. handle)
107+ ) ;
108+ trx. rollback( ) . await ?;
109+ continue ;
110+ }
111+ item
112+ } ,
113+ } ;
114+
115+
116+ debug!( "Received task, handle: {}" , hex:: encode( & item. handle) ) ;
117+
109118 // Cleanup completed tasks
110119 upload_jobs. retain( |h| !h. is_finished( ) ) ;
111120
@@ -130,8 +139,8 @@ pub(crate) async fn process_s3_uploads(
130139
131140 // Spawn a new task to upload the ciphertexts
132141 let h = tokio:: spawn( async move {
133- let s = task . otel. child_span( "upload_s3" ) ;
134- if let Err ( err) = upload_ciphertexts( trx_lock , task , & client, & conf) . await {
142+ let s = item . otel. child_span( "upload_s3" ) ;
143+ if let Err ( err) = upload_ciphertexts( trx , item , & client, & conf) . await {
135144 if let ExecutionError :: S3TransientError ( _) = err {
136145 ready_flag. store( false , Ordering :: Release ) ;
137146 info!( "S3 setup is not ready, due to transient error: {}" , err) ;
@@ -339,7 +348,7 @@ fn compute_digest(ct: &[u8]) -> Vec<u8> {
339348async fn fetch_pending_uploads (
340349 db_pool : & Pool < Postgres > ,
341350 limit : i64 ,
342- ) -> Result < Vec < HandleItem > , ExecutionError > {
351+ ) -> Result < Vec < UploadJob > , ExecutionError > {
343352 let rows = sqlx:: query!(
344353 "SELECT tenant_id, handle, ciphertext, ciphertext128
345354 FROM ciphertext_digest
@@ -351,7 +360,7 @@ async fn fetch_pending_uploads(
351360 . fetch_all ( db_pool)
352361 . await ?;
353362
354- let mut tasks = Vec :: new ( ) ;
363+ let mut jobs = Vec :: new ( ) ;
355364
356365 for row in rows {
357366 let mut ct64_compressed = Arc :: new ( Vec :: new ( ) ) ;
@@ -404,17 +413,20 @@ async fn fetch_pending_uploads(
404413 }
405414
406415 if !ct64_compressed. is_empty ( ) || !ct128_uncompressed. is_empty ( ) {
407- tasks . push ( HandleItem {
416+ let item = HandleItem {
408417 tenant_id : row. tenant_id ,
409418 handle : handle. clone ( ) ,
410419 ct64_compressed,
411420 ct128_uncompressed,
412421 otel : telemetry:: tracer_with_handle ( "recovery_task" , handle) ,
413- } ) ;
422+ } ;
423+
424+ // Instruct the uploader to acquire DB lock when processing the item
425+ jobs. push ( UploadJob :: DatabaseLock ( item) ) ;
414426 }
415427 }
416428
417- Ok ( tasks )
429+ Ok ( jobs )
418430}
419431
420432/// Resubmit for uploading ciphertexts.
@@ -424,7 +436,7 @@ async fn do_resubmits_loop(
424436 client : Arc < aws_sdk_s3:: Client > ,
425437 pool : Arc < Pool < Postgres > > ,
426438 conf : & Config ,
427- tasks : mpsc:: Sender < HandleItem > ,
439+ tasks : mpsc:: Sender < UploadJob > ,
428440 token : CancellationToken ,
429441 is_ready : Arc < AtomicBool > ,
430442) -> Result < ( ) , ExecutionError > {
@@ -472,7 +484,7 @@ async fn do_resubmits_loop(
472484async fn try_resubmit (
473485 pool : & PgPool ,
474486 is_ready : Arc < AtomicBool > ,
475- tasks : mpsc:: Sender < HandleItem > ,
487+ tasks : mpsc:: Sender < UploadJob > ,
476488 token : CancellationToken ,
477489) -> Result < ( ) , ExecutionError > {
478490 if !is_ready. load ( Ordering :: SeqCst ) {
@@ -482,18 +494,18 @@ async fn try_resubmit(
482494
483495 match fetch_pending_uploads ( pool, 10 ) . await {
484496 // TODO: const, token
485- Ok ( recovery_tasks ) => {
497+ Ok ( jobs ) => {
486498 info ! (
487499 target: "worker" ,
488500 action = "retry_s3_uploads" ,
489501 "Fetched {} pending uploads from the database" ,
490- recovery_tasks . len( )
502+ jobs . len( )
491503 ) ;
492504 // Resubmit for uploading ciphertexts
493- for task in recovery_tasks {
505+ for task in jobs {
494506 select ! {
495507 _ = tasks. send( task. clone( ) ) => {
496- debug!( "Task sent to upload worker, handle: {}" , hex:: encode( & task. handle) ) ;
508+ // TODO: debug!("Task sent to upload worker, handle: {}", hex::encode(&task.handle));
497509 } ,
498510 _ = token. cancelled( ) => {
499511 return Ok ( ( ) )
0 commit comments