@@ -3,14 +3,16 @@ package io.aiven.guardian.kafka.backup.s3
 import akka.actor.ActorSystem
 import akka.kafka.ProducerSettings
 import akka.kafka.scaladsl.Producer
+import akka.stream.KillSwitches
+import akka.stream.SharedKillSwitch
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
+import akka.util.ByteString
 import io.aiven.guardian.akka.AnyPropTestKit
 import io.aiven.guardian.kafka.Generators.KafkaDataInChunksWithTimePeriod
 import io.aiven.guardian.kafka.Generators.kafkaDataWithMinSizeGen
-import io.aiven.guardian.kafka.KafkaClient
 import io.aiven.guardian.kafka.KafkaClusterTest
 import io.aiven.guardian.kafka.Utils._
 import io.aiven.guardian.kafka.backup.configs.Backup
@@ -26,6 +28,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.mdedetrich.akka.stream.support.CirceStreamSupport
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.jdk.FutureConverters._
@@ -47,12 +50,26 @@ class RealS3BackupClientSpec
   override lazy val bucketPrefix: Option[String] = Some("guardian-")
   override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds)
 
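+  // Thrown into the shared kill switches below to simulate an unclean shutdown of a backup client.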
+  case object TerminationException extends Exception("termination-exception")
+
   def reducedConsumerRecordsToJson(reducedConsumerRecords: List[ReducedConsumerRecord]): Array[Byte] = {
     import io.aiven.guardian.kafka.codecs.Circe._
     import io.circe.syntax._
     reducedConsumerRecords.asJson.noSpaces.getBytes
   }
 
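+  // Builds a KafkaClientWithKillSwitch wired to the test Kafka container so a backup
+  // client's consumer stream can be aborted externally via the passed-in kill switch.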
+  def createKafkaClient(
+      killSwitch: SharedKillSwitch
+  )(implicit kafkaClusterConfig: KafkaCluster): KafkaClientWithKillSwitch =
+    new KafkaClientWithKillSwitch(
+      configure = Some(
+        _.withBootstrapServers(
+          container.bootstrapServers
+        ).withGroupId("test-group")
+      ),
+      killSwitch
+    )
+
   property("suspend/resume works correctly") {
     forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 5, reducedConsumerRecordsToJson),
            s3ConfigGen(useVirtualDotHost, bucketPrefix)
@@ -72,18 +89,15 @@ class RealS3BackupClientSpec
         ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
           .withBootstrapServers(container.bootstrapServers)
 
-      val backupClient = new BackupClient(Some(s3Settings))(new KafkaClient(
-                                                              configure = Some(
-                                                                _.withBootstrapServers(
-                                                                  container.bootstrapServers
-                                                                ).withGroupId("test-group")
-                                                              )
-                                                            ),
-                                                            implicitly,
-                                                            implicitly,
-                                                            implicitly,
-                                                            implicitly
-      )
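+      // A shared kill switch lets the test abort the backup stream from the outside,
+      // rather than shutting it down gracefully via its materialized control.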
+      val firstKillSwitch = KillSwitches.shared("first-kill-switch")
+
+      val backupClient =
+        new BackupClient(Some(s3Settings))(createKafkaClient(firstKillSwitch),
+                                           implicitly,
+                                           implicitly,
+                                           implicitly,
+                                           implicitly
+        )
 
       val asProducerRecord = data.map { reducedConsumerRecord =>
         val keyAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.key)
@@ -110,45 +124,57 @@ class RealS3BackupClientSpec
       val calculatedFuture = for {
         _ <- createTopics.all().toCompletableFuture.asScala
         _ <- createBucket(s3Config.dataBucket)
-        firstControl = backupClient.backup.run()
-        _ = baseSource.runWith(Producer.plainSink(producerSettings))
-        _ <- akka.pattern.after(15 seconds)(firstControl.stop())
-        secondBackupClient = new BackupClient(Some(s3Settings))(new KafkaClient(
-                                                                  configure = Some(
-                                                                    _.withBootstrapServers(
-                                                                      container.bootstrapServers
-                                                                    ).withGroupId("test-group")
-                                                                  )
-                                                                ),
-                                                                implicitly,
-                                                                implicitly,
-                                                                implicitly,
-                                                                implicitly
-        )
-        secondControl = secondBackupClient.backup.run()
-        _ <- akka.pattern.after(15 seconds)(secondControl.stop())
+        _ = backupClient.backup.run()
+        _ = baseSource.runWith(Producer.plainSink(producerSettings))
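+        // Aborting (rather than stopping) the first client simulates an unclean
+        // termination, leaving its multipart upload unfinished in S3.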
+        _ <- akka.pattern.after(15 seconds)(Future {
+               firstKillSwitch.abort(TerminationException)
+             })
+        secondKillSwitch = KillSwitches.shared("second-kill-switch")
+        secondBackupClient =
+          new BackupClient(Some(s3Settings))(createKafkaClient(secondKillSwitch),
+                                             implicitly,
+                                             implicitly,
+                                             implicitly,
+                                             implicitly
+          )
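+        // The second client picks up from the same "test-group" consumer group and is
+        // later aborted as well, so the upload is still left incomplete.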
+        _ <- akka.pattern.after(10 seconds)(Future(secondBackupClient.backup.run()))
+        _ <- akka.pattern.after(30 seconds)(Future {
+               secondKillSwitch.abort(TerminationException)
+             })
         // Need to manually merge the bucket since we haven't gone over time window
-        incomplete <- S3.listMultipartUpload(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
+        incomplete <- akka.pattern.after(10 seconds)(
+                        S3.listMultipartUpload(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
+                      )
 
         grouped = incomplete.groupBy(_.key)
 
         (key, uploadIds) = grouped.head
-        uploadId = uploadIds.head
+        upload = uploadIds.head
 
-        parts <- S3.listParts(s3Config.dataBucket, key, uploadId.uploadId).runWith(Sink.seq)
+        parts <- S3.listParts(s3Config.dataBucket, key, upload.uploadId).runWith(Sink.seq)
 
-        _ <- S3.completeMultipartUpload(s3Config.dataBucket, key, uploadId.uploadId, parts.map(_.toPart))
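+        // The aborted stream ends mid-JSON-array, so a final "null]" chunk is appended
+        // while resuming the upload to produce valid JSON; the trailing null is dropped below.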
+        _ <- Source
+               .single(ByteString("null]"))
+               .runWith(
+                 S3.resumeMultipartUpload(s3Config.dataBucket, key, upload.uploadId, parts.map(_.toPart))
+                   .withAttributes(s3Attrs)
+               )
 
         downloaded <-
-          akka.pattern.after(5.seconds)(
-            S3.download(s3Config.dataBucket, key).withAttributes(s3Attrs).runWith(Sink.head).flatMap {
-              case Some((downloadSource, _)) =>
-                downloadSource.via(CirceStreamSupport.decode[List[ReducedConsumerRecord]]).runWith(Sink.seq)
-              case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
-            }
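+          // Decoding as List[Option[ReducedConsumerRecord]] lets the terminating null parse cleanly.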
+          akka.pattern.after(10 seconds)(
+            S3.download(s3Config.dataBucket, key)
+              .withAttributes(s3Attrs)
+              .runWith(Sink.head)
+              .flatMap {
+                case Some((downloadSource, _)) =>
+                  downloadSource.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]).runWith(Sink.seq)
+                case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
+              }
           )
 
-      } yield downloaded.flatten
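+        // collect drops the None decoded from the terminating null, keeping only real records.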
+      } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
+        reducedConsumerRecord
+      }
 
       val downloaded = calculatedFuture.futureValue
 
@@ -161,7 +187,7 @@ class RealS3BackupClientSpec
         }
         .toMap
 
-      val inputAsKey = kafkaDataInChunksWithTimePeriod.data.flatten
+      val inputAsKey = data
         .groupBy(_.key)
         .view
         .mapValues { reducedConsumerRecords =>