Commit 94d83fa

Add suspend and resume functionality

committed · 1 parent 4b9593f · commit 94d83fa

File tree: 14 files changed, +661 -78 lines changed


backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala

+16-2
@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup.gcs
 
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model.ContentTypes
 import akka.stream.alpakka.google.GoogleAttributes
 import akka.stream.alpakka.google.GoogleSettings
@@ -15,22 +16,35 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+// TODO: The GCS implementation currently does not work correctly because the current GCS implementation in
+// Alpakka does not allow us to commit the Kafka cursor whenever chunks are uploaded
 class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
     override val kafkaClientInterface: T,
     override val backupConfig: Backup,
+    override val system: ActorSystem,
     gcsConfig: GCSConfig
 ) extends BackupClientInterface[T] {
 
   override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)
 
   override type BackupResult = Option[StorageObject]
 
-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
+  override type CurrentState = Nothing
+
+  override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[Nothing]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
     val base = GCStorage
       .resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
       .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
 
-    maybeGoogleSettings.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+    maybeGoogleSettings
+      .fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+      .contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
+        byteString
+      }
   }
 
 }
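
The GCS backend implements the new suspend/resume hooks trivially: `CurrentState` is `Nothing` and `getCurrentUploadState` always answers `None`, so every run starts a fresh resumable upload and the Kafka cursor context is discarded. For orientation, here is a minimal sketch of the contract these overrides appear to satisfy; only the member names visible in the diffs (`CurrentState`, `getCurrentUploadState`, `backupToStorageSink`, `empty`, `BackupResult`) come from the commit, and the trait body is an assumption, not the actual `BackupClientInterface` definition.

    // Hypothetical sketch only: reconstructs the suspend/resume surface implied by the
    // overrides in this commit. The real BackupClientInterface trait is not part of this diff.
    import akka.stream.scaladsl.Sink
    import akka.util.ByteString
    import io.aiven.guardian.kafka.KafkaClientInterface

    import scala.concurrent.Future

    trait SuspendResumeSurface[T <: KafkaClientInterface] {
      val kafkaClientInterface: T

      // Storage backend's result type, e.g. Option[StorageObject] for GCS.
      type BackupResult
      // Storage-specific handle for an in-progress upload; Nothing here, CurrentS3State for S3.
      type CurrentState

      // Result to report when there is nothing to back up.
      def empty: () => Future[BackupResult]

      // Look up a previously interrupted upload for this key, if any, so it can be resumed.
      def getCurrentUploadState(key: String): Future[Option[CurrentState]]

      // The sink now receives each chunk paired with its Kafka cursor context, plus an
      // optional resume state, instead of plain ByteStrings.
      def backupToStorageSink(key: String,
                              currentState: Option[CurrentState]
      ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]
    }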
backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala

@@ -1,40 +1,140 @@
 package io.aiven.guardian.kafka.backup.s3
 
+import akka.Done
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.alpakka.s3.FailedUploadPart
 import akka.stream.alpakka.s3.MultipartUploadResult
+import akka.stream.alpakka.s3.Part
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.SuccessfulUploadPart
+import akka.stream.alpakka.s3.UploadPartResponse
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl._
 import akka.util.ByteString
+import com.typesafe.scalalogging.StrictLogging
 import io.aiven.guardian.kafka.KafkaClientInterface
 import io.aiven.guardian.kafka.backup.BackupClientInterface
 import io.aiven.guardian.kafka.backup.configs.Backup
 import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 
+import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+import java.time.Instant
+
+final case class CurrentS3State(uploadId: String, parts: Seq[Part])
+
 class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
     override val kafkaClientInterface: T,
     override val backupConfig: Backup,
+    override val system: ActorSystem,
     s3Config: S3Config,
     s3Headers: S3Headers
-) extends BackupClientInterface[T] {
+) extends BackupClientInterface[T]
+    with StrictLogging {
 
   override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)
 
   override type BackupResult = Option[MultipartUploadResult]
 
-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
-    val base = S3
-      .multipartUploadWithHeaders(
-        s3Config.dataBucket,
-        key,
-        s3Headers = s3Headers,
-        chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+  override type CurrentState = CurrentS3State
+
+  override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
+    implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
+
+    val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)
+
+    for {
+      incompleteUploads <-
+        maybeS3Settings
+          .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
+          .runWith(Sink.seq)
+      keys = incompleteUploads.filter(_.key == key)
+      result <- if (keys.isEmpty)
+                  Future.successful(None)
+                else {
+                  val listMultipartUploads = keys match {
+                    case Seq(single) => single
+                    case rest =>
+                      logger.warn(
+                        s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking latest one"
+                      )
+                      rest.minBy(_.initiated)(Ordering[Instant].reverse)
+                  }
+                  val uploadId = listMultipartUploads.uploadId
+                  val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)
+
+                  for {
+                    parts <- maybeS3Settings
+                               .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
+                               .runWith(Sink.seq)
+                  } yield Some(CurrentS3State(uploadId, parts.map(_.toPart)))
+                }
+    } yield result
+
+  }
+
+  val failureSink: Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
+    .foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
+      logger.warn(
+        s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
+        failedPart.exception
      )
-      .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+    }
+
+  val successSink: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
+    kafkaClientInterface.commitCursor
+      .contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
+        kafkaClientInterface.batchCursorContext(cursors)
+      }
+
+  val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
+
+    val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(successSink)(Keep.none)
+
+    val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(failureSink)(Keep.none)
+
+    Sink.combine(success, failure)(Broadcast(_))
+  }
+
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[CurrentS3State]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
+
+    // Note that chunkingParallelism is pointless for this usecase since we are directly streaming from Kafka.
+    // Furthermore the real `KafkaClient` implementation uses `CommittableOffsetBatch` which is a global singleton so
+    // we can't have concurrent updates to this data structure.
+
+    val sink = currentState match {
+      case Some(state) =>
+        S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          state.uploadId,
+          state.parts,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+      case None =>
+        S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+    }
+
+    val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
     maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
   }
 }
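
Read together, the two new members implement resume for S3: `getCurrentUploadState` lists incomplete multipart uploads for the key (warning and picking the most recent when several exist) and collects their already-uploaded parts, while `backupToStorageSink` either resumes that upload or starts a new one, committing Kafka cursors only for chunks that S3 acknowledges via `successSink`. The sketch below shows how a caller might wire the two together; the driving code is not part of this commit, and `backupClient`, `key` and `kafkaSource` are assumed names.

    // Illustrative wiring only; not code from this commit. Assumes an implicit ActorSystem
    // named `system`, a constructed `backupClient`, a `key: String`, and a
    // `kafkaSource: Source[(ByteString, CursorContext), _]` coming from the Kafka client.
    import scala.concurrent.ExecutionContext

    implicit val ec: ExecutionContext = system.dispatcher

    val backupResult =
      backupClient
        .getCurrentUploadState(key) // Some(CurrentS3State(...)) if a previous upload was interrupted
        .flatMap { maybeState =>
          kafkaSource.runWith(backupClient.backupToStorageSink(key, maybeState))
        }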

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala

+8-3
@@ -49,7 +49,9 @@ trait BackupClientSpec
     with StrictLogging {
 
   implicit val ec: ExecutionContext = system.dispatcher
-  implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
+  implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)
 
   val ThrottleElements: Int = 100
   val ThrottleAmount: FiniteDuration = 1 millis
@@ -135,6 +137,7 @@ trait BackupClientSpec
       val backupClient = new MockedS3BackupClientInterface(
         Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
         kafkaDataWithTimePeriod.periodSlice,
+        None,
         s3Config,
         Some(s3Settings)
       )
@@ -171,7 +174,7 @@ trait BackupClientSpec
                          })
       keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
                            source
-                             .via(CirceStreamSupport.decode[List[ReducedConsumerRecord]])
+                             .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
                              .toMat(Sink.collection)(Keep.right)
                              .run()
                              .map(list => (key, list.flatten))
@@ -181,7 +184,9 @@ trait BackupClientSpec
                      OffsetDateTime.parse(date).toEpochSecond
                    }(Ordering[Long].reverse)
       flattened = sorted.flatMap { case (_, records) => records }
-    } yield flattened
+    } yield flattened.collect { case Some(reducedConsumerRecord) =>
+      reducedConsumerRecord
+    }
     val observed = calculatedFuture.futureValue
 
     kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
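
The spec now decodes each backed-up JSON array as `List[Option[ReducedConsumerRecord]]` and keeps only the defined entries, presumably because the stored arrays can now contain null elements (circe decodes a JSON null into `None`). A minimal standalone sketch of that decode-then-collect pattern, assuming a `source` of the stored ByteString JSON and the same decoders the spec already has in scope:

    // Minimal sketch of the decode-then-collect pattern used in the spec; `source` is an
    // assumed Source[ByteString, NotUsed] holding one backed-up JSON document per key.
    val records: Future[Seq[ReducedConsumerRecord]] =
      source
        .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
        .mapConcat(identity)                     // flatten each decoded List
        .collect { case Some(record) => record } // drop entries decoded from JSON null
        .runWith(Sink.seq)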
backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaClientWithKillSwitch.scala

+21-0
@@ -0,0 +1,21 @@
+package io.aiven.guardian.kafka.backup.s3
+
+import akka.actor.ActorSystem
+import akka.kafka.ConsumerMessage
+import akka.kafka.ConsumerSettings
+import akka.kafka.scaladsl.Consumer
+import akka.stream.SharedKillSwitch
+import akka.stream.scaladsl.SourceWithContext
+import io.aiven.guardian.kafka.KafkaClient
+import io.aiven.guardian.kafka.configs.KafkaCluster
+import io.aiven.guardian.kafka.models.ReducedConsumerRecord
+
+class KafkaClientWithKillSwitch(
+    configure: Option[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = None,
+    killSwitch: SharedKillSwitch
+)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
+    extends KafkaClient(configure) {
+  override def getSource
+      : SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
+    super.getSource.via(killSwitch.flow)
+}
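
This helper exists so a test can cut the Kafka source mid-stream and exercise the suspend/resume path. A hedged usage sketch follows; the implicit `ActorSystem`/`KafkaCluster` and the surrounding backup harness are assumed to come from this commit's test setup rather than shown here.

    // Illustrative only: abort a running backup part-way through, then resume it later.
    import akka.stream.KillSwitches

    // Requires an implicit ActorSystem and KafkaCluster in scope, as the class above does.
    val killSwitch          = KillSwitches.shared("kafka-backup-kill-switch")
    val interruptibleClient = new KafkaClientWithKillSwitch(killSwitch = killSwitch)

    // ... start a backup that consumes from interruptibleClient, wait until some chunks
    // have been uploaded, then simulate a crash:
    killSwitch.abort(new Exception("simulating an interrupted backup"))

    // A later run against the same key should find the incomplete multipart upload via
    // getCurrentUploadState and resume from the parts that were already uploaded.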

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala

+7-2
@@ -1,6 +1,7 @@
 package io.aiven.guardian.kafka.backup.s3
 
 import akka.NotUsed
+import akka.actor.ActorSystem
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.scaladsl.Source
@@ -11,14 +12,18 @@ import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 
 import scala.concurrent.duration.FiniteDuration
 
+import java.time.temporal.ChronoUnit
+
 class MockedS3BackupClientInterface(
     kafkaData: Source[ReducedConsumerRecord, NotUsed],
     periodSlice: FiniteDuration,
+    timeUnit: Option[ChronoUnit],
     s3Config: S3Config,
     maybeS3Settings: Option[S3Settings]
-)(implicit val s3Headers: S3Headers)
+)(implicit val s3Headers: S3Headers, system: ActorSystem)
     extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData),
-                                          Backup(periodSlice),
+                                          Backup(periodSlice, timeUnit),
+                                          implicitly,
                                           s3Config,
                                           implicitly
     )
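
The spec above passes the new `timeUnit` argument as `None`; the construction sketch below shows the updated signature with an explicit unit instead. `kafkaData`, `s3Config`, `s3Settings` and the implicit `S3Headers`/`ActorSystem` are assumed fixture values, not part of this commit.

    // Construction sketch only; the fixture values are assumed, not defined in this commit.
    import java.time.temporal.ChronoUnit

    import scala.concurrent.duration._

    val mockedClient = new MockedS3BackupClientInterface(
      kafkaData,                // Source[ReducedConsumerRecord, NotUsed]
      1.minute,                 // periodSlice
      Some(ChronoUnit.MINUTES), // optional time unit added in this commit; the spec passes None
      s3Config,
      Some(s3Settings)
    )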
