package io.aiven.guardian.kafka.backup.s3

+import akka.Done
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.MultipartUploadResult
+import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
+import akka.stream.alpakka.s3.SuccessfulUploadPart
+import akka.stream.alpakka.s3.UploadPartResponse
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl._
import akka.util.ByteString
+import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

+import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

+import java.time.Instant
+
class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
    override val kafkaClientInterface: T,
    override val backupConfig: Backup,
+   override val system: ActorSystem,
    s3Config: S3Config,
    s3Headers: S3Headers
-) extends BackupClientInterface[T] {
+) extends BackupClientInterface[T]
+    with StrictLogging {
+  import BackupClient._

  override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)

  override type BackupResult = Option[MultipartUploadResult]

-  override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
-    val base = S3
-      .multipartUploadWithHeaders(
-        s3Config.dataBucket,
-        key,
-        s3Headers = s3Headers,
-        chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
+  override type CurrentState = CurrentS3State
+
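+  /** Looks up any in-progress S3 multipart upload for the given key so that an interrupted
+    * backup can be resumed rather than restarted. If one exists, returns its uploadId and the
+    * parts already uploaded; a trailing part smaller than the S3 minimum chunk size is treated
+    * as a broken partial chunk and dropped.
+    */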
+  override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
+    implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
+
+    val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)
+
+    for {
+      incompleteUploads <-
+        maybeS3Settings
+          .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
+          .runWith(Sink.seq)
+      keys = incompleteUploads.filter(_.key == key)
+      result <- if (keys.isEmpty)
+                  Future.successful(None)
+                else {
+                  val listMultipartUploads = keys match {
+                    case Seq(single) =>
+                      logger.info(
+                        s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
+                      )
+                      single
+                    case rest =>
+                      val last = rest.maxBy(_.initiated)(Ordering[Instant])
+                      logger.warn(
+                        s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
+                      )
+                      last
+                  }
+                  val uploadId = listMultipartUploads.uploadId
+                  val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)
+
+                  for {
+                    parts <- maybeS3Settings
+                               .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
+                               .runWith(Sink.seq)
+
+                    finalParts = parts.lastOption match {
+                                   case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
+                                     parts
+                                   case _ =>
+                                     // We drop the last part here since it's broken
+                                     parts.dropRight(1)
+                                 }
+                  } yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
+                }
+    } yield result
+
+  }
+
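+  /** Logs upload parts that failed to be uploaded to S3; the Kafka cursors attached to a failed
+    * part are not committed.
+    */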
+  private[s3] def failureSink
+      : Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
+    .foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
+      logger.warn(
+        s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
+        failedPart.exception
      )
-      .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
+    }
+
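+  /** Commits the Kafka cursors attached to successfully uploaded S3 parts, batching them via
+    * `kafkaClientInterface.batchCursorContext`.
+    */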
+  private[s3] def successSink
+      : Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
+    kafkaClientInterface.commitCursor
+      .contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
+        kafkaClientInterface.batchCursorContext(cursors)
+      }
+
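+  /** Fans out every `UploadPartResponse` together with its cursor context to `successSink` and
+    * `failureSink`, each branch keeping only the response type it handles.
+    */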
+  private[s3] def kafkaBatchSink
+      : Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {
+
+    val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .wireTap { data =>
+        val (part, _) = data
+        logger.info(
+          s"Committing kafka cursor for uploadId:${part.multipartUpload.uploadId} key: ${part.multipartUpload.key} and S3 part: ${part.partNumber}"
+        )
+      }
+      .toMat(successSink)(Keep.none)
+
+    val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
+      .toMat(failureSink)(Keep.none)
+
+    Sink.combine(success, failure)(Broadcast(_))
+  }
+
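+  /** Streams the backup for `key` into S3, resuming the multipart upload described by
+    * `currentState` when one exists and starting a new upload otherwise. Per-part results are
+    * routed through `kafkaBatchSink` so cursors are only committed for parts that reached S3.
+    */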
+  override def backupToStorageSink(key: String,
+                                   currentState: Option[CurrentS3State]
+  ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
+
+    // Note that chunkingParallelism is pointless for this use case since we are directly streaming from Kafka.
+    // Furthermore, the real `KafkaClient` implementation uses `CommittableOffsetBatch` which is a global singleton so
+    // we can't have concurrent updates to this data structure.
+
+    val sink = currentState match {
+      case Some(state) =>
+        logger.info(
+          s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key"
+        )
+        S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          state.uploadId,
+          state.parts,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+      case None =>
+        logger.info(
+          s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key"
+        )
+        S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
+          s3Config.dataBucket,
+          key,
+          kafkaBatchSink,
+          s3Headers = s3Headers,
+          chunkingParallelism = 1
+        )
+    }
+
+    val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
    maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
  }
}
+
+object BackupClient {
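+  /** The uploadId of an existing S3 multipart upload together with the parts that have already
+    * been uploaded for it.
+    */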
+  final case class CurrentS3State(uploadId: String, parts: Seq[Part])
+}