
Commit c275bfb: Add suspend and resume functionality
Parent: 4b9593f

19 files changed: +890 −107 lines


backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala

+16 −2

@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup.gcs
 
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model.ContentTypes
 import akka.stream.alpakka.google.GoogleAttributes
 import akka.stream.alpakka.google.GoogleSettings
@@ -15,22 +16,35 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+// TODO: The GCS implementation currently does not work correctly because the current GCS support in
+// Alpakka does not allow us to commit the Kafka cursor whenever chunks are uploaded
 class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
     override val kafkaClientInterface: T,
     override val backupConfig: Backup,
+    override val system: ActorSystem,
     gcsConfig: GCSConfig
 ) extends BackupClientInterface[T] {
 
   override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)
 
   override type BackupResult = Option[StorageObject]
 
-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
+  override type CurrentState = Nothing
+
+  override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[Nothing]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
     val base = GCStorage
       .resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
       .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
 
-    maybeGoogleSettings.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+    maybeGoogleSettings
+      .fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+      .contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
+        byteString
+      }
   }
 
 }
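
For orientation, the following is a minimal sketch of the storage-client contract these overrides target, inferred purely from the GCS and S3 signatures in this commit; the real trait lives in BackupClientInterface, which is not part of this diff, so the member names and modifiers below are assumptions.

import akka.stream.scaladsl.Sink
import akka.util.ByteString
import io.aiven.guardian.kafka.KafkaClientInterface

import scala.concurrent.Future

// Hypothetical reconstruction of the suspend/resume surface of BackupClientInterface.
trait SuspendResumeSurface[T <: KafkaClientInterface] {
  val kafkaClientInterface: T

  // Storage-specific description of an interrupted upload (Nothing for GCS, CurrentS3State for S3).
  type CurrentState

  // Materialised result of a completed upload (e.g. Option[StorageObject] or Option[MultipartUploadResult]).
  type BackupResult

  // Look up a previously suspended upload for `key`; None means start a fresh upload.
  def getCurrentUploadState(key: String): Future[Option[CurrentState]]

  // The sink now receives the Kafka cursor context alongside each chunk so the cursor
  // can be committed only once the corresponding chunk has been persisted.
  def backupToStorageSink(key: String,
                          currentState: Option[CurrentState]
  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]
}
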
backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala

@@ -1,40 +1,168 @@
 package io.aiven.guardian.kafka.backup.s3
 
+import akka.Done
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.alpakka.s3.FailedUploadPart
 import akka.stream.alpakka.s3.MultipartUploadResult
+import akka.stream.alpakka.s3.Part
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.SuccessfulUploadPart
+import akka.stream.alpakka.s3.UploadPartResponse
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl._
 import akka.util.ByteString
+import com.typesafe.scalalogging.StrictLogging
 import io.aiven.guardian.kafka.KafkaClientInterface
 import io.aiven.guardian.kafka.backup.BackupClientInterface
 import io.aiven.guardian.kafka.backup.configs.Backup
 import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 
+import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+import java.time.Instant
+
+final case class CurrentS3State(uploadId: String, parts: Seq[Part])
+
 class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
     override val kafkaClientInterface: T,
     override val backupConfig: Backup,
+    override val system: ActorSystem,
     s3Config: S3Config,
     s3Headers: S3Headers
-) extends BackupClientInterface[T] {
+) extends BackupClientInterface[T]
+    with StrictLogging {
 
   override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)
 
   override type BackupResult = Option[MultipartUploadResult]
 
-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
-    val base = S3
-      .multipartUploadWithHeaders(
-        s3Config.dataBucket,
-        key,
-        s3Headers = s3Headers,
-        chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+  override type CurrentState = CurrentS3State
+
+  override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
+    implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
+
+    val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)
+
+    for {
+      incompleteUploads <-
+        maybeS3Settings
+          .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
+          .runWith(Sink.seq)
+      keys = incompleteUploads.filter(_.key == key)
+      result <- if (keys.isEmpty)
+                  Future.successful(None)
+                else {
+                  val listMultipartUploads = keys match {
+                    case Seq(single) =>
+                      logger.info(
+                        s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
+                      )
+                      single
+                    case rest =>
+                      val last = rest.minBy(_.initiated)(Ordering[Instant].reverse)
+                      logger.warn(
+                        s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
+                      )
+                      last
+                  }
+                  val uploadId = listMultipartUploads.uploadId
+                  val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)
+
+                  for {
+                    parts <- maybeS3Settings
+                               .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
+                               .runWith(Sink.seq)
+
+                    finalParts = parts.lastOption match {
+                                   case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
+                                     parts
+                                   case _ =>
+                                     // We drop the last part here since it's broken
+                                     parts.dropRight(1)
+                                 }
+                  } yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
+                }
+    } yield result
+
+  }
+
+  private[s3] def failureSink
+      : Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
+    .foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
+      logger.warn(
+        s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
+        failedPart.exception
       )
-      .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+    }
+
+  private[s3] def successSink
+      : Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
+    kafkaClientInterface.commitCursor
+      .contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
+        kafkaClientInterface.batchCursorContext(cursors)
+      }
+
+  private[s3] def kafkaBatchSink
+      : Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
+
+    val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .wireTap { data =>
+        val (part, _) = data
+        logger.info(
+          s"Committing kafka cursor for uploadId:${part.multipartUpload.uploadId} key: ${part.multipartUpload.key} and S3 part: ${part.partNumber}"
+        )
+      }
+      .toMat(successSink)(Keep.none)
+
+    val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(failureSink)(Keep.none)
+
+    Sink.combine(success, failure)(Broadcast(_))
+  }
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[CurrentS3State]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
+
+    // Note that chunkingParallelism is pointless for this use case since we are directly streaming from Kafka.
+    // Furthermore, the real `KafkaClient` implementation uses `CommittableOffsetBatch`, which is a global singleton, so
+    // we can't have concurrent updates to this data structure.
+
+    val sink = currentState match {
+      case Some(state) =>
+        logger.info(
+          s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key"
+        )
+        S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          state.uploadId,
+          state.parts,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+      case None =>
+        logger.info(
+          s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key"
+        )
+        S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+    }
+
+    val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
     maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
   }
 }
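
The interesting logic in getCurrentUploadState is the choice of which incomplete upload to resume and which already-uploaded parts are safe to reuse. Below is a standalone sketch of those two rules, assuming the Alpakka model classes ListMultipartUploadResultUploads and ListPartsResultParts are what listMultipartUpload and listParts emit (the field names .key, .uploadId, .initiated and .size are taken from the diff; the class names are assumptions).

import akka.stream.alpakka.s3.{ListMultipartUploadResultUploads, ListPartsResultParts}
import akka.stream.alpakka.s3.scaladsl.S3

import java.time.Instant

object ResumeRules {
  // Rule 1: if several multipart uploads were left dangling for the same key, resume the one
  // that was initiated most recently, mirroring minBy(_.initiated)(Ordering[Instant].reverse) above.
  def latestIncompleteUpload(uploads: Seq[ListMultipartUploadResultUploads],
                             key: String
  ): Option[ListMultipartUploadResultUploads] =
    uploads.filter(_.key == key) match {
      case Seq()       => None
      case Seq(single) => Some(single)
      case several     => Some(several.minBy(_.initiated)(Ordering[Instant].reverse))
    }

  // Rule 2: a trailing part smaller than S3's minimum chunk size cannot be continued from,
  // so it is dropped, matching parts.dropRight(1) above.
  def resumableParts(parts: Seq[ListPartsResultParts]): Seq[ListPartsResultParts] =
    parts.lastOption match {
      case Some(last) if last.size >= S3.MinChunkSize => parts
      case _                                          => parts.dropRight(1)
    }
}
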
BackupClientChunkState.scala (new file)

@@ -0,0 +1,33 @@
+package io.aiven.guardian.kafka.backup.s3
+
+import akka.Done
+import akka.actor.ActorSystem
+import akka.stream.alpakka.s3.S3Headers
+import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.SuccessfulUploadPart
+import akka.stream.scaladsl.Sink
+import io.aiven.guardian.kafka.KafkaClientInterface
+import io.aiven.guardian.kafka.backup.configs.Backup
+import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+class BackupClientChunkState[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
+    override val kafkaClientInterface: T,
+    override val backupConfig: Backup,
+    override val system: ActorSystem,
+    s3Config: S3Config,
+    s3Headers: S3Headers
+) extends BackupClient[T](maybeS3Settings) {
+  val processedChunks: ConcurrentLinkedQueue[SuccessfulUploadPart] = new ConcurrentLinkedQueue[SuccessfulUploadPart]()
+
+  override val successSink
+      : Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
+    super.successSink.contramap { case (part, value) =>
+      processedChunks.add(part)
+      (part, value)
+    }
+}
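
A suspend/resume test presumably needs to know that at least one chunk has actually reached S3 before it interrupts the stream. The following hedged sketch shows how a test could poll the processedChunks queue above, reusing the akka.pattern.retry approach from BackupClientSpec; the helper name and retry cadence are illustrative, not part of this commit.

import akka.actor.Scheduler
import io.aiven.guardian.kafka.KafkaClientInterface

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object ChunkWait {
  // Poll the in-memory queue until the S3 sink has reported at least one successful part upload.
  def waitForAtLeastOneChunk(client: BackupClientChunkState[_ <: KafkaClientInterface])(implicit
      scheduler: Scheduler,
      ec: ExecutionContext
  ): Future[Unit] =
    akka.pattern.retry(
      () =>
        if (client.processedChunks.isEmpty) Future.failed(new Exception("No chunks uploaded to S3 yet"))
        else Future.successful(()),
      10,       // attempts
      1.second  // delay between attempts
    )
}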

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala

+39 −4

@@ -1,7 +1,9 @@
 package io.aiven.guardian.kafka.backup.s3
 
+import akka.actor.Scheduler
 import akka.stream.Attributes
 import akka.stream.alpakka.s3.BucketAccess
+import akka.stream.alpakka.s3.ListBucketResultContents
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.alpakka.s3.scaladsl.S3
@@ -14,6 +16,7 @@ import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo
 import com.typesafe.scalalogging.StrictLogging
 import io.aiven.guardian.akka.AkkaHttpTestKit
 import io.aiven.guardian.kafka.Generators._
+import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
 import io.aiven.guardian.kafka.codecs.Circe._
 import io.aiven.guardian.kafka.models.ReducedConsumerRecord
 import io.aiven.guardian.kafka.s3.Config
@@ -49,7 +52,9 @@ trait BackupClientSpec
     with StrictLogging {
 
   implicit val ec: ExecutionContext = system.dispatcher
-  implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
+  implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)
 
   val ThrottleElements: Int = 100
   val ThrottleAmount: FiniteDuration = 1 millis
@@ -128,13 +133,41 @@
     case None => ()
   }
 
+  /** @param dataBucket
+    *   Which S3 bucket the objects are being persisted into
+    * @param transformResult
+    *   A function that transforms the download result from S3 into the data `T` that you need. Note that you can also
+    *   throw an exception in this transform function to trigger a retry (i.e. using it as an additional predicate)
+    * @param attempts
+    *   Total number of attempts
+    * @param delay
+    *   The delay between each attempt after the first
+    * @tparam T
+    *   Type of the final result transformed by `transformResult`
+    * @return
+    */
+  def waitForS3Download[T](dataBucket: String,
+                           transformResult: Seq[ListBucketResultContents] => T,
+                           attempts: Int = 10,
+                           delay: FiniteDuration = 1 second
+  ): Future[T] = {
+    implicit val scheduler: Scheduler = system.scheduler
+
+    val attempt = () =>
+      S3.listBucket(dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq).map {
+        transformResult
+      }
+
+    akka.pattern.retry(attempt, attempts, delay)
+  }
+
   property("backup method completes flow correctly for all valid Kafka events") {
     forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
       (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
        logger.info(s"Data bucket is ${s3Config.dataBucket}")
        val backupClient = new MockedS3BackupClientInterface(
          Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
-          kafkaDataWithTimePeriod.periodSlice,
+          PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice),
          s3Config,
          Some(s3Settings)
        )
@@ -171,7 +204,7 @@
          })
          keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
            source
-             .via(CirceStreamSupport.decode[List[ReducedConsumerRecord]])
+             .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
              .toMat(Sink.collection)(Keep.right)
              .run()
              .map(list => (key, list.flatten))
@@ -181,7 +214,9 @@
            OffsetDateTime.parse(date).toEpochSecond
          }(Ordering[Long].reverse)
          flattened = sorted.flatMap { case (_, records) => records }
-        } yield flattened
+        } yield flattened.collect { case Some(reducedConsumerRecord) =>
+          reducedConsumerRecord
+        }
        val observed = calculatedFuture.futureValue
 
        kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
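
As a usage note for the new helper, a test inside this spec could wait until backed-up objects appear in the bucket roughly as in the hedged sketch below; the bucket name and the retry-by-throwing predicate are illustrative, not taken from this commit.

// Runs inside BackupClientSpec, so waitForS3Download and the implicit ExecutionContext are in scope.
import akka.stream.alpakka.s3.ListBucketResultContents

import scala.concurrent.Future

val downloadedKeys: Future[Seq[String]] =
  waitForS3Download(
    "guardian-test-data-bucket", // illustrative bucket name
    (contents: Seq[ListBucketResultContents]) =>
      // Throwing here makes akka.pattern.retry try again until objects show up or attempts run out.
      if (contents.isEmpty) throw new Exception("Bucket is still empty")
      else contents.map(_.key)
  )
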
KafkaClientWithKillSwitch.scala (new file)

@@ -0,0 +1,27 @@
+package io.aiven.guardian.kafka.backup.s3
+
+import akka.actor.ActorSystem
+import akka.kafka.CommitterSettings
+import akka.kafka.ConsumerMessage
+import akka.kafka.ConsumerSettings
+import akka.kafka.scaladsl.Consumer
+import akka.stream.SharedKillSwitch
+import akka.stream.scaladsl.SourceWithContext
+import io.aiven.guardian.kafka.KafkaClient
+import io.aiven.guardian.kafka.configs.KafkaCluster
+import io.aiven.guardian.kafka.models.ReducedConsumerRecord
+
+class KafkaClientWithKillSwitch(
+    configureConsumer: Option[
+      ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]
+    ] = None,
+    configureCommitter: Option[
+      CommitterSettings => CommitterSettings
+    ] = None,
+    killSwitch: SharedKillSwitch
+)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
+    extends KafkaClient(configureConsumer, configureCommitter) {
+  override def getSource
+      : SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
+    super.getSource.via(killSwitch.flow)
+}
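
A minimal sketch of how a suspend/resume test might construct this client, assuming an implicit ActorSystem and KafkaCluster configuration are already in scope; aborting the shared switch is one way to simulate an interrupted backup, but the exact trigger used by the real tests is not shown in this commit.

import akka.actor.ActorSystem
import akka.stream.{KillSwitches, SharedKillSwitch}
import io.aiven.guardian.kafka.configs.KafkaCluster

object KillSwitchUsage {
  def interruptibleKafkaClient()(implicit
      system: ActorSystem,
      kafkaClusterConfig: KafkaCluster
  ): (KafkaClientWithKillSwitch, SharedKillSwitch) = {
    // A SharedKillSwitch terminates every stream that installed its flow, which is exactly
    // what getSource does via killSwitch.flow above.
    val killSwitch = KillSwitches.shared("backup-kill-switch")
    (new KafkaClientWithKillSwitch(killSwitch = killSwitch), killSwitch)
  }
}

// Later, once enough chunks have been uploaded, the test can cut the Kafka source:
//   killSwitch.abort(new Exception("simulating a suspended backup"))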
