|
41 | 41 | import org.junit.jupiter.api.Test; |
42 | 42 |
|
43 | 43 | import java.time.Duration; |
| 44 | +import java.util.ArrayList; |
44 | 45 | import java.util.Collections; |
45 | 46 | import java.util.HashMap; |
46 | 47 | import java.util.List; |
@@ -262,7 +263,7 @@ public void testTotallyExpire() throws Exception { |
262 | 263 | table = table.copy(map); |
263 | 264 |
|
264 | 265 | int currentSecs = (int) (System.currentTimeMillis() / 1000); |
265 | | - // if seconds is too short, this test might file |
| 266 | + // if seconds is too short, this test might fail |
266 | 267 | int seconds = 5; |
267 | 268 |
|
268 | 269 | // large file A. It has no delete records and expired records, will be upgraded to maxLevel |
@@ -307,6 +308,61 @@ public void testTotallyExpire() throws Exception { |
307 | 308 | GenericRow.of(1, 3, currentSecs + 60 * 60)); |
308 | 309 | } |
309 | 310 |
|
| 311 | + @Test |
| 312 | + public void testPickSmallFilesWhenFullCompact() throws Exception { |
| 313 | + Map<String, String> map = new HashMap<>(); |
| 314 | + map.put(CoreOptions.TARGET_FILE_SIZE.key(), "6000 B"); |
| 315 | + table = table.copy(map); |
| 316 | + |
| 317 | + int currentSecs = (int) (System.currentTimeMillis() / 1000); |
| 318 | + // if seconds is too short, this test might fail |
| 319 | + int seconds = 5; |
| 320 | + |
| 321 | + // [1-1000], no expire |
| 322 | + writeCommit(rows(1, 1000, currentSecs + 60 * 60).toArray(new GenericRow[0])); |
| 323 | + compact(1); |
| 324 | + List<DataSplit> splits = table.newSnapshotReader().read().dataSplits(); |
| 325 | + assertThat(splits.size()).isEqualTo(1); |
| 326 | + assertThat(splits.get(0).dataFiles().size()).isEqualTo(1); |
| 327 | + |
| 328 | + // [1001-1012], 1011 will be expired |
| 329 | + List<GenericRow> rows2 = rows(1001, 1010, currentSecs + 60 * 60); |
| 330 | + rows2.add(GenericRow.of(1, 1011, currentSecs + seconds)); |
| 331 | + rows2.add(GenericRow.of(1, 1012, currentSecs + 60 * 60)); |
| 332 | + writeCommit(rows2.toArray(new GenericRow[0])); |
| 333 | + compact(1); |
| 334 | + splits = table.newSnapshotReader().read().dataSplits(); |
| 335 | + assertThat(splits.get(0).dataFiles().size()).isEqualTo(2); |
| 336 | + |
| 337 | + // this will generate 2 files: [2000-2999],[3000,3012]. 3011 will be expired |
| 338 | + List<GenericRow> rows3 = rows(2000, 3010, currentSecs + 60 * 60); |
| 339 | + rows3.add(GenericRow.of(1, 3011, currentSecs + seconds)); |
| 340 | + rows3.add(GenericRow.of(1, 3012, currentSecs + 60 * 60)); |
| 341 | + writeCommit(rows3.toArray(new GenericRow[0])); |
| 342 | + compact(1); |
| 343 | + splits = table.newSnapshotReader().read().dataSplits(); |
| 344 | + assertThat(splits.get(0).dataFiles().size()).isEqualTo(4); |
| 345 | + assertThat(splits.get(0).dataFiles().stream().mapToLong(DataFileMeta::rowCount).sum()) |
| 346 | + .isEqualTo(2025); |
| 347 | + |
| 348 | + // ensure (1, 2, currentSecs + seconds) out of date |
| 349 | + Thread.sleep(seconds * 1000 + 2000); |
| 350 | + // pick two small files: [1001-1012] and [3000,3012] |
| 351 | + compact(1); |
| 352 | + splits = table.newSnapshotReader().read().dataSplits(); |
| 353 | + assertThat(splits.get(0).dataFiles().size()).isEqualTo(4); |
| 354 | + assertThat(splits.get(0).dataFiles().stream().mapToLong(DataFileMeta::rowCount).sum()) |
| 355 | + .isEqualTo(2023); |
| 356 | + } |
| 357 | + |
| 358 | + private List<GenericRow> rows(int start, int end, int time) { |
| 359 | + List<GenericRow> rows = new ArrayList<>(); |
| 360 | + for (int i = start; i <= end; i++) { |
| 361 | + rows.add(GenericRow.of(1, i, time)); |
| 362 | + } |
| 363 | + return rows; |
| 364 | + } |
| 365 | + |
310 | 366 | private void refreshTable() throws Catalog.TableNotExistException { |
311 | 367 | CatalogContext context = |
312 | 368 | CatalogContext.create( |
|
0 commit comments