@@ -74,15 +74,31 @@ pub(crate) async fn process_s3_uploads(
7474 Some ( task) => task,
7575 None => return Ok ( ( ) ) ,
7676 } ;
77-
78- let trx = insert_and_lock( task. tenant_id, task. handle. clone( ) , & pool) . await ?;
77+ debug!( "Received task, handle: {}" , hex:: encode( & task. handle) ) ;
78+
79+ let mut trx_lock = pool. begin( ) . await ?;
80+ task. enqueue_upload_task( & mut trx_lock) . await ?;
81+
82+ if let Err ( err) = sqlx:: query!(
83+ "SELECT * FROM ciphertext_digest
84+ WHERE handle = $2 AND tenant_id = $1 AND
85+ (ciphertext128 IS NULL OR ciphertext IS NULL)
86+ FOR UPDATE SKIP LOCKED" ,
87+ task. tenant_id,
88+ task. handle,
89+ )
90+ . fetch_one( trx_lock. as_mut( ) )
91+ . await {
92+ error!( "Failed to lock pending uploads {}, handle: {}" , err, compact_hex( & task. handle) ) ;
93+ trx_lock. rollback( ) . await ?;
94+ continue ;
95+ }
7996
8097 if !is_ready. load( Ordering :: SeqCst ) {
8198 // If the S3 setup is not ready, we need to wait for its ready status
8299 // before we can continue spawning uploading job
83100 info!( "Upload task skipped, S3 connection still not ready" ) ;
84- // Queue the uploading job in the database
85- trx. commit( ) . await ?;
101+ trx_lock. commit( ) . await ?;
86102 continue ;
87103 }
88104
@@ -111,7 +127,7 @@ pub(crate) async fn process_s3_uploads(
111127 // Spawn a new task to upload the ciphertexts
112128 let h = tokio:: spawn( async move {
113129 let s = task. otel. child_span( "upload_s3" ) ;
114- if let Err ( err) = upload_ciphertexts( trx , task, & client, & conf) . await {
130+ if let Err ( err) = upload_ciphertexts( trx_lock , task, & client, & conf) . await {
115131 if let ExecutionError :: S3TransientError ( _) = err {
116132 ready_flag. store( false , Ordering :: SeqCst ) ;
117133 info!( "S3 setup is not ready, due to transient error: {}" , err) ;
@@ -146,40 +162,6 @@ pub(crate) async fn process_s3_uploads(
146162 }
147163}
148164
149- async fn insert_and_lock (
150- tenant_id : i32 ,
151- handle : Vec < u8 > ,
152- pool : & PgPool ,
153- ) -> Result < Transaction < ' static , Postgres > , ExecutionError > {
154- let mut trx = pool. begin ( ) . await ?;
155-
156- sqlx:: query!(
157- "INSERT INTO ciphertext_digest (tenant_id, handle)
158- VALUES ($1, $2) ON CONFLICT DO NOTHING" ,
159- tenant_id,
160- & handle,
161- )
162- . execute ( trx. as_mut ( ) )
163- . await ?;
164-
165- trx. commit ( ) . await ?;
166-
167- let mut trx = pool. begin ( ) . await ?;
168-
169- sqlx:: query!(
170- "SELECT * FROM ciphertext_digest
171- WHERE handle = $2 AND tenant_id = $1 AND
172- (ciphertext128 IS NULL OR ciphertext IS NULL)
173- FOR UPDATE SKIP LOCKED" ,
174- tenant_id,
175- handle,
176- )
177- . fetch_all ( trx. as_mut ( ) )
178- . await ?;
179-
180- Ok ( trx)
181- }
182-
183165enum UploadResult {
184166 CtType128 ( ( Vec < u8 > , BoxedSpan ) ) ,
185167 CtType64 ( ( Vec < u8 > , BoxedSpan ) ) ,
@@ -230,6 +212,14 @@ async fn upload_ciphertexts(
230212 . send ( ) ,
231213 UploadResult :: CtType128 ( ( ct128_digest. clone ( ) , span) ) ,
232214 ) ) ;
215+ } else {
216+ info ! (
217+ "ct128 already exists in S3, handle: {}, digest: {}" ,
218+ handle_as_hex,
219+ hex:: encode( & ct128_digest)
220+ ) ;
221+
222+ task. update_ct128_uploaded ( & mut trx, ct128_digest) . await ?;
233223 }
234224 }
235225
@@ -261,8 +251,14 @@ async fn upload_ciphertexts(
261251 . send ( ) ,
262252 UploadResult :: CtType64 ( ( ct64_digest. clone ( ) , span) ) ,
263253 ) ) ;
254+ } else {
255+ info ! (
256+ "ct64 already exists in S3, handle: {}, digest: {}" ,
257+ handle_as_hex,
258+ hex:: encode( & ct64_digest)
259+ ) ;
264260
265- // TODO: Update DB
261+ task . update_ct64_uploaded ( & mut trx , ct64_digest ) . await ? ;
266262 }
267263 }
268264
@@ -287,35 +283,7 @@ async fn upload_ciphertexts(
287283 telemetry:: end_span_with_err ( span, err. to_string ( ) ) ;
288284 transient_error = Some ( ExecutionError :: S3TransientError ( err. to_string ( ) ) ) ;
289285 } else {
290- sqlx:: query!(
291- "UPDATE ciphertext_digest
292- SET ciphertext128 = $1
293- WHERE handle = $2" ,
294- digest,
295- task. handle
296- )
297- . execute ( trx. as_mut ( ) )
298- . await ?;
299-
300- // Reset ciphertext128 as the ct128 has been successfully uploaded to S3
301- // NB: For reclaiming the disk-space in DB, we rely on auto vacuuming in
302- // Postgres
303-
304- sqlx:: query!(
305- "UPDATE ciphertexts
306- SET ciphertext128 = NULL
307- WHERE handle = $1" ,
308- task. handle
309- )
310- . execute ( trx. as_mut ( ) )
311- . await ?;
312-
313- info ! (
314- "Uploaded ct128, handle: {}, digest: {}" ,
315- handle_as_hex,
316- compact_hex( & digest)
317- ) ;
318-
286+ task. update_ct128_uploaded ( & mut trx, digest) . await ?;
319287 telemetry:: end_span_with_timestamp ( span, finish_time) ;
320288 }
321289 }
@@ -329,28 +297,13 @@ async fn upload_ciphertexts(
329297 telemetry:: end_span_with_err ( span, err. to_string ( ) ) ;
330298 transient_error = Some ( ExecutionError :: S3TransientError ( err. to_string ( ) ) ) ;
331299 } else {
332- sqlx:: query!(
333- "UPDATE ciphertext_digest
334- SET ciphertext = $1
335- WHERE handle = $2" ,
336- digest,
337- task. handle
338- )
339- . execute ( trx. as_mut ( ) )
340- . await ?;
341- info ! (
342- "Uploaded ct64, handle: {}, digest: {}" ,
343- handle_as_hex,
344- compact_hex( & digest)
345- ) ;
346-
300+ task. update_ct64_uploaded ( & mut trx, digest) . await ?;
347301 telemetry:: end_span_with_timestamp ( span, finish_time) ;
348302 }
349303 }
350304 }
351305 }
352306
353- // TODO: Move this notify in DB query
354307 sqlx:: query ( "SELECT pg_notify($1, '')" )
355308 . bind ( EVENT_CIPHERTEXTS_UPLOADED )
356309 . execute ( trx. as_mut ( ) )
0 commit comments