Commit c39b723

Add suspend and resume functionality
1 parent 4b9593f commit c39b723

File tree

14 files changed (+701, -78 lines)


backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala

Lines changed: 16 additions & 2 deletions
@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup.gcs

+import akka.actor.ActorSystem
 import akka.http.scaladsl.model.ContentTypes
 import akka.stream.alpakka.google.GoogleAttributes
 import akka.stream.alpakka.google.GoogleSettings
@@ -15,22 +16,35 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future

+// TODO: The GCS implementation currently does not work correctly because the current GCS implementation in
+// Alpakka does not allow us to commit the Kafka cursor whenever chunks are uploaded
 class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
   override val kafkaClientInterface: T,
   override val backupConfig: Backup,
+  override val system: ActorSystem,
   gcsConfig: GCSConfig
 ) extends BackupClientInterface[T] {

   override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)

   override type BackupResult = Option[StorageObject]

-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
+  override type CurrentState = Nothing
+
+  override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[Nothing]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
     val base = GCStorage
       .resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
       .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

-    maybeGoogleSettings.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+    maybeGoogleSettings
+      .fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+      .contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
+        byteString
+      }
   }

 }
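
The `.contramap` at the end of `backupToStorageSink` is what adapts the context-free GCS resumable-upload sink to the new `(ByteString, CursorContext)` element type, and it lines up with the TODO above: the cursor context is simply dropped, so there is nowhere to hook a per-chunk Kafka commit. Below is a minimal, self-contained sketch of that adaptation. It is not part of this commit; the byte-counting sink and the `Long` context are placeholders for the Alpakka upload sink and Guardian's real cursor type.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object ContramapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("contramap-sketch")
  import system.dispatcher

  // Stand-in for GCStorage.resumableUpload: a sink that only understands raw bytes.
  val bytesOnlySink: Sink[ByteString, Future[Long]] =
    Sink.fold[Long, ByteString](0L)((total, bytes) => total + bytes.length)

  // contramap lets the sink accept (ByteString, context) pairs, but the context is
  // discarded, so no per-chunk cursor commit is possible on this path.
  val adapted: Sink[(ByteString, Long), Future[Long]] =
    bytesOnlySink.contramap[(ByteString, Long)] { case (bytes, _) => bytes }

  Source(List(ByteString("chunk-1") -> 0L, ByteString("chunk-2") -> 1L))
    .runWith(adapted)
    .foreach { total =>
      println(s"uploaded $total bytes")
      system.terminate()
    }
}
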
backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala

Lines changed: 120 additions & 9 deletions
@@ -1,40 +1,151 @@
 package io.aiven.guardian.kafka.backup.s3

+import akka.Done
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.alpakka.s3.FailedUploadPart
 import akka.stream.alpakka.s3.MultipartUploadResult
+import akka.stream.alpakka.s3.Part
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.SuccessfulUploadPart
+import akka.stream.alpakka.s3.UploadPartResponse
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl._
 import akka.util.ByteString
+import com.typesafe.scalalogging.StrictLogging
 import io.aiven.guardian.kafka.KafkaClientInterface
 import io.aiven.guardian.kafka.backup.BackupClientInterface
 import io.aiven.guardian.kafka.backup.configs.Backup
 import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

+import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future

+import java.time.Instant
+
+final case class CurrentS3State(uploadId: String, parts: Seq[Part])
+
 class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
   override val kafkaClientInterface: T,
   override val backupConfig: Backup,
+  override val system: ActorSystem,
   s3Config: S3Config,
   s3Headers: S3Headers
-) extends BackupClientInterface[T] {
+) extends BackupClientInterface[T]
+    with StrictLogging {

   override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)

   override type BackupResult = Option[MultipartUploadResult]

-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
-    val base = S3
-      .multipartUploadWithHeaders(
-        s3Config.dataBucket,
-        key,
-        s3Headers = s3Headers,
-        chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+  override type CurrentState = CurrentS3State
+
+  override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
+    implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
+
+    val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)
+
+    for {
+      incompleteUploads <-
+        maybeS3Settings
+          .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
+          .runWith(Sink.seq)
+      keys = incompleteUploads.filter(_.key == key)
+      result <- if (keys.isEmpty)
+                  Future.successful(None)
+                else {
+                  val listMultipartUploads = keys match {
+                    case Seq(single) =>
+                      logger.info(
+                        s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
+                      )
+                      single
+                    case rest =>
+                      val last = rest.minBy(_.initiated)(Ordering[Instant].reverse)
+                      logger.warn(
+                        s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
+                      )
+                      last
+                  }
+                  val uploadId = listMultipartUploads.uploadId
+                  val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)
+
+                  for {
+                    parts <- maybeS3Settings
+                               .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
+                               .runWith(Sink.seq)
+                  } yield Some(CurrentS3State(uploadId, parts.map(_.toPart)))
+                }
+    } yield result
+
+  }
+
+  val failureSink: Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
+    .foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
+      logger.warn(
+        s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
+        failedPart.exception
       )
-      .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+    }
+
+  val successSink: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
+    kafkaClientInterface.commitCursor
+      .contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
+        kafkaClientInterface.batchCursorContext(cursors)
+      }
+
+  val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
+
+    val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(successSink)(Keep.none)
+
+    val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(failureSink)(Keep.none)
+
+    Sink.combine(success, failure)(Broadcast(_))
+  }
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[CurrentS3State]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
+
+    // Note that chunkingParallelism is pointless for this usecase since we are directly streaming from Kafka.
+    // Furthermore the real `KafkaClient` implementation uses `CommittableOffsetBatch` which is a global singleton so
+    // we can't have concurrent updates to this data structure.
+
+    val sink = currentState match {
+      case Some(state) =>
+        logger.info(
+          s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key"
+        )
+        S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          state.uploadId,
+          state.parts,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+      case None =>
+        logger.info(
+          s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key"
+        )
+        S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+    }
+
+    val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
     maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
   }
 }
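
The cursor-commit wiring above hinges on `kafkaBatchSink`: every `UploadPartResponse` emitted by the S3 upload is broadcast to two branches, and each branch's `collectType` keeps only the subtype it handles, so Kafka offsets are committed only for successfully uploaded parts while failed parts are just logged. Below is a small, self-contained sketch of that routing pattern. It is not the commit's code; the `Response` ADT and the println sinks are placeholders for Alpakka's upload-part types and Guardian's commit/log sinks.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Keep, Sink, Source}

object BroadcastRoutingSketch extends App {
  implicit val system: ActorSystem = ActorSystem("broadcast-sketch")

  sealed trait Response
  final case class Ok(partNumber: Int) extends Response
  final case class Failed(partNumber: Int) extends Response

  // Stand-in for successSink: commit the Kafka cursor for successful parts only.
  val okSink: Sink[Response, NotUsed] =
    Flow[Response]
      .collectType[Ok]
      .toMat(Sink.foreach(ok => println(s"commit cursor for part ${ok.partNumber}")))(Keep.none)

  // Stand-in for failureSink: log failed parts, never commit them.
  val failedSink: Sink[Response, NotUsed] =
    Flow[Response]
      .collectType[Failed]
      .toMat(Sink.foreach(f => println(s"skip commit for part ${f.partNumber}")))(Keep.none)

  // Broadcast sends every element to both branches; collectType does the filtering.
  val routed: Sink[Response, NotUsed] = Sink.combine(okSink, failedSink)(Broadcast(_))

  Source(List[Response](Ok(1), Failed(2), Ok(3))).runWith(routed)

  Thread.sleep(500)
  system.terminate()
}
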

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala

Lines changed: 38 additions & 3 deletions
@@ -1,7 +1,9 @@
 package io.aiven.guardian.kafka.backup.s3

+import akka.actor.Scheduler
 import akka.stream.Attributes
 import akka.stream.alpakka.s3.BucketAccess
+import akka.stream.alpakka.s3.ListBucketResultContents
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.alpakka.s3.scaladsl.S3
@@ -49,7 +51,9 @@ trait BackupClientSpec
     with StrictLogging {

   implicit val ec: ExecutionContext = system.dispatcher
-  implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
+  implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)

   val ThrottleElements: Int = 100
   val ThrottleAmount: FiniteDuration = 1 millis
@@ -128,13 +132,42 @@ trait BackupClientSpec
     case None => ()
   }

+  /** @param dataBucket
+    *   Which S3 bucket the objects are being persisted into
+    * @param transformResult
+    *   A function that transforms the download result from S3 into the data `T` that you need. Note that you can also
+    *   throw an exception in this transform function to trigger a retry (i.e. using it as an additional predicate)
+    * @param attempts
+    *   Total number of attempts
+    * @param delay
+    *   The delay between each attempt after the first
+    * @tparam T
+    *   Type of the final result transformed by `transformResult`
+    * @return
+    */
+  def waitForS3Download[T](dataBucket: String,
+                           transformResult: Seq[ListBucketResultContents] => T,
+                           attempts: Int = 10,
+                           delay: FiniteDuration = 1 second
+  ): Future[T] = {
+    implicit val scheduler: Scheduler = system.scheduler
+
+    val attempt = () =>
+      S3.listBucket(dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq).map {
+        transformResult
+      }
+
+    akka.pattern.retry(attempt, attempts, delay)
+  }
+
   property("backup method completes flow correctly for all valid Kafka events") {
     forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
       (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
         logger.info(s"Data bucket is ${s3Config.dataBucket}")
         val backupClient = new MockedS3BackupClientInterface(
           Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
           kafkaDataWithTimePeriod.periodSlice,
+          None,
           s3Config,
           Some(s3Settings)
         )
@@ -171,7 +204,7 @@ trait BackupClientSpec
         })
         keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
                              source
-                               .via(CirceStreamSupport.decode[List[ReducedConsumerRecord]])
+                               .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
                                .toMat(Sink.collection)(Keep.right)
                                .run()
                                .map(list => (key, list.flatten))
@@ -181,7 +214,9 @@ trait BackupClientSpec
                      OffsetDateTime.parse(date).toEpochSecond
                    }(Ordering[Long].reverse)
         flattened = sorted.flatMap { case (_, records) => records }
-      } yield flattened
+      } yield flattened.collect { case Some(reducedConsumerRecord) =>
+        reducedConsumerRecord
+      }
       val observed = calculatedFuture.futureValue

       kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
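
The new `waitForS3Download` helper leans on `akka.pattern.retry`: the transform can throw to fail the current attempt, which schedules another bucket listing after the delay until the contents look right or the attempts run out. Below is a hedged sketch of the same polling idea, not taken from the commit; an in-memory listing stands in for `S3.listBucket` and the expected key count is a hypothetical test value.

import akka.actor.{ActorSystem, Scheduler}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object RetryPollingSketch extends App {
  implicit val system: ActorSystem = ActorSystem("retry-sketch")
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val scheduler: Scheduler = system.scheduler

  // Stand-in for S3.listBucket(...).runWith(Sink.seq): pretend the objects appear eventually.
  @volatile var uploaded = Seq.empty[String]
  system.scheduler.scheduleOnce(2.seconds)(uploaded = Seq("backup-key-1"))

  val expectedKeys = 1 // hypothetical test expectation

  // Throwing inside the transform fails this attempt, so retry re-runs it after the delay.
  val attempt = () =>
    Future(uploaded).map { keys =>
      if (keys.size < expectedKeys) throw new IllegalStateException("objects not visible yet")
      keys
    }

  akka.pattern.retry(attempt, 10, 1.second).foreach { keys =>
    println(s"bucket listing settled: $keys")
    system.terminate()
  }
}
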
backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaClientWithKillSwitch.scala

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+package io.aiven.guardian.kafka.backup.s3
+
+import akka.actor.ActorSystem
+import akka.kafka.ConsumerMessage
+import akka.kafka.ConsumerSettings
+import akka.kafka.scaladsl.Consumer
+import akka.stream.SharedKillSwitch
+import akka.stream.scaladsl.SourceWithContext
+import io.aiven.guardian.kafka.KafkaClient
+import io.aiven.guardian.kafka.configs.KafkaCluster
+import io.aiven.guardian.kafka.models.ReducedConsumerRecord
+
+class KafkaClientWithKillSwitch(
+    configure: Option[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = None,
+    killSwitch: SharedKillSwitch
+)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
+    extends KafkaClient(configure) {
+  override def getSource
+      : SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
+    super.getSource.via(killSwitch.flow)
+}
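
`KafkaClientWithKillSwitch` routes the consumer source through a `SharedKillSwitch`, presumably so a test can cut the Kafka source partway through an upload and then exercise the resume path this commit adds. Below is a minimal sketch of that mechanism only, not of the Guardian test itself; the ticking source and the simulated failure are placeholders for the real consumer stream.

import akka.actor.ActorSystem
import akka.stream.{KillSwitches, SharedKillSwitch}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

object KillSwitchSketch extends App {
  implicit val system: ActorSystem = ActorSystem("killswitch-sketch")
  import system.dispatcher

  val killSwitch: SharedKillSwitch = KillSwitches.shared("simulated-suspend")

  // Any stream routed through killSwitch.flow can be terminated from the outside,
  // which is how a test can sever the "Kafka" source mid-stream.
  val done = Source
    .tick(0.millis, 10.millis, "record")
    .via(killSwitch.flow)
    .runWith(Sink.ignore)

  // Simulate the suspension after a short while; the stream fails with this exception.
  system.scheduler.scheduleOnce(200.millis)(killSwitch.abort(new RuntimeException("simulated suspend")))

  done.failed.foreach { e =>
    println(s"stream stopped: ${e.getMessage}")
    system.terminate()
  }
}
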

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala

Lines changed: 7 additions & 2 deletions
@@ -1,6 +1,7 @@
 package io.aiven.guardian.kafka.backup.s3

 import akka.NotUsed
+import akka.actor.ActorSystem
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.scaladsl.Source
@@ -11,14 +12,18 @@ import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

 import scala.concurrent.duration.FiniteDuration

+import java.time.temporal.ChronoUnit
+
 class MockedS3BackupClientInterface(
     kafkaData: Source[ReducedConsumerRecord, NotUsed],
     periodSlice: FiniteDuration,
+    timeUnit: Option[ChronoUnit],
     s3Config: S3Config,
     maybeS3Settings: Option[S3Settings]
-)(implicit val s3Headers: S3Headers)
+)(implicit val s3Headers: S3Headers, system: ActorSystem)
     extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData),
-                                          Backup(periodSlice),
+                                          Backup(periodSlice, timeUnit),
+                                          implicitly,
                                           s3Config,
                                           implicitly
     )
