package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.Generators.KafkaDataInChunksWithTimePeriod
import io.aiven.guardian.kafka.Generators.kafkaDataWithMinSizeGen
import io.aiven.guardian.kafka.KafkaClient
import io.aiven.guardian.kafka.KafkaClusterTest
import io.aiven.guardian.kafka.Utils._
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.mdedetrich.akka.stream.support.CirceStreamSupport

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.language.postfixOps

import java.time.temporal.ChronoUnit
import java.util.Base64

class RealS3BackupClientSpec
    extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec"))
    with BackupClientSpec
    with KafkaClusterTest {
  override lazy val s3Settings: S3Settings = S3Settings()

  /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail
    */
  override lazy val useVirtualDotHost: Boolean = false
  override lazy val bucketPrefix: Option[String] = Some("guardian-")
  override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds)

  def reducedConsumerRecordsToJson(reducedConsumerRecords: List[ReducedConsumerRecord]): Array[Byte] = {
    import io.aiven.guardian.kafka.codecs.Circe._
    import io.circe.syntax._
    reducedConsumerRecords.asJson.noSpaces.getBytes
  }

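  // Starts a backup against a real Kafka cluster and S3 bucket, stops it mid-stream, resumes with a
  // fresh client, then verifies the produced data ends up in S3 intact once the upload is completed.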
  property("suspend/resume works correctly") {
    forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 5, reducedConsumerRecordsToJson),
           s3ConfigGen(useVirtualDotHost, bucketPrefix)
    ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
      logger.info(s"Data bucket is ${s3Config.dataBucket}")

      val data = kafkaDataInChunksWithTimePeriod.data.flatten

      val topics = data.map(_.topic).toSet

      implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

      implicit val config: S3Config = s3Config
      implicit val backupConfig: Backup = Backup(kafkaDataInChunksWithTimePeriod.periodSlice, Some(ChronoUnit.DAYS))

      val producerSettings =
        ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
          .withBootstrapServers(container.bootstrapServers)

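      // First backup client: consumes the generated topics from the test container and streams them into S3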
      val backupClient = new BackupClient(Some(s3Settings))(
        new KafkaClient(
          configure = Some(
            _.withBootstrapServers(
              container.bootstrapServers
            ).withGroupId("test-group")
          )
        ),
        implicitly,
        implicitly,
        implicitly,
        implicitly
      )

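      // Keys and values are held base64 encoded in ReducedConsumerRecord, so decode them back to raw bytes
      // before producing them into Kafka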
      val asProducerRecord = data.map { reducedConsumerRecord =>
        val keyAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.key)
        val valueAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.value)
        new ProducerRecord[Array[Byte], Array[Byte]](reducedConsumerRecord.topic, keyAsByteArray, valueAsByteArray)
      }

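      // Throttle the producer so the records are spread over roughly 30 seconds, giving the backup
      // client time to be suspended and resumed while data is still flowing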
      val baseSource = {
        val secondsToMillis = (30 seconds).toMillis
        val topicsPerMillis = asProducerRecord.size / secondsToMillis
        Source(asProducerRecord).throttle(topicsPerMillis.toInt, 1 millis)
      }

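      // Create the topics up front with an AdminClient so they exist before the backup client subscribes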
      val adminClient = AdminClient.create(
        Map[String, AnyRef](
          CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
        ).asJava
      )

      val createTopics = adminClient.createTopics(topics.map { topic =>
        new NewTopic(topic, 1, 1.toShort)
      }.asJava)

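      // Run the first backup, stop it after 15 seconds, resume with a second client, stop that one too,
      // then complete the pending multipart upload by hand and download the result for comparison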
      val calculatedFuture = for {
        _ <- createTopics.all().toCompletableFuture.asScala
        _ <- createBucket(s3Config.dataBucket)
        firstControl = backupClient.backup.run()
        _ = baseSource.runWith(Producer.plainSink(producerSettings))
        _ <- akka.pattern.after(15 seconds)(firstControl.stop())
        secondBackupClient = new BackupClient(Some(s3Settings))(
                               new KafkaClient(
                                 configure = Some(
                                   _.withBootstrapServers(
                                     container.bootstrapServers
                                   ).withGroupId("test-group")
                                 )
                               ),
                               implicitly,
                               implicitly,
                               implicitly,
                               implicitly
                             )
        secondControl = secondBackupClient.backup.run()
        _ <- akka.pattern.after(15 seconds)(secondControl.stop())
        // Need to manually merge the upload since we haven't gone over the time window
        incomplete <- S3.listMultipartUpload(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)

        grouped = incomplete.groupBy(_.key)

        (key, uploadIds) = grouped.head
        uploadId = uploadIds.head

        parts <- S3.listParts(s3Config.dataBucket, key, uploadId.uploadId).runWith(Sink.seq)

        _ <- S3.completeMultipartUpload(s3Config.dataBucket, key, uploadId.uploadId, parts.map(_.toPart))

        downloaded <-
          akka.pattern.after(5.seconds)(
            S3.download(s3Config.dataBucket, key).withAttributes(s3Attrs).runWith(Sink.head).flatMap {
              case Some((downloadSource, _)) =>
                downloadSource.via(CirceStreamSupport.decode[List[ReducedConsumerRecord]]).runWith(Sink.seq)
              case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
            }
          )
      } yield downloaded.flatten

      val downloaded = calculatedFuture.futureValue

      // Only care about ordering when it comes to the key
      val downloadedGroupedAsKey = downloaded
        .groupBy(_.key)
        .view
        .mapValues { reducedConsumerRecords =>
          reducedConsumerRecords.map(_.value)
        }
        .toMap

      val inputAsKey = kafkaDataInChunksWithTimePeriod.data.flatten
        .groupBy(_.key)
        .view
        .mapValues { reducedConsumerRecords =>
          reducedConsumerRecords.map(_.value)
        }
        .toMap

      downloadedGroupedAsKey mustEqual inputAsKey
    }
  }
}