Skip to content

Commit 4efd665

Browse files
committed
Replace unneeded throttle parameter with Source
1 parent 7b53e05 commit 4efd665

File tree

5 files changed

+21
-31
lines changed

5 files changed

+21
-31
lines changed

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala

+6-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import akka.stream.alpakka.s3.S3Settings
77
import akka.stream.alpakka.s3.scaladsl.S3
88
import akka.stream.scaladsl.Keep
99
import akka.stream.scaladsl.Sink
10+
import akka.stream.scaladsl.Source
1011
import akka.testkit.TestKitBase
1112
import com.softwaremill.diffx.generic.auto._
1213
import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo
@@ -131,11 +132,11 @@ trait BackupClientSpec
131132
forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
132133
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
133134
logger.info(s"Data bucket is ${s3Config.dataBucket}")
134-
val backupClient = new MockedS3BackupClientInterface(kafkaDataWithTimePeriod.data,
135-
kafkaDataWithTimePeriod.periodSlice,
136-
s3Config,
137-
Some(s3Settings),
138-
Some(_.throttle(ThrottleElements, ThrottleAmount))
135+
val backupClient = new MockedS3BackupClientInterface(
136+
Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
137+
kafkaDataWithTimePeriod.periodSlice,
138+
s3Config,
139+
Some(s3Settings)
139140
)
140141

141142
val delay =

backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala

+3-6
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,12 @@ import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
1212
import scala.concurrent.duration.FiniteDuration
1313

1414
class MockedS3BackupClientInterface(
15-
kafkaData: List[ReducedConsumerRecord],
15+
kafkaData: Source[ReducedConsumerRecord, NotUsed],
1616
periodSlice: FiniteDuration,
1717
s3Config: S3Config,
18-
maybeS3Settings: Option[S3Settings],
19-
sourceTransform: Option[
20-
Source[(ReducedConsumerRecord, Long), NotUsed] => Source[(ReducedConsumerRecord, Long), NotUsed]
21-
] = None
18+
maybeS3Settings: Option[S3Settings]
2219
)(implicit val s3Headers: S3Headers)
23-
extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData, sourceTransform),
20+
extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData),
2421
Backup(periodSlice),
2522
s3Config,
2623
implicitly

core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class BackupClientInterfaceSpec
4040

4141
property("Ordered Kafka events should produce at least one BackupStreamPosition.Boundary") {
4242
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
43-
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(kafkaDataWithTimePeriod.data,
43+
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
4444
kafkaDataWithTimePeriod.periodSlice
4545
)
4646

@@ -61,7 +61,7 @@ class BackupClientInterfaceSpec
6161
"Every ReducedConsumerRecord after a BackupStreamPosition.Boundary should be in the next consecutive time period"
6262
) {
6363
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
64-
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(kafkaDataWithTimePeriod.data,
64+
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
6565
kafkaDataWithTimePeriod.periodSlice
6666
)
6767

@@ -95,7 +95,7 @@ class BackupClientInterfaceSpec
9595
"The time difference between two consecutive BackupStreamPosition.Middle's has to be less then the specified time period"
9696
) {
9797
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
98-
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(kafkaDataWithTimePeriod.data,
98+
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
9999
kafkaDataWithTimePeriod.periodSlice
100100
)
101101

@@ -121,7 +121,7 @@ class BackupClientInterfaceSpec
121121

122122
property("backup method completes flow correctly for all valid Kafka events") {
123123
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
124-
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(kafkaDataWithTimePeriod.data,
124+
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
125125
kafkaDataWithTimePeriod.periodSlice
126126
)
127127

core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.aiven.guardian.kafka.backup
22

33
import akka.Done
4+
import akka.NotUsed
45
import akka.actor.ActorSystem
56
import akka.stream.scaladsl.Keep
67
import akka.stream.scaladsl.Sink
8+
import akka.stream.scaladsl.Source
79
import akka.util.ByteString
810
import io.aiven.guardian.kafka.MockedKafkaClientInterface
911
import io.aiven.guardian.kafka.backup.configs.Backup
@@ -78,6 +80,6 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
7880

7981
/** A `MockedBackupClientInterface` that also uses a mocked `KafkaClientInterface`
8082
*/
81-
class MockedBackupClientInterfaceWithMockedKafkaData(kafkaData: List[ReducedConsumerRecord],
83+
class MockedBackupClientInterfaceWithMockedKafkaData(kafkaData: Source[ReducedConsumerRecord, NotUsed],
8284
periodSlice: FiniteDuration
8385
) extends MockedBackupClientInterface(new MockedKafkaClientInterface(kafkaData), periodSlice)

core/src/test/scala/io/aiven/guardian/kafka/MockedKafkaClientInterface.scala

+5-15
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
1919
* A function that allows you to transform the source in some way. Convenient for cases such as throttling. By
2020
* default this is `None` so it just preserves the original source.
2121
*/
22-
class MockedKafkaClientInterface(
23-
kafkaData: List[ReducedConsumerRecord],
24-
sourceTransform: Option[
25-
Source[(ReducedConsumerRecord, Long), NotUsed] => Source[(ReducedConsumerRecord, Long), NotUsed]
26-
] = None
27-
) extends KafkaClientInterface {
22+
class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUsed]) extends KafkaClientInterface {
2823

2924
/** A collection that keeps track of whenever a cursor is committed
3025
*/
@@ -42,17 +37,12 @@ class MockedKafkaClientInterface(
4237
/** @return
4338
* A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
4439
*/
45-
override def getSource: SourceWithContext[ReducedConsumerRecord, Long, Future[NotUsed]] = {
46-
val source = Source(kafkaData.map { reducedConsumerRecord =>
47-
(reducedConsumerRecord, reducedConsumerRecord.offset)
48-
})
49-
50-
val finalSource = sourceTransform.fold(source)(block => block(source))
51-
40+
override def getSource: SourceWithContext[ReducedConsumerRecord, Long, Future[NotUsed]] =
5241
SourceWithContext
53-
.fromTuples(finalSource)
42+
.fromTuples(kafkaData.map { reducedConsumerRecord =>
43+
(reducedConsumerRecord, reducedConsumerRecord.offset)
44+
})
5445
.mapMaterializedValue(Future.successful)
55-
}
5646

5747
/** @return
5848
* A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message

0 commit comments

Comments
 (0)