@@ -41,6 +41,7 @@ import java.nio.file.{Files, Path}
41
41
import java .util
42
42
import java .util .{Collections , Optional , Properties }
43
43
import scala .jdk .CollectionConverters ._
44
+ import scala .util .Using
44
45
45
46
final class KafkaMetadataLogTest {
46
47
import KafkaMetadataLogTest ._
@@ -123,7 +124,7 @@ final class KafkaMetadataLogTest {
123
124
append(log, numberOfRecords, epoch)
124
125
log.updateHighWatermark(new LogOffsetMetadata (numberOfRecords))
125
126
126
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
127
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
127
128
snapshot.freeze()
128
129
}
129
130
@@ -191,7 +192,7 @@ final class KafkaMetadataLogTest {
191
192
192
193
append(log, numberOfRecords, epoch)
193
194
log.updateHighWatermark(new LogOffsetMetadata (numberOfRecords))
194
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
195
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
195
196
snapshot.freeze()
196
197
}
197
198
@@ -227,7 +228,7 @@ final class KafkaMetadataLogTest {
227
228
append(log, numberOfRecords, epoch)
228
229
log.updateHighWatermark(new LogOffsetMetadata (numberOfRecords))
229
230
230
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
231
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
231
232
snapshot.freeze()
232
233
}
233
234
@@ -276,7 +277,7 @@ final class KafkaMetadataLogTest {
276
277
append(log, numberOfRecords, epoch)
277
278
log.updateHighWatermark(new LogOffsetMetadata (numberOfRecords))
278
279
279
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
280
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
280
281
snapshot.freeze()
281
282
}
282
283
@@ -323,7 +324,7 @@ final class KafkaMetadataLogTest {
323
324
324
325
append(log, numberOfRecords, epoch)
325
326
326
- TestUtils .resource (log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) { snapshot =>
327
+ Using (log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) { snapshot =>
327
328
snapshot.freeze()
328
329
}
329
330
@@ -337,7 +338,7 @@ final class KafkaMetadataLogTest {
337
338
338
339
append(log, numberOfRecords, epoch)
339
340
340
- TestUtils .resource (log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) { snapshot =>
341
+ Using (log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) { snapshot =>
341
342
snapshot.freeze()
342
343
}
343
344
@@ -356,25 +357,25 @@ final class KafkaMetadataLogTest {
356
357
357
358
append(log, 1 , epoch - 1 )
358
359
val oldSnapshotId1 = new OffsetAndEpoch (1 , epoch - 1 )
359
- TestUtils .resource (log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
360
+ Using (log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
360
361
snapshot.freeze()
361
362
}
362
363
363
364
append(log, 1 , epoch)
364
365
val oldSnapshotId2 = new OffsetAndEpoch (2 , epoch)
365
- TestUtils .resource (log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
366
+ Using (log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
366
367
snapshot.freeze()
367
368
}
368
369
369
370
append(log, numberOfRecords - 2 , epoch)
370
371
val oldSnapshotId3 = new OffsetAndEpoch (numberOfRecords, epoch)
371
- TestUtils .resource (log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
372
+ Using (log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
372
373
snapshot.freeze()
373
374
}
374
375
375
376
val greaterSnapshotId = new OffsetAndEpoch (3 * numberOfRecords, epoch)
376
377
append(log, numberOfRecords, epoch)
377
- TestUtils .resource (log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot =>
378
+ Using (log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot =>
378
379
snapshot.freeze()
379
380
}
380
381
@@ -467,7 +468,7 @@ final class KafkaMetadataLogTest {
467
468
metadataDir : File ,
468
469
snapshotId : OffsetAndEpoch
469
470
): Unit = {
470
- TestUtils .resource (FileRawSnapshotWriter .create(metadataDir.toPath, snapshotId))(_.freeze())
471
+ Using (FileRawSnapshotWriter .create(metadataDir.toPath, snapshotId))(_.freeze())
471
472
}
472
473
473
474
@ Test
@@ -479,7 +480,7 @@ final class KafkaMetadataLogTest {
479
480
append(log, numberOfRecords, epoch)
480
481
481
482
val olderEpochSnapshotId = new OffsetAndEpoch (numberOfRecords, epoch - 1 )
482
- TestUtils .resource (log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) { snapshot =>
483
+ Using (log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) { snapshot =>
483
484
snapshot.freeze()
484
485
}
485
486
@@ -488,7 +489,7 @@ final class KafkaMetadataLogTest {
488
489
append(log, numberOfRecords, epoch)
489
490
490
491
val olderOffsetSnapshotId = new OffsetAndEpoch (numberOfRecords, epoch)
491
- TestUtils .resource (log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) { snapshot =>
492
+ Using (log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) { snapshot =>
492
493
snapshot.freeze()
493
494
}
494
495
@@ -503,7 +504,7 @@ final class KafkaMetadataLogTest {
503
504
val snapshotId = new OffsetAndEpoch (1 , epoch)
504
505
505
506
append(log, numberOfRecords, epoch)
506
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
507
+ Using (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
507
508
snapshot.freeze()
508
509
}
509
510
@@ -540,25 +541,25 @@ final class KafkaMetadataLogTest {
540
541
541
542
append(log, 1 , epoch - 1 )
542
543
val oldSnapshotId1 = new OffsetAndEpoch (1 , epoch - 1 )
543
- TestUtils .resource (log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
544
+ Using (log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
544
545
snapshot.freeze()
545
546
}
546
547
547
548
append(log, 1 , epoch)
548
549
val oldSnapshotId2 = new OffsetAndEpoch (2 , epoch)
549
- TestUtils .resource (log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
550
+ Using (log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
550
551
snapshot.freeze()
551
552
}
552
553
553
554
append(log, numberOfRecords - 2 , epoch)
554
555
val oldSnapshotId3 = new OffsetAndEpoch (numberOfRecords, epoch)
555
- TestUtils .resource (log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
556
+ Using (log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
556
557
snapshot.freeze()
557
558
}
558
559
559
560
val greaterSnapshotId = new OffsetAndEpoch (3 * numberOfRecords, epoch)
560
561
append(log, numberOfRecords, epoch)
561
- TestUtils .resource (log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot =>
562
+ Using (log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot =>
562
563
snapshot.freeze()
563
564
}
564
565
@@ -589,7 +590,7 @@ final class KafkaMetadataLogTest {
589
590
val snapshotId = new OffsetAndEpoch (numberOfRecords + 1 , epoch + 1 )
590
591
591
592
append(log, numberOfRecords, epoch)
592
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
593
+ Using (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
593
594
snapshot.freeze()
594
595
}
595
596
@@ -688,7 +689,7 @@ final class KafkaMetadataLogTest {
688
689
log.updateHighWatermark(new LogOffsetMetadata (numberOfRecords))
689
690
690
691
val snapshotId = new OffsetAndEpoch (numberOfRecords, epoch)
691
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
692
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
692
693
snapshot.freeze()
693
694
}
694
695
@@ -708,7 +709,7 @@ final class KafkaMetadataLogTest {
708
709
log.updateHighWatermark(new LogOffsetMetadata (offset))
709
710
710
711
val snapshotId = new OffsetAndEpoch (offset, epoch)
711
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
712
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
712
713
snapshot.freeze()
713
714
}
714
715
// Simulate log cleaning advancing the LSO
@@ -730,7 +731,7 @@ final class KafkaMetadataLogTest {
730
731
log.updateHighWatermark(new LogOffsetMetadata (offset))
731
732
732
733
val snapshotId = new OffsetAndEpoch (offset, epoch)
733
- TestUtils .resource (log.createNewSnapshot(snapshotId).get()) { snapshot =>
734
+ Using (log.createNewSnapshot(snapshotId).get()) { snapshot =>
734
735
snapshot.freeze()
735
736
}
736
737
@@ -747,7 +748,7 @@ final class KafkaMetadataLogTest {
747
748
val log = buildMetadataLog(tempDir, mockTime)
748
749
log.updateHighWatermark(new LogOffsetMetadata (offset))
749
750
val snapshotId = new OffsetAndEpoch (offset, 1 )
750
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
751
+ Using (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
751
752
snapshot.freeze()
752
753
}
753
754
log.truncateToLatestSnapshot()
@@ -771,7 +772,7 @@ final class KafkaMetadataLogTest {
771
772
val log = buildMetadataLog(tempDir, mockTime)
772
773
log.updateHighWatermark(new LogOffsetMetadata (offset))
773
774
val snapshotId = new OffsetAndEpoch (offset, 1 )
774
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
775
+ Using (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
775
776
snapshot.freeze()
776
777
}
777
778
log.truncateToLatestSnapshot()
@@ -853,13 +854,13 @@ final class KafkaMetadataLogTest {
853
854
assertFalse(log.maybeClean(), " Should not clean since no snapshots exist" )
854
855
855
856
val snapshotId1 = new OffsetAndEpoch (1000 , 1 )
856
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
857
+ Using (log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
857
858
append(snapshot, 100 )
858
859
snapshot.freeze()
859
860
}
860
861
861
862
val snapshotId2 = new OffsetAndEpoch (2000 , 1 )
862
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
863
+ Using (log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
863
864
append(snapshot, 100 )
864
865
snapshot.freeze()
865
866
}
@@ -891,7 +892,7 @@ final class KafkaMetadataLogTest {
891
892
892
893
for (offset <- Seq (100 , 200 , 300 , 400 , 500 , 600 )) {
893
894
val snapshotId = new OffsetAndEpoch (offset, 1 )
894
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
895
+ Using (log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
895
896
append(snapshot, 10 )
896
897
snapshot.freeze()
897
898
}
@@ -926,14 +927,14 @@ final class KafkaMetadataLogTest {
926
927
927
928
// Then generate two snapshots
928
929
val snapshotId1 = new OffsetAndEpoch (1000 , 1 )
929
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
930
+ Using (log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
930
931
append(snapshot, 500 )
931
932
snapshot.freeze()
932
933
}
933
934
934
935
// Then generate a snapshot
935
936
val snapshotId2 = new OffsetAndEpoch (2000 , 1 )
936
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
937
+ Using (log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
937
938
append(snapshot, 500 )
938
939
snapshot.freeze()
939
940
}
@@ -973,15 +974,15 @@ final class KafkaMetadataLogTest {
973
974
log.log.logSegments.asScala.drop(1 ).head.baseOffset,
974
975
1
975
976
)
976
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
977
+ Using (log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
977
978
snapshot.freeze()
978
979
}
979
980
// Generate second snapshots that includes the second segment by using the base offset of the third segment
980
981
val snapshotId2 = new OffsetAndEpoch (
981
982
log.log.logSegments.asScala.drop(2 ).head.baseOffset,
982
983
1
983
984
)
984
- TestUtils .resource (log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
985
+ Using (log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
985
986
snapshot.freeze()
986
987
}
987
988
0 commit comments