
Commit 668b38e

Newest Alpakka S3 update that handles chunks
1 parent 0e40b59 commit 668b38e

8 files changed: +129 -46 lines changed

8 files changed

+129
-46
lines changed

backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala

Lines changed: 8 additions & 2 deletions
@@ -16,6 +16,8 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
+// TODO: GCS implementation currently does not work correctly because the current GCS implementation in
+// Alpakka does not allow us to commit the Kafka cursor whenever chunks are uploaded
 class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
   override val kafkaClientInterface: T,
   override val backupConfig: Backup,
@@ -33,12 +35,16 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google
 
   override def backupToStorageSink(key: String,
                                    currentState: Option[Nothing]
-  ): Sink[ByteString, Future[BackupResult]] = {
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
     val base = GCStorage
       .resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
       .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
 
-    maybeGoogleSettings.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+    maybeGoogleSettings
+      .fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
+      .contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
+        byteString
+      }
   }
 
 }
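
For readers unfamiliar with the contramap call above: it adapts a sink of raw ByteString chunks to the new (ByteString, CursorContext) element type by simply discarding the context, which is exactly why the GCS backend cannot yet commit cursors per uploaded chunk. A minimal sketch of that adaptation pattern follows; dropContext is a hypothetical helper for illustration only and is not part of this commit.

import akka.stream.scaladsl.Sink
import akka.util.ByteString

// Hypothetical helper, for illustration only: turns a Sink of raw bytes into a Sink of
// (bytes, context) pairs by dropping the context, mirroring what the GCS client does above.
def dropContext[Ctx, Mat](bytesSink: Sink[ByteString, Mat]): Sink[(ByteString, Ctx), Mat] =
  bytesSink.contramap[(ByteString, Ctx)] { case (byteString, _) => byteString }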

backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala

Lines changed: 45 additions & 7 deletions
@@ -1,11 +1,16 @@
 package io.aiven.guardian.kafka.backup.s3
 
+import akka.Done
+import akka.NotUsed
 import akka.actor.ActorSystem
+import akka.stream.alpakka.s3.FailedUploadPart
 import akka.stream.alpakka.s3.MultipartUploadResult
 import akka.stream.alpakka.s3.Part
 import akka.stream.alpakka.s3.S3Attributes
 import akka.stream.alpakka.s3.S3Headers
 import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.SuccessfulUploadPart
+import akka.stream.alpakka.s3.UploadPartResponse
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl._
 import akka.util.ByteString
@@ -15,6 +20,7 @@ import io.aiven.guardian.kafka.backup.BackupClientInterface
 import io.aiven.guardian.kafka.backup.configs.Backup
 import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 
+import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 
@@ -72,27 +78,59 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
 
   }
 
+  val failureSink: Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
+    .foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
+      logger.warn(
+        s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
+        failedPart.exception
+      )
+    }
+
+  val successSink: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
+    kafkaClientInterface.commitCursor
+      .contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
+        kafkaClientInterface.batchCursorContext(cursors)
+      }
+
+  val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
+
+    val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(successSink)(Keep.none)
+
+    val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(failureSink)(Keep.none)
+
+    Sink.combine(success, failure)(Broadcast(_))
+  }
+
   override def backupToStorageSink(key: String,
                                    currentState: Option[CurrentS3State]
-  ): Sink[ByteString, Future[BackupResult]] = {
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
+
+    // Note that chunkingParallelism is pointless for this usecase since we are directly streaming from Kafka.
+    // Furthermore the real `KafkaClient` implementation uses `CommittableOffsetBatch` which is a global singleton so
+    // we can't have concurrent updates to this data structure.
+
     val sink = currentState match {
       case Some(state) =>
-        S3.resumeMultipartUploadWithHeaders(
+        S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
           s3Config.dataBucket,
           key,
           state.uploadId,
           state.parts,
+          kafkaBatchSink,
           s3Headers = s3Headers,
-          chunkingParallelism =
-            1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+          chunkingParallelism = 1
         )
       case None =>
-        S3.multipartUploadWithHeaders(
+        S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
           s3Config.dataBucket,
           key,
+          kafkaBatchSink,
           s3Headers = s3Headers,
-          chunkingParallelism =
-            1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+          chunkingParallelism = 1
         )
     }
 
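
kafkaBatchSink above broadcasts every UploadPartResponse (paired with its accumulated cursors) to two sinks and relies on collectType to keep only the relevant subtype in each branch. Below is a self-contained sketch of that routing shape, assuming hypothetical Ok and Err types as stand-ins for SuccessfulUploadPart and FailedUploadPart; it is an illustration of the pattern, not the project's code.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Keep, Sink, Source}

object RoutingSketch {
  sealed trait Response
  final case class Ok(partNumber: Int)  extends Response
  final case class Err(reason: String) extends Response

  // Each branch receives every element via Broadcast but keeps only its own subtype via collectType,
  // so successes and failures end up in different sinks, the same shape as kafkaBatchSink above.
  val okSink: Sink[Response, NotUsed] =
    Flow[Response]
      .collectType[Ok]
      .toMat(Sink.foreach[Ok](ok => println(s"commit cursors for part ${ok.partNumber}")))(Keep.none)

  val errSink: Sink[Response, NotUsed] =
    Flow[Response]
      .collectType[Err]
      .toMat(Sink.foreach[Err](err => println(s"log failure: ${err.reason}")))(Keep.none)

  val routed: Sink[Response, NotUsed] = Sink.combine(okSink, errSink)(Broadcast(_))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("routing-sketch")
    Source[Response](List(Ok(1), Err("part 2 timed out"), Ok(3))).runWith(routed)
    // A real application would wait for stream completion before terminating the ActorSystem.
  }
}

Broadcast sends each element to both branches; collectType then drops the elements a branch does not handle, so successful parts drive cursor commits while failed parts are only logged.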

build.sbt

Lines changed: 11 additions & 9 deletions
@@ -7,10 +7,12 @@ ThisBuild / organizationHomepage := Some(url("https://aiven.io/"))
 
 ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
 
+ThisBuild / resolvers += Resolver.mavenLocal
+
 val akkaVersion = "2.6.16"
 val akkaHttpVersion = "10.2.6"
 val alpakkaKafkaVersion = "2.1.1"
-val alpakkaVersion = "3.0.2+34-bdac5519+20211013-1607-SNAPSHOT"
+val alpakkaVersion = "3.0.2+33-eea3a5fb-SNAPSHOT"
 val quillJdbcMonixVersion = "3.7.2"
 val postgresqlJdbcVersion = "42.2.24"
 val scalaLoggingVersion = "3.9.4"
@@ -105,10 +107,10 @@ lazy val coreS3 = project
     librarySettings,
     name := s"$baseName-s3",
     libraryDependencies ++= Seq(
-      "org.mdedetrich" %% "akka-stream-alpakka-s3" % alpakkaVersion,
-      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
-      "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
-      "com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion % Test
+      "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
+      "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
+      "com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion % Test
     )
   )
   .dependsOn(core % "compile->compile;test->test")
@@ -119,10 +121,10 @@ lazy val coreGCS = project
     librarySettings,
     name := s"$baseName-gcs",
     libraryDependencies ++= Seq(
-      "org.mdedetrich" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion,
-      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
-      "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
-      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion % Test
+      "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
+      "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
+      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion % Test
     )
   )
   .dependsOn(core % "compile->compile;test->test")
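
Resolver.mavenLocal is presumably added so that sbt can resolve the locally built Alpakka snapshot from ~/.m2/repository. A hedged sketch of the relevant settings follows; the publishM2 step describes an assumed local workflow and is not part of this commit.

// build.sbt sketch (assumption: the Alpakka fork was published locally, e.g. with `sbt publishM2`
// from the Alpakka checkout, so the SNAPSHOT below is available in ~/.m2/repository)
ThisBuild / resolvers += Resolver.mavenLocal

val alpakkaVersion = "3.0.2+33-eea3a5fb-SNAPSHOT"

libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-s3"                   % alpakkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion
)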

core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala

Lines changed: 19 additions & 21 deletions
@@ -66,15 +66,18 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
     */
   def getCurrentUploadState(key: String): Future[Option[CurrentState]]
 
-  /** Override this method to define how to backup a `ByteString` to a `DataSource`
+  /** Override this method to define how to backup a `ByteString` combined with Kafka
+    * `kafkaClientInterface.CursorContext` to a `DataSource`
     * @param key
     *   The object key or filename for what is being backed up
     * @param currentState
     *   The current state if it exists. This is used when resuming from a previously aborted backup
     * @return
     *   A Sink that also provides a `BackupResult`
     */
-  def backupToStorageSink(key: String, currentState: Option[CurrentState]): Sink[ByteString, Future[BackupResult]]
+  def backupToStorageSink(key: String,
+                          currentState: Option[CurrentState]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]
 
   /** Override this method to define a zero vale that covers the case that occurs immediately when `SubFlow` has been
@@ -175,27 +178,22 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
       }
     }
 
-    // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
     @nowarn("msg=method lazyInit in object Sink is deprecated")
-    val subFlowSink = substreams
-      .alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-        case ((_, context), _) => context
-      })
-      .to(
-        // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
-        Sink.lazyInit(
-          { case (_, key) =>
-            implicit val ec: ExecutionContext = system.getDispatcher
-            for {
-              state <- getCurrentUploadState(key)
-            } yield backupToStorageSink(key, state)
-              .contramap[((ByteString, kafkaClientInterface.CursorContext), String)] { case ((byteString, _), _) =>
-                byteString
-              }
-          },
-          empty
-        )
+    val subFlowSink = substreams.to(
+      // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
+      Sink.lazyInit(
+        { case (_, key) =>
+          implicit val ec: ExecutionContext = system.getDispatcher
+          for {
+            state <- getCurrentUploadState(key)
+          } yield backupToStorageSink(key, state)
+            .contramap[((ByteString, kafkaClientInterface.CursorContext), String)] { case ((byteString, context), _) =>
+              (byteString, context)
+            }
+        },
+        empty
       )
+    )
 
     subFlowSink
   }
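
With the .alsoTo(commitCursor...) stage removed, cursors are no longer committed as soon as bytes leave the stream; instead the storage sink receives each chunk together with its cursor and decides when a commit is safe. A simplified sketch of that contract, with the cursor reduced to a Long offset and persist/commit as hypothetical placeholders rather than the project's API:

import akka.Done
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import scala.concurrent.Future

// Hypothetical sketch: the sink only marks an offset as processed after the chunk it
// belongs to has actually been persisted, which is the ordering the new signature enables.
def storageSink(persist: ByteString => Unit, commit: Long => Unit): Sink[(ByteString, Long), Future[Done]] =
  Sink.foreach[(ByteString, Long)] { case (byteString, offset) =>
    persist(byteString) // write the chunk first
    commit(offset)      // only then acknowledge the cursor
  }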

core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala

Lines changed: 4 additions & 2 deletions
@@ -69,8 +69,10 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
     * @return
     *   A Sink that also provides a `BackupResult`
     */
-  override def backupToStorageSink(key: String, currentState: Option[Nothing]): Sink[ByteString, Future[Done]] =
-    Sink.foreach { byteString =>
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[Nothing]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[Done]] =
+    Sink.foreach { case (byteString, _) =>
       backedUpData ++ Iterable((key, byteString))
     }
 

core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala

Lines changed: 15 additions & 4 deletions
@@ -3,8 +3,8 @@ package io.aiven.guardian.kafka
 import akka.Done
 import akka.actor.ActorSystem
 import akka.kafka.CommitterSettings
-import akka.kafka.ConsumerMessage.Committable
 import akka.kafka.ConsumerMessage.CommittableOffset
+import akka.kafka.ConsumerMessage.CommittableOffsetBatch
 import akka.kafka.ConsumerSettings
 import akka.kafka.Subscriptions
 import akka.kafka.scaladsl.Committer
@@ -16,6 +16,7 @@ import io.aiven.guardian.kafka.configs.KafkaCluster
 import io.aiven.guardian.kafka.models.ReducedConsumerRecord
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
+import scala.collection.immutable
 import scala.concurrent.Future
 
 import java.util.Base64
@@ -35,8 +36,9 @@ class KafkaClient(
 )(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
     extends KafkaClientInterface
     with StrictLogging {
-  override type CursorContext = Committable
-  override type Control = Consumer.Control
+  override type CursorContext = CommittableOffset
+  override type Control = Consumer.Control
+  override type BatchedCursorContext = CommittableOffsetBatch
 
   if (kafkaClusterConfig.topics.isEmpty)
     logger.warn("Kafka Cluster configuration has no topics set")
@@ -70,5 +72,14 @@
   /** @return
     *   A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
     */
-  override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings)
+  override val commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings)
+
+  /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
+    * @param cursors
+    *   The cursors that need to be batched
+    * @return
+    *   A collection data structure that represents the batched cursors
+    */
+  override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
+    CommittableOffsetBatch(cursors.toSeq)
 }
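
batchCursorContext uses Alpakka Kafka's CommittableOffsetBatch to collapse the per-message offsets gathered for a chunk into a single committable value for Committer.sink. For illustration, the same batch can also be built incrementally; this sketch is believed to be equivalent to the Seq-based apply used above, but it is an assumption rather than part of the commit.

import akka.kafka.ConsumerMessage.{CommittableOffset, CommittableOffsetBatch}
import scala.collection.immutable

// Incremental formulation (sketch): fold each per-message offset into an initially empty batch.
def batchIncrementally(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
  cursors.foldLeft(CommittableOffsetBatch.empty)(_ updated _)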

core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala

Lines changed: 14 additions & 1 deletion
@@ -5,6 +5,7 @@ import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.SourceWithContext
 import io.aiven.guardian.kafka.models.ReducedConsumerRecord
 
+import scala.collection.immutable
 import scala.concurrent.Future
 
 trait KafkaClientInterface {
@@ -18,6 +19,10 @@
     */
   type Control
 
+  /** The type that represents the result of batching a `CursorContext`
+    */
+  type BatchedCursorContext
+
   /** @return
     *   A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
     */
@@ -26,5 +31,13 @@
   /** @return
     *   A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
     */
-  def commitCursor: Sink[CursorContext, Future[Done]]
+  def commitCursor: Sink[BatchedCursorContext, Future[Done]]
+
+  /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
+    * @param cursors
+    *   The cursors that need to be batched
+    * @return
+    *   A collection data structure that represents the batched cursors
+    */
+  def batchCursorContext(cursors: immutable.Iterable[CursorContext]): BatchedCursorContext
 }

core/src/test/scala/io/aiven/guardian/kafka/MockedKafkaClientInterface.scala

Lines changed: 13 additions & 0 deletions
@@ -7,6 +7,7 @@ import akka.stream.scaladsl.Source
 import akka.stream.scaladsl.SourceWithContext
 import io.aiven.guardian.kafka.models.ReducedConsumerRecord
 
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 
@@ -34,6 +35,10 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUse
     */
   override type Control = Future[NotUsed]
 
+  /** The type that represents the result of batching a `CursorContext`
+    */
+  override type BatchedCursorContext = Long
+
   /** @return
     *   A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
     */
@@ -49,4 +54,12 @@
     */
   override def commitCursor: Sink[Long, Future[Done]] = Sink.foreach(cursor => committedOffsets ++ Iterable(cursor))
 
+  /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
+    * @param cursors
+    *   The cursors that need to be batched
+    * @return
+    *   A collection data structure that represents the batched cursors
+    */
+  override def batchCursorContext(cursors: immutable.Iterable[Long]): Long = cursors.max
+
 }
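
The mock batches Long offsets by taking the maximum, mirroring Kafka's cumulative commit semantics where committing offset N acknowledges everything at or below N. A tiny illustrative check with hypothetical values:

import scala.collection.immutable

// Hypothetical values: batching three cursors in the mock yields the highest offset.
val cursors: immutable.Iterable[Long] = List(3L, 7L, 5L)
assert(cursors.max == 7L) // what the mock's batchCursorContext returns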
