
Add suspend and resume functionality #75


Merged
5 commits merged on Jan 18, 2022
Changes from all commits
@@ -1,5 +1,6 @@
package io.aiven.guardian.kafka.backup.gcs

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.google.GoogleAttributes
import akka.stream.alpakka.google.GoogleSettings
@@ -15,22 +16,35 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

// TODO: The GCS implementation currently does not work correctly because the current GCS support in Alpakka
// does not let us commit the Kafka cursor whenever chunks are uploaded
class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
gcsConfig: GCSConfig
) extends BackupClientInterface[T] {

override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)

override type BackupResult = Option[StorageObject]

override type CurrentState = Nothing

Contributor:
I'm guessing this Nothing is purely a placeholder

Contributor Author:
Correct, GCS is not implemented fully


override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)

Contributor:
Same here

Contributor Author:
Same comment as above


override def backupToStorageSink(key: String,
currentState: Option[Nothing]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
val base = GCStorage
.resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeGoogleSettings
.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
.contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
byteString
}
}

}
@@ -1,40 +1,171 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.alpakka.s3.UploadPartResponse
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import java.time.Instant

class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
s3Config: S3Config,
s3Headers: S3Headers
) extends BackupClientInterface[T]
with StrictLogging {
import BackupClient._

override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)

override type BackupResult = Option[MultipartUploadResult]

override type CurrentState = CurrentS3State

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher

Contributor:
It would probably be better to use a dedicated EC in this case. This one is the internal dispatcher used by Akka, and you usually don't want to clog that one.

Contributor Author (@mdedetrich, Jan 18, 2022):
I thought the same, however in this case I believe it's overcomplicated: this EC only gets executed once per time slice, which in the grand scheme of things is nothing (unless you do something silly like configuring a time slice of 1 nanosecond).
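
For reference, wiring in a dedicated dispatcher as suggested above could look roughly like the sketch below; the dispatcher name and pool size are illustrative and not defined anywhere in this PR.

// Sketch: look up a dedicated dispatcher instead of using Akka's default one.
// Assumes an application.conf entry along these lines (name and size are illustrative):
//
//   s3-upload-state-dispatcher {
//     type = Dispatcher
//     executor = "thread-pool-executor"
//     thread-pool-executor.fixed-pool-size = 2
//   }
//
implicit val ec: ExecutionContext = system.dispatchers.lookup("s3-upload-state-dispatcher")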


val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)

for {
incompleteUploads <-
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"

Contributor:
Maybe add a call to action to clean the other ones up?

Contributor Author (@mdedetrich, Jan 18, 2022):
Yeah, I want to resolve this comprehensively with #82; it may be that this case won't actually occur once that ticket is resolved.

)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken
parts.dropRight(1)

Contributor:
Why? And what happens to this last part? Is it corrupt? Is it simply useless data?

Contributor Author (@mdedetrich, Jan 18, 2022):
This was added while I was figuring out what was causing corrupted data in the S3 buckets. I don't think this case can occur and I just left it there as a safeguard, but I can remove it if requested (I think it theoretically can occur, but I need to simulate it via a test case).

}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
} yield result

}
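
Following up on the review note above about cleaning up the other cancelled uploads, one possible shape for that cleanup is sketched below. It is not part of this PR, and it assumes the Alpakka S3 version in use exposes S3.deleteUpload for aborting a multipart upload; verify that against your Alpakka version.

// Sketch (not part of this PR): abort every stale multipart upload for this key except
// the one being resumed. Assumes S3.deleteUpload exists in the Alpakka S3 version in use.
private[s3] def cleanupStaleUploads(key: String, uploadIdToKeep: String): Future[Done] = {
  val base = S3.listMultipartUpload(s3Config.dataBucket, None)
  maybeS3Settings
    .fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
    .filter(upload => upload.key == key && upload.uploadId != uploadIdToKeep)
    .mapAsync(1)(upload => S3.deleteUpload(s3Config.dataBucket, key, upload.uploadId))
    .runWith(Sink.ignore)
}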

private[s3] def failureSink
: Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
.foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
logger.warn(
s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
failedPart.exception
)
}

private[s3] def successSink
: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
kafkaClientInterface.commitCursor
.contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
kafkaClientInterface.batchCursorContext(cursors)
}

private[s3] def kafkaBatchSink
: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {

val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
.collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
.wireTap { data =>
val (part, _) = data
logger.info(
s"Committing kafka cursor for uploadId:${part.multipartUpload.uploadId} key: ${part.multipartUpload.key} and S3 part: ${part.partNumber}"
)
}
.toMat(successSink)(Keep.none)

val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
.collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
.toMat(failureSink)(Keep.none)

Sink.combine(success, failure)(Broadcast(_))
}
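
The routing above works because Broadcast sends every upload response to both branches, while each branch's collectType keeps only the subtype it handles (successful parts commit Kafka cursors, failed parts are only logged). The same idea in a tiny standalone form, with illustrative names that are not part of the diff:

// Standalone sketch of the routing pattern used above; Ok/Failed are illustrative
// stand-ins for SuccessfulUploadPart/FailedUploadPart.
sealed trait Response
final case class Ok(partNumber: Int) extends Response
final case class Failed(partNumber: Int) extends Response

val routed: Sink[Response, NotUsed] =
  Sink.combine(
    Flow[Response].collectType[Ok].to(Sink.foreach(ok => println(s"commit cursor for part ${ok.partNumber}"))),
    Flow[Response].collectType[Failed].to(Sink.foreach(f => println(s"log failure for part ${f.partNumber}")))
  )(Broadcast(_))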

override def backupToStorageSink(key: String,
currentState: Option[CurrentS3State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {

// Note that chunkingParallelism is pointless for this use case since we are streaming directly from Kafka.
// Furthermore, the real `KafkaClient` implementation uses `CommittableOffsetBatch`, which is a global singleton,
// so we can't have concurrent updates to this data structure.

val sink = currentState match {
case Some(state) =>
logger.info(
s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key"
)
S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
s3Config.dataBucket,
key,
state.uploadId,
state.parts,
kafkaBatchSink,
s3Headers = s3Headers,
chunkingParallelism = 1
)
case None =>
logger.info(
s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key"
)
S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
s3Config.dataBucket,
key,
kafkaBatchSink,
s3Headers = s3Headers,
chunkingParallelism = 1
)
}

val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
}
}

object BackupClient {
final case class CurrentS3State(uploadId: String, parts: Seq[Part])
}
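
For orientation, the two new methods compose from a caller's point of view roughly as sketched below; the key, the backupClient value and the ExecutionContext are illustrative, and the real wiring presumably lives in BackupClientInterface, which is not shown in this diff.

// Sketch (illustrative, not part of the diff): resume-aware selection of the storage sink.
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

val key = "some-backup-object-key.json" // illustrative object key

val resumeAwareSink =
  backupClient
    .getCurrentUploadState(key)                                            // find any interrupted multipart upload
    .map(maybeState => backupClient.backupToStorageSink(key, maybeState))  // resume it, or start fresh on None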
@@ -0,0 +1,33 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.scaladsl.Sink
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
import scala.concurrent.Future

import java.util.concurrent.ConcurrentLinkedQueue

class BackupClientChunkState[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
s3Config: S3Config,
s3Headers: S3Headers
) extends BackupClient[T](maybeS3Settings) {
val processedChunks: ConcurrentLinkedQueue[SuccessfulUploadPart] = new ConcurrentLinkedQueue[SuccessfulUploadPart]()

override val successSink
: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
super.successSink.contramap { case (part, value) =>
processedChunks.add(part)
(part, value)
}
}
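
A test can then inspect processedChunks to see which parts were successfully uploaded (and therefore which Kafka cursors were committed) before an interruption. A minimal illustrative usage, assuming Scala 2.13's CollectionConverters and a backupClientChunkState instance in scope:

// Sketch (illustrative test usage): which part numbers made it through before the stream was interrupted?
import scala.jdk.CollectionConverters._

val committedPartNumbers: Set[Int] =
  backupClientChunkState.processedChunks.asScala.map(_.partNumber).toSet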
@@ -1,7 +1,9 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.Scheduler
import akka.stream.Attributes
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
@@ -14,6 +16,7 @@ import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.akka.AkkaHttpTestKit
import io.aiven.guardian.kafka.Generators._
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Config
@@ -49,7 +52,9 @@ trait BackupClientSpec
with StrictLogging {

implicit val ec: ExecutionContext = system.dispatcher
implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
implicit override val generatorDrivenConfig: PropertyCheckConfiguration =

Contributor Author:
This was causing issues in our tests: property-driven tests normally run a number of times, but in our case, with a real integration test against Kafka, we only want each property to run once (and to succeed once).

PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)

val ThrottleElements: Int = 100
val ThrottleAmount: FiniteDuration = 1 millis
@@ -128,13 +133,41 @@ trait BackupClientSpec
case None => ()
}

/** @param dataBucket
* Which S3 bucket the objects are being persisted into
* @param transformResult
* A function that transforms the download result from S3 into the data `T` that you need. Note that you can also
* throw an exception in this transform function to trigger a retry (i.e. using it as an additional predicate)
* @param attempts
* Total number of attempts
* @param delay
* The delay between each attempt after the first
* @tparam T
* Type of the final result transformed by `transformResult`
* @return
*/
def waitForS3Download[T](dataBucket: String,
transformResult: Seq[ListBucketResultContents] => T,
attempts: Int = 10,
delay: FiniteDuration = 1 second
): Future[T] = {
implicit val scheduler: Scheduler = system.scheduler

val attempt = () =>
S3.listBucket(dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq).map {
transformResult
}

akka.pattern.retry(attempt, attempts, delay)
}
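
For example, a test can poll until the bucket contains at least one object by throwing inside transformResult, which makes akka.pattern.retry schedule another attempt (an illustrative sketch, assuming s3Config is in scope):

// Sketch (illustrative): retry until the bucket is non-empty, then return the first object's key.
val firstUploadedKey: Future[String] =
  waitForS3Download(
    s3Config.dataBucket,
    contents => contents.headOption.map(_.key).getOrElse(throw new IllegalStateException("Bucket is still empty"))
  )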

property("backup method completes flow correctly for all valid Kafka events") {
forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")
val backupClient = new MockedS3BackupClientInterface(
Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice),
s3Config,
Some(s3Settings)
)
@@ -171,7 +204,7 @@ trait BackupClientSpec
})
keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
source
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.toMat(Sink.collection)(Keep.right)
.run()
.map(list => (key, list.flatten))
@@ -181,7 +214,9 @@
OffsetDateTime.parse(date).toEpochSecond
}(Ordering[Long].reverse)
flattened = sorted.flatMap { case (_, records) => records }
} yield flattened.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
val observed = calculatedFuture.futureValue

kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
@@ -0,0 +1,27 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.SourceWithContext
import io.aiven.guardian.kafka.KafkaClient
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord

class KafkaClientWithKillSwitch(
configureConsumer: Option[
ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]
] = None,
configureCommitter: Option[
CommitterSettings => CommitterSettings
] = None,
killSwitch: SharedKillSwitch
)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
extends KafkaClient(configureConsumer, configureCommitter) {
override def getSource
: SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
super.getSource.via(killSwitch.flow)
}
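
A minimal sketch of how a test might use this class to exercise suspend and resume, assuming the implicit ActorSystem and KafkaCluster configuration are in scope (names are illustrative):

// Sketch (illustrative test wiring): abort the Kafka source part-way through a backup
// so that a later run is forced to resume the interrupted multipart upload.
import akka.stream.KillSwitches

val killSwitch = KillSwitches.shared("kafka-backup-kill-switch")
val kafkaClient = new KafkaClientWithKillSwitch(killSwitch = killSwitch)

// ... start a backup with kafkaClient, wait until some chunks have been uploaded, then:
killSwitch.abort(new Exception("Simulating an interrupted backup"))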