@@ -213,33 +213,33 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin
       for {
         exists <- checkObjectExists(previousState.previousKey)
       } yield
-      // The backupToStorageTerminateSink gets called in response to finding in-progress multipart uploads. If an S3 object exists
-      // at the same key, that means the upload has in fact already been completed, so in this case let's not do anything.
-      if (exists) {
-        logger.debug(
-          s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
-        )
-        Sink.ignore
-      } else {
-        logger.info(
-          s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
-        )
-        val sink = S3
-          .resumeMultipartUploadWithHeaders(
-            s3Config.dataBucket,
-            previousState.previousKey,
-            previousState.stateDetails.state.uploadId,
-            previousState.stateDetails.state.parts,
-            s3Headers = s3Headers,
-            chunkingParallelism = 1
+        // The backupToStorageTerminateSink gets called in response to finding in-progress multipart uploads. If an S3 object exists
+        // at the same key, that means the upload has in fact already been completed, so in this case let's not do anything.
+        if (exists) {
+          logger.debug(
+            s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
           )
-
-        val base =
-          sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
-
-        maybeS3Settings
-          .fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
-      }
+          Sink.ignore
+        } else {
+          logger.info(
+            s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
+          )
+          val sink = S3
+            .resumeMultipartUploadWithHeaders(
+              s3Config.dataBucket,
+              previousState.previousKey,
+              previousState.stateDetails.state.uploadId,
+              previousState.stateDetails.state.parts,
+              s3Headers = s3Headers,
+              chunkingParallelism = 1
+            )
+
+          val base =
+            sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+
+          maybeS3Settings
+            .fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
+        }
 
     }
   }
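
For reference, the `checkObjectExists` call at the top of the hunk is defined elsewhere in the class. Below is a minimal sketch, under the assumption that the probe is built on Alpakka S3's `getObjectMetadata` (a HEAD request that materialises `None` when nothing is stored at the key, so no object body is downloaded); the `bucket`, `key` and `maybeS3Settings` parameters are illustrative stand-ins rather than the repository's actual signature:

// Hypothetical sketch, not the project's actual checkObjectExists.
// S3.getObjectMetadata issues a HEAD-style request and emits Some(metadata)
// when an object is present at the key and None otherwise.
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.{S3Attributes, S3Settings}
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink

import scala.concurrent.{ExecutionContext, Future}

def checkObjectExists(bucket: String, key: String, maybeS3Settings: Option[S3Settings])(implicit
    system: ActorSystem
): Future[Boolean] = {
  val metadata = S3.getObjectMetadata(bucket, key)
  // Apply an explicit S3Settings override when one is supplied, mirroring the
  // maybeS3Settings.fold(...) pattern used in the diff above.
  maybeS3Settings
    .fold(metadata)(s3Settings => metadata.withAttributes(S3Attributes.settings(s3Settings)))
    .runWith(Sink.head)
    .map(_.isDefined)(ExecutionContext.parasitic)
}

As in the diff itself, `ExecutionContext.parasitic` runs the trivial `_.isDefined` (or `Some(_)`) mapping on the thread that completes the future, so no extra scheduling is paid for a transformation that cannot block.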