@@ -3,6 +3,7 @@ package io.aiven.guardian.kafka
3
3
import io .aiven .guardian .kafka .models .ReducedConsumerRecord
4
4
import org .apache .kafka .common .record .TimestampType
5
5
import org .scalacheck .Gen
6
+ import org .scalacheck .util .Buildable
6
7
7
8
import scala .collection .mutable .ListBuffer
8
9
import scala .concurrent .duration .FiniteDuration
@@ -113,6 +114,31 @@ object Generators {
113
114
Gen .choose[Long ](head.timestamp, last.timestamp - 1 ).map(millis => FiniteDuration (millis, MILLISECONDS ))
114
115
}
115
116
117
+ @ SuppressWarnings (
118
+ Array (
119
+ " scalafix:DisableSyntax.while"
120
+ )
121
+ )
122
+ @ scala.annotation.nowarn(" msg=return statement uses an exception" )
123
+ private def buildableOfCollCond [C <: Iterable [T ], T ](cond : C => Boolean , g : Gen [C ])(implicit
124
+ evb : Buildable [T , C ]
125
+ ): Gen [C ] =
126
+ Gen .infiniteLazyList(g).map { ll =>
127
+ val it = ll.iterator
128
+ val bldr = evb.builder
129
+ while (true ) {
130
+ val result = bldr.result()
131
+ if (cond(result)) {
132
+ return result
133
+ }
134
+ bldr ++= it.next()
135
+ }
136
+ return bldr.result()
137
+ }
138
+
139
+ private def listOfFillCond [T ](finishCondition : List [T ] => Boolean , g : => Gen [List [T ]]) =
140
+ buildableOfCollCond[List [T ], T ](finishCondition, g)
141
+
116
142
def kafkaDateGen (min : Int = 2 ,
117
143
max : Int = 100 ,
118
144
padTimestampsMillis : Range = Range .inclusive(1 , 10 ),
@@ -121,7 +147,7 @@ object Generators {
121
147
topic <- kafkaTopic
122
148
records <- {
123
149
val base = Generators .kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
124
- condition.fold(base)(c => Gen . listOfFillCond(c, base))
150
+ condition.fold(base)(c => listOfFillCond(c, base))
125
151
}
126
152
} yield records
127
153
0 commit comments