Commit b27a14e

Use kill switches in RealS3BackupClientSpec and properly terminate JSON
1 parent 668b38e commit b27a14e

4 files changed (+98 −47 lines)

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala (+7 −3)

@@ -49,7 +49,9 @@ trait BackupClientSpec
     with StrictLogging {

   implicit val ec: ExecutionContext = system.dispatcher
-  implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
+  implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
+  implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
+    PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)

   val ThrottleElements: Int = 100
   val ThrottleAmount: FiniteDuration = 1 millis
@@ -172,7 +174,7 @@ trait BackupClientSpec
                          })
       keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
                            source
-                             .via(CirceStreamSupport.decode[List[ReducedConsumerRecord]])
+                             .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
                              .toMat(Sink.collection)(Keep.right)
                              .run()
                              .map(list => (key, list.flatten))
@@ -182,7 +184,9 @@ trait BackupClientSpec
                  OffsetDateTime.parse(date).toEpochSecond
                }(Ordering[Long].reverse)
       flattened = sorted.flatMap { case (_, records) => records }
-    } yield flattened
+    } yield flattened.collect { case Some(reducedConsumerRecord) =>
+      reducedConsumerRecord
+    }
     val observed = calculatedFuture.futureValue

     kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
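
The decode type change above (List[ReducedConsumerRecord] → List[Option[ReducedConsumerRecord]]) pairs with the "null]" terminator added in RealS3BackupClientSpec below: a backup that is cut off early is closed with a trailing JSON null, so real records decode as Some(...) and the terminator decodes as None, which collect then drops. A minimal sketch of that decode-and-collect step, using plain circe rather than the Alpakka stream stage and a hypothetical Record case class standing in for ReducedConsumerRecord:

import io.circe.Decoder
import io.circe.parser.decode

object NullTerminatedDecodeSketch extends App {
  // Hypothetical stand-in for ReducedConsumerRecord, to keep the sketch self-contained.
  final case class Record(id: Int)
  implicit val recordDecoder: Decoder[Record] = Decoder.forProduct1("id")(Record.apply)

  // A backup that was cut off and then closed with "null]" ends in a JSON null.
  val json = """[{"id":1},{"id":2},null]"""

  // null decodes to None, real records decode to Some(...), and collect drops the terminator.
  val records: List[Record] =
    decode[List[Option[Record]]](json)
      .getOrElse(Nil)
      .collect { case Some(record) => record }

  println(records) // List(Record(1), Record(2))
}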
backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaClientWithKillSwitch.scala (new file, +21)

@@ -0,0 +1,21 @@
+package io.aiven.guardian.kafka.backup.s3
+
+import akka.actor.ActorSystem
+import akka.kafka.ConsumerMessage
+import akka.kafka.ConsumerSettings
+import akka.kafka.scaladsl.Consumer
+import akka.stream.SharedKillSwitch
+import akka.stream.scaladsl.SourceWithContext
+import io.aiven.guardian.kafka.KafkaClient
+import io.aiven.guardian.kafka.configs.KafkaCluster
+import io.aiven.guardian.kafka.models.ReducedConsumerRecord
+
+class KafkaClientWithKillSwitch(
+    configure: Option[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = None,
+    killSwitch: SharedKillSwitch
+)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
+    extends KafkaClient(configure) {
+  override def getSource
+      : SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
+    super.getSource.via(killSwitch.flow)
+}
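
The helper above routes the consumer source through a SharedKillSwitch, so the test can abort the running backup stream from the outside (with TerminationException) instead of stopping the consumer control. A minimal, self-contained sketch of that pattern; the names and timings here are illustrative, not taken from the diff:

import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

object SharedKillSwitchSketch extends App {
  implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")

  case object TerminationException extends Exception("termination-exception")

  // One switch can be attached to any number of streams via killSwitch.flow.
  val killSwitch = KillSwitches.shared("example-kill-switch")

  Source
    .tick(0.seconds, 100.millis, "element")
    .via(killSwitch.flow)
    .runWith(Sink.foreach[String](println))

  // Later (e.g. from test code) fail every attached stream with the given exception.
  system.scheduler.scheduleOnce(2.seconds)(killSwitch.abort(TerminationException))(system.dispatcher)
}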

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala (+68 −42)

@@ -3,14 +3,16 @@ package io.aiven.guardian.kafka.backup.s3
 import akka.actor.ActorSystem
 import akka.kafka.ProducerSettings
 import akka.kafka.scaladsl.Producer
+import akka.stream.KillSwitches
+import akka.stream.SharedKillSwitch
 import akka.stream.alpakka.s3.S3Settings
 import akka.stream.alpakka.s3.scaladsl.S3
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
+import akka.util.ByteString
 import io.aiven.guardian.akka.AnyPropTestKit
 import io.aiven.guardian.kafka.Generators.KafkaDataInChunksWithTimePeriod
 import io.aiven.guardian.kafka.Generators.kafkaDataWithMinSizeGen
-import io.aiven.guardian.kafka.KafkaClient
 import io.aiven.guardian.kafka.KafkaClusterTest
 import io.aiven.guardian.kafka.Utils._
 import io.aiven.guardian.kafka.backup.configs.Backup
@@ -26,6 +28,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.mdedetrich.akka.stream.support.CirceStreamSupport

+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.jdk.FutureConverters._
@@ -47,12 +50,26 @@ class RealS3BackupClientSpec
   override lazy val bucketPrefix: Option[String] = Some("guardian-")
   override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds)

+  case object TerminationException extends Exception("termination-exception")
+
   def reducedConsumerRecordsToJson(reducedConsumerRecords: List[ReducedConsumerRecord]): Array[Byte] = {
     import io.aiven.guardian.kafka.codecs.Circe._
     import io.circe.syntax._
     reducedConsumerRecords.asJson.noSpaces.getBytes
   }

+  def createKafkaClient(
+      killSwitch: SharedKillSwitch
+  )(implicit kafkaClusterConfig: KafkaCluster): KafkaClientWithKillSwitch =
+    new KafkaClientWithKillSwitch(
+      configure = Some(
+        _.withBootstrapServers(
+          container.bootstrapServers
+        ).withGroupId("test-group")
+      ),
+      killSwitch
+    )
+
   property("suspend/resume works correctly") {
     forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 5, reducedConsumerRecordsToJson),
            s3ConfigGen(useVirtualDotHost, bucketPrefix)
@@ -72,18 +89,15 @@ class RealS3BackupClientSpec
         ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
           .withBootstrapServers(container.bootstrapServers)

-      val backupClient = new BackupClient(Some(s3Settings))(new KafkaClient(
-                                                              configure = Some(
-                                                                _.withBootstrapServers(
-                                                                  container.bootstrapServers
-                                                                ).withGroupId("test-group")
-                                                              )
-                                                            ),
-                                                            implicitly,
-                                                            implicitly,
-                                                            implicitly,
-                                                            implicitly
-      )
+      val firstKillSwitch = KillSwitches.shared("first-kill-switch")
+
+      val backupClient =
+        new BackupClient(Some(s3Settings))(createKafkaClient(firstKillSwitch),
+                                           implicitly,
+                                           implicitly,
+                                           implicitly,
+                                           implicitly
+        )

       val asProducerRecord = data.map { reducedConsumerRecord =>
         val keyAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.key)
@@ -110,45 +124,57 @@
       val calculatedFuture = for {
         _ <- createTopics.all().toCompletableFuture.asScala
         _ <- createBucket(s3Config.dataBucket)
-        firstControl = backupClient.backup.run()
-        _ = baseSource.runWith(Producer.plainSink(producerSettings))
-        _ <- akka.pattern.after(15 seconds)(firstControl.stop())
-        secondBackupClient = new BackupClient(Some(s3Settings))(new KafkaClient(
-                               configure = Some(
-                                 _.withBootstrapServers(
-                                   container.bootstrapServers
-                                 ).withGroupId("test-group")
-                               )
-                             ),
-                             implicitly,
-                             implicitly,
-                             implicitly,
-                             implicitly
-        )
-        secondControl = secondBackupClient.backup.run()
-        _ <- akka.pattern.after(15 seconds)(secondControl.stop())
+        _ = backupClient.backup.run()
+        _ = baseSource.runWith(Producer.plainSink(producerSettings))
+        _ <- akka.pattern.after(15 seconds)(Future {
+               firstKillSwitch.abort(TerminationException)
+             })
+        secondKillSwitch = KillSwitches.shared("second-kill-switch")
+        secondBackupClient =
+          new BackupClient(Some(s3Settings))(createKafkaClient(secondKillSwitch),
+                                             implicitly,
+                                             implicitly,
+                                             implicitly,
+                                             implicitly
+          )
+        _ <- akka.pattern.after(10 seconds)(Future(secondBackupClient.backup.run()))
+        _ <- akka.pattern.after(30 seconds)(Future {
+               secondKillSwitch.abort(TerminationException)
+             })
         // Need to manually merge the bucket since we haven't gone over time window
-        incomplete <- S3.listMultipartUpload(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
+        incomplete <- akka.pattern.after(10 seconds)(
+                        S3.listMultipartUpload(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
+                      )

         grouped = incomplete.groupBy(_.key)

         (key, uploadIds) = grouped.head
-        uploadId = uploadIds.head
+        upload = uploadIds.head

-        parts <- S3.listParts(s3Config.dataBucket, key, uploadId.uploadId).runWith(Sink.seq)
+        parts <- S3.listParts(s3Config.dataBucket, key, upload.uploadId).runWith(Sink.seq)

-        _ <- S3.completeMultipartUpload(s3Config.dataBucket, key, uploadId.uploadId, parts.map(_.toPart))
+        _ <- Source
+               .single(ByteString("null]"))
+               .runWith(
+                 S3.resumeMultipartUpload(s3Config.dataBucket, key, upload.uploadId, parts.map(_.toPart))
+                   .withAttributes(s3Attrs)
+               )

         downloaded <-
-          akka.pattern.after(5.seconds)(
-            S3.download(s3Config.dataBucket, key).withAttributes(s3Attrs).runWith(Sink.head).flatMap {
-              case Some((downloadSource, _)) =>
-                downloadSource.via(CirceStreamSupport.decode[List[ReducedConsumerRecord]]).runWith(Sink.seq)
-              case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
-            }
+          akka.pattern.after(10 seconds)(
+            S3.download(s3Config.dataBucket, key)
+              .withAttributes(s3Attrs)
+              .runWith(Sink.head)
+              .flatMap {
+                case Some((downloadSource, _)) =>
+                  downloadSource.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]).runWith(Sink.seq)
+                case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
+              }
           )

-      } yield downloaded.flatten
+      } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
+        reducedConsumerRecord
+      }

       val downloaded = calculatedFuture.futureValue

@@ -161,7 +187,7 @@
         }
         .toMap

-      val inputAsKey = kafkaDataInChunksWithTimePeriod.data.flatten
+      val inputAsKey = data
        .groupBy(_.key)
        .view
        .mapValues { reducedConsumerRecords =>
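
The "properly terminate JSON" half of the commit refers to the ByteString("null]") chunk above: aborting the backup mid-stream leaves the multipart-uploaded JSON array without its final element or closing bracket, so the test resumes the upload, appends "null]", and only then reads the object back. The decoder side reads List[Option[ReducedConsumerRecord]] and drops the trailing None. A small sketch of the string-level effect, with an illustrative payload rather than the real ReducedConsumerRecord encoding:

import io.circe.parser.parse

object TerminateJsonSketch extends App {
  // A backup cut off mid-stream ends after a trailing comma, which is not valid JSON...
  val truncated = """[{"key":"a"},{"key":"b"},"""
  assert(parse(truncated).isLeft)

  // ...but appending "null]" closes the array, and the extra element later decodes as None.
  assert(parse(truncated + "null]").isRight)
}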

core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala (+2 −2)

@@ -53,7 +53,7 @@ class KafkaClient(
   /** @return
     *   A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
     */
-  override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
+  override def getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
     Consumer
       .sourceWithOffsetContext(consumerSettings, subscriptions)
       .map(consumerRecord =>
@@ -72,7 +72,7 @@ class KafkaClient(
   /** @return
     *   A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
     */
-  override val commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings)
+  override def commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings)

   /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
     * @param cursors
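
Changing getSource and commitCursor from val to def is what lets the new KafkaClientWithKillSwitch test helper override getSource and wrap super.getSource with the kill-switch flow: a stable val member cannot be overridden by a def in Scala, and a def also means each call builds a fresh stream blueprint rather than reusing a single one. A simplified illustration of the rule; the class names here are stand-ins, not from the codebase:

// Simplified stand-ins for KafkaClient / KafkaClientWithKillSwitch.
class Base {
  // Were this a `val`, the `override def` below would not compile
  // (a def cannot override a stable val).
  def source: List[Int] = List(1, 2, 3)
}

class Wrapped extends Base {
  override def source: List[Int] = super.source.filter(_ % 2 != 0)
}

object ValVsDefSketch extends App {
  println(new Wrapped().source) // List(1, 3)
}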
