
Commit d14913d

Add suspend and resume functionality
1 parent 4efd665 commit d14913d

10 files changed: +349 -36 lines

backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala

+9 -1
@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup.gcs
 
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model.ContentTypes
 import akka.stream.alpakka.google.GoogleAttributes
 import akka.stream.alpakka.google.GoogleSettings
@@ -18,14 +19,21 @@ import scala.concurrent.Future
 class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
     override val kafkaClientInterface: T,
     override val backupConfig: Backup,
+    override val system: ActorSystem,
     gcsConfig: GCSConfig
 ) extends BackupClientInterface[T] {
 
   override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)
 
   override type BackupResult = Option[StorageObject]
 
-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
+  override type CurrentState = Nothing
+
+  override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[Nothing]
+  ): Sink[ByteString, Future[BackupResult]] = {
     val base = GCStorage
       .resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
       .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
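
Both storage backends now override the same three new members, which suggests that BackupClientInterface (whose own diff is not shown on this page) gained roughly the following abstract declarations. This is a reconstruction from the overrides above, not the actual trait source; the trait name and exact signatures are assumptions.

import akka.stream.scaladsl.Sink
import akka.util.ByteString
import scala.concurrent.Future

// Hypothetical sketch of the new abstract members implied by the overrides.
trait SuspendResumeMembers {
  type BackupResult
  // Backend-specific handle for a suspended upload: Nothing for GCS (which
  // cannot resume yet and always reports no prior state), CurrentS3State for S3.
  type CurrentState

  // Look up any previously interrupted upload for the given key.
  def getCurrentUploadState(key: String): Future[Option[CurrentState]]

  // Resume from the recovered state when present, otherwise start a fresh upload.
  def backupToStorageSink(key: String, currentState: Option[CurrentState]): Sink[ByteString, Future[BackupResult]]
}
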
backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala
@@ -1,12 +1,15 @@
 package io.aiven.guardian.kafka.backup.s3
 
+import akka.actor.ActorSystem
 import akka.stream.alpakka.s3.MultipartUploadResult
+import akka.stream.alpakka.s3.Part
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl._
 import akka.util.ByteString
+import com.typesafe.scalalogging.StrictLogging
 import io.aiven.guardian.kafka.KafkaClientInterface
 import io.aiven.guardian.kafka.backup.BackupClientInterface
 import io.aiven.guardian.kafka.backup.configs.Backup
@@ -15,26 +18,85 @@ import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+import java.time.Instant
+
+final case class CurrentS3State(uploadId: String, parts: Seq[Part])
+
 class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
     override val kafkaClientInterface: T,
     override val backupConfig: Backup,
+    override val system: ActorSystem,
     s3Config: S3Config,
     s3Headers: S3Headers
-) extends BackupClientInterface[T] {
+) extends BackupClientInterface[T]
+    with StrictLogging {
 
   override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)
 
   override type BackupResult = Option[MultipartUploadResult]
 
-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
-    val base = S3
-      .multipartUploadWithHeaders(
-        s3Config.dataBucket,
-        key,
-        s3Headers = s3Headers,
-        chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
-      )
-      .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+  override type CurrentState = CurrentS3State
+
+  override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
+    implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
+
+    val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)
+
+    for {
+      incompleteUploads <-
+        maybeS3Settings
+          .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
+          .runWith(Sink.seq)
+      keys = incompleteUploads.filter(_.key == key)
+      result <- if (keys.isEmpty)
+                  Future.successful(None)
+                else {
+                  val listMultipartUploads = keys match {
+                    case Seq(single) => single
+                    case rest =>
+                      logger.warn(
+                        s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking latest one"
+                      )
+                      rest.minBy(_.initiated)(Ordering[Instant].reverse)
+                  }
+                  val uploadId = listMultipartUploads.uploadId
+                  val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)
+
+                  for {
+                    parts <- maybeS3Settings
+                               .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
+                               .runWith(Sink.seq)
+                  } yield Some(CurrentS3State(uploadId, parts.map(_.toPart)))
+                }
+    } yield result
+
+  }
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[CurrentS3State]
+  ): Sink[ByteString, Future[BackupResult]] = {
+    val sink = currentState match {
+      case Some(state) =>
+        S3.resumeMultipartUploadWithHeaders(
+          s3Config.dataBucket,
+          key,
+          state.uploadId,
+          state.parts,
+          s3Headers = s3Headers,
+          chunkingParallelism =
+            1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+        )
+      case None =>
+        S3.multipartUploadWithHeaders(
+          s3Config.dataBucket,
+          key,
+          s3Headers = s3Headers,
+          chunkingParallelism =
+            1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+        )
+    }
+
+    val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
     maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
   }
 }
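
For orientation, here is a minimal sketch of how the two new methods are presumably composed by the orchestrating code in BackupClientInterface (the stream that the new spec below runs via backupClient.backup.run()): the suspended state is recovered first and then threaded into the sink, so an interrupted multipart upload is resumed instead of restarted. The helper below is hypothetical and not part of this commit.

import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import io.aiven.guardian.kafka.KafkaClientInterface

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper for illustration only; the real wiring lives in
// BackupClientInterface, which this commit page does not show.
def resumableSinkFor[T <: KafkaClientInterface](client: BackupClient[T], key: String)(implicit
    ec: ExecutionContext
): Future[Sink[ByteString, Future[Option[MultipartUploadResult]]]] =
  client.getCurrentUploadState(key).map { maybeState =>
    // None => start a fresh multipart upload; Some(state) => resume with the existing uploadId and parts
    client.backupToStorageSink(key, maybeState)
  }
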

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala

+1
@@ -135,6 +135,7 @@ trait BackupClientSpec
       val backupClient = new MockedS3BackupClientInterface(
         Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
         kafkaDataWithTimePeriod.periodSlice,
+        None,
         s3Config,
         Some(s3Settings)
       )

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala

+7 -2
@@ -1,6 +1,7 @@
 package io.aiven.guardian.kafka.backup.s3
 
 import akka.NotUsed
+import akka.actor.ActorSystem
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.scaladsl.Source
@@ -11,14 +12,18 @@ import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 
 import scala.concurrent.duration.FiniteDuration
 
+import java.time.temporal.ChronoUnit
+
 class MockedS3BackupClientInterface(
     kafkaData: Source[ReducedConsumerRecord, NotUsed],
     periodSlice: FiniteDuration,
+    timeUnit: Option[ChronoUnit],
     s3Config: S3Config,
     maybeS3Settings: Option[S3Settings]
-)(implicit val s3Headers: S3Headers)
+)(implicit val s3Headers: S3Headers, system: ActorSystem)
     extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData),
-                                          Backup(periodSlice),
+                                          Backup(periodSlice, timeUnit),
+                                          implicitly,
                                           s3Config,
                                           implicitly
     )
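
The extra constructor argument reflects that the Backup configuration now carries an optional ChronoUnit alongside the period slice. A minimal sketch of the presumed shape, inferred from the call sites Backup(periodSlice, timeUnit) above and Backup(..., Some(ChronoUnit.DAYS)) in the new spec below; the field names are assumptions and the real definition in io.aiven.guardian.kafka.backup.configs is not part of this diff.

import java.time.temporal.ChronoUnit
import scala.concurrent.duration.FiniteDuration

// Presumed shape only; how timeUnit is consumed is not shown in this commit.
final case class Backup(periodSlice: FiniteDuration, timeUnit: Option[ChronoUnit])
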
backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala
@@ -1,13 +1,43 @@
 package io.aiven.guardian.kafka.backup.s3
 
 import akka.actor.ActorSystem
+import akka.kafka.ProducerSettings
+import akka.kafka.scaladsl.Producer
 import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.scaladsl.S3
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
 import io.aiven.guardian.akka.AnyPropTestKit
+import io.aiven.guardian.kafka.Generators.KafkaDataInChunksWithTimePeriod
+import io.aiven.guardian.kafka.Generators.kafkaDataWithMinSizeGen
+import io.aiven.guardian.kafka.KafkaClient
+import io.aiven.guardian.kafka.KafkaClusterTest
+import io.aiven.guardian.kafka.Utils._
+import io.aiven.guardian.kafka.backup.configs.Backup
+import io.aiven.guardian.kafka.codecs.Circe._
+import io.aiven.guardian.kafka.configs.KafkaCluster
+import io.aiven.guardian.kafka.models.ReducedConsumerRecord
+import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen
+import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.mdedetrich.akka.stream.support.CirceStreamSupport
 
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
 import scala.language.postfixOps
 
-class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with BackupClientSpec {
+import java.time.temporal.ChronoUnit
+import java.util.Base64
+
+class RealS3BackupClientSpec
+    extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec"))
+    with BackupClientSpec
+    with KafkaClusterTest {
   override lazy val s3Settings: S3Settings = S3Settings()
 
   /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail
@@ -16,4 +46,130 @@ class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupCli
   override lazy val useVirtualDotHost: Boolean = false
   override lazy val bucketPrefix: Option[String] = Some("guardian-")
   override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds)
+
+  def reducedConsumerRecordsToJson(reducedConsumerRecords: List[ReducedConsumerRecord]): Array[Byte] = {
+    import io.aiven.guardian.kafka.codecs.Circe._
+    import io.circe.syntax._
+    reducedConsumerRecords.asJson.noSpaces.getBytes
+  }
+
+  property("suspend/resume works correctly") {
+    forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 5, reducedConsumerRecordsToJson),
+           s3ConfigGen(useVirtualDotHost, bucketPrefix)
+    ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
+      logger.info(s"Data bucket is ${s3Config.dataBucket}")
+
+      val data = kafkaDataInChunksWithTimePeriod.data.flatten
+
+      val topics = data.map(_.topic).toSet
+
+      implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)
+
+      implicit val config: S3Config = s3Config
+      implicit val backupConfig: Backup = Backup(kafkaDataInChunksWithTimePeriod.periodSlice, Some(ChronoUnit.DAYS))
+
+      val producerSettings =
+        ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
+          .withBootstrapServers(container.bootstrapServers)
+
+      val backupClient = new BackupClient(Some(s3Settings))(new KafkaClient(
+                                                              configure = Some(
+                                                                _.withBootstrapServers(
+                                                                  container.bootstrapServers
+                                                                ).withGroupId("test-group")
+                                                              )
+                                                            ),
+                                                            implicitly,
+                                                            implicitly,
+                                                            implicitly,
+                                                            implicitly
+      )
+
+      val asProducerRecord = data.map { reducedConsumerRecord =>
+        val keyAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.key)
+        val valueAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.value)
+        new ProducerRecord[Array[Byte], Array[Byte]](reducedConsumerRecord.topic, keyAsByteArray, valueAsByteArray)
+      }
+
+      val baseSource = {
+        val secondsToMillis = (30 seconds).toMillis
+        val topicsPerMillis = asProducerRecord.size / secondsToMillis
+        Source(asProducerRecord).throttle(topicsPerMillis.toInt, 1 millis)
+      }
+
+      val adminClient = AdminClient.create(
+        Map[String, AnyRef](
+          CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
+        ).asJava
+      )
+
+      val createTopics = adminClient.createTopics(topics.map { topic =>
+        new NewTopic(topic, 1, 1.toShort)
+      }.asJava)
+
+      val calculatedFuture = for {
+        _ <- createTopics.all().toCompletableFuture.asScala
+        _ <- createBucket(s3Config.dataBucket)
+        firstControl = backupClient.backup.run()
+        _ = baseSource.runWith(Producer.plainSink(producerSettings))
+        _ <- akka.pattern.after(15 seconds)(firstControl.stop())
+        secondBackupClient = new BackupClient(Some(s3Settings))(new KafkaClient(
+                                                                  configure = Some(
+                                                                    _.withBootstrapServers(
+                                                                      container.bootstrapServers
+                                                                    ).withGroupId("test-group")
+                                                                  )
+                                                                ),
+                                                                implicitly,
+                                                                implicitly,
+                                                                implicitly,
+                                                                implicitly
+        )
+        secondControl = secondBackupClient.backup.run()
+        _ <- akka.pattern.after(15 seconds)(secondControl.stop())
+        // Need to manually merge the bucket since we haven't gone over time window
+        incomplete <- S3.listMultipartUpload(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
+
+        grouped = incomplete.groupBy(_.key)
+
+        (key, uploadIds) = grouped.head
+        uploadId = uploadIds.head
+
+        parts <- S3.listParts(s3Config.dataBucket, key, uploadId.uploadId).runWith(Sink.seq)
+
+        _ <- S3.completeMultipartUpload(s3Config.dataBucket, key, uploadId.uploadId, parts.map(_.toPart))
+
+        downloaded <-
+          akka.pattern.after(5.seconds)(
+            S3.download(s3Config.dataBucket, key).withAttributes(s3Attrs).runWith(Sink.head).flatMap {
+              case Some((downloadSource, _)) =>
+                downloadSource.via(CirceStreamSupport.decode[List[ReducedConsumerRecord]]).runWith(Sink.seq)
+              case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
+            }
          )
+
+      } yield downloaded.flatten
+
+      val downloaded = calculatedFuture.futureValue
+
+      // Only care about ordering when it comes to key
+      val downloadedGroupedAsKey = downloaded
+        .groupBy(_.key)
+        .view
+        .mapValues { reducedConsumerRecords =>
+          reducedConsumerRecords.map(_.value)
+        }
+        .toMap
+
+      val inputAsKey = kafkaDataInChunksWithTimePeriod.data.flatten
+        .groupBy(_.key)
+        .view
+        .mapValues { reducedConsumerRecords =>
+          reducedConsumerRecords.map(_.value)
+        }
+        .toMap
+
+      downloadedGroupedAsKey mustEqual inputAsKey
+    }
+  }
 }

build.sbt

+4
@@ -23,6 +23,7 @@ val akkaStreamsJson = "0.8.0"
 val diffxVersion = "0.5.6"
 val testContainersVersion = "0.39.8"
 val testContainersJavaVersion = "1.16.2"
+val scalaCheckVersion = "1.15.5-1-SNAPSHOT"
 
 val flagsFor12 = Seq(
   "-Xlint:_",
@@ -69,6 +70,8 @@ val cliSettings = Seq(
 
 val baseName = "guardian-for-apache-kafka"
 
+ThisBuild / resolvers += Resolver.mavenLocal
+
 lazy val core = project
   .in(file("core"))
   .settings(
@@ -86,6 +89,7 @@ lazy val core = project
       "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test,
      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
      "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
+      "org.mdedetrich" %% "scalacheck" % scalaCheckVersion % Test,
      "com.softwaremill.diffx" %% "diffx-scalatest" % diffxVersion % Test,
      "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
      "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test,
