@@ -243,14 +243,29 @@ class StreamMergeSuite extends Fs2Suite {
243
243
}
244
244
}
245
245
246
- test(" mergePreferred prefers" ) {
247
- forAllF { (leftStream : Stream [Pure , Int ], rightStream : Stream [Pure , Int ]) =>
248
- val leftTagged = leftStream.covary[IO ]
249
- val rightTagged = rightStream.covary[IO ].delayBy(10 .milli)
250
- leftTagged
251
- .mergePreferred(rightTagged)
252
- .assertEmitsSameAs(leftStream ++ rightStream)
253
- }
246
+ test(" mergePreferred prefers this over that" ) {
247
+
248
+ val units = Stream .unit.covary[IO ].repeat
249
+ val left = units.map(Left (_))
250
+ val right = units.map(Right (_))
251
+
252
+ val stream = left.mergePreferred(right)
253
+
254
+ stream
255
+ .take(10000 )
256
+ .fold((0L , 0L )) {
257
+ case ((left, right), Left (_)) => (left + 1 , right)
258
+ case ((left, right), Right (_)) => (left, right + 1 )
259
+ }
260
+ .compile
261
+ .lastOrError
262
+ .map { case (left, right) =>
263
+ val relLeft = left.toDouble / (left + right).toDouble
264
+ // Tolerate up to 2% elements of the non preferred stream.
265
+ // Increase the value, if the test (ocassionally) reports false positives.
266
+ val delta = 0.02d
267
+ assertEqualsDouble(relLeft, 1.0d , delta)
268
+ }
254
269
}
255
270
256
271
test(" mergePreferred fully consumes this" ) {
0 commit comments