Skip to content

Commit e4c7b3d

Browse files
committed
nit
1 parent f542e6f commit e4c7b3d

File tree

1 file changed

+36
-268
lines changed

1 file changed

+36
-268
lines changed

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 36 additions & 268 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
import org.junit.jupiter.api.BeforeAll;
3535
import org.junit.jupiter.api.Test;
3636

37-
import java.util.Arrays;
37+
import java.math.BigDecimal;
38+
import java.util.ArrayList;
3839
import java.util.List;
3940

4041
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -60,69 +61,7 @@ void testCompaction() throws Exception {
6061
long t1Id = createPkTable(t1);
6162
TableBucket t1Bucket = new TableBucket(t1Id, 0);
6263
// write records
63-
List<InternalRow> rows =
64-
Arrays.asList(
65-
row(
66-
true,
67-
(byte) 100,
68-
(short) 200,
69-
1,
70-
1 + 400L,
71-
500.1f,
72-
600.0d,
73-
"v1",
74-
Decimal.fromUnscaledLong(900, 5, 2),
75-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
76-
TimestampLtz.fromEpochMillis(1698235273400L),
77-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
78-
TimestampNtz.fromMillis(1698235273501L),
79-
TimestampNtz.fromMillis(1698235273501L, 8000),
80-
new byte[] {5, 6, 7, 8},
81-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
82-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
83-
BinaryString.fromString("abc"),
84-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
85-
row(
86-
true,
87-
(byte) 100,
88-
(short) 200,
89-
2,
90-
2 + 400L,
91-
500.1f,
92-
600.0d,
93-
"v2",
94-
Decimal.fromUnscaledLong(900, 5, 2),
95-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
96-
TimestampLtz.fromEpochMillis(1698235273400L),
97-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
98-
TimestampNtz.fromMillis(1698235273501L),
99-
TimestampNtz.fromMillis(1698235273501L, 8000),
100-
new byte[] {5, 6, 7, 8},
101-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
102-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
103-
BinaryString.fromString("abc"),
104-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
105-
row(
106-
true,
107-
(byte) 100,
108-
(short) 200,
109-
3,
110-
3 + 400L,
111-
500.1f,
112-
600.0d,
113-
"v3",
114-
Decimal.fromUnscaledLong(900, 5, 2),
115-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
116-
TimestampLtz.fromEpochMillis(1698235273400L),
117-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
118-
TimestampNtz.fromMillis(1698235273501L),
119-
TimestampNtz.fromMillis(1698235273501L, 8000),
120-
new byte[] {5, 6, 7, 8},
121-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
122-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
123-
BinaryString.fromString("abc"),
124-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
125-
writeRows(t1, rows, false);
64+
writeFullTypeRow(t1, 1, 3);
12665
waitUntilSnapshot(t1Id, 1, 0);
12766

12867
// then start tiering job
@@ -131,222 +70,51 @@ void testCompaction() throws Exception {
13170
// check the status of replica after synced
13271
assertReplicaStatus(t1Bucket, 3);
13372

134-
checkFileInIcebergTable(t1, 1);
135-
136-
// write another file
137-
rows =
138-
Arrays.asList(
139-
row(
140-
true,
141-
(byte) 100,
142-
(short) 200,
143-
4,
144-
4 + 400L,
145-
500.1f,
146-
600.0d,
147-
"v1",
148-
Decimal.fromUnscaledLong(900, 5, 2),
149-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
150-
TimestampLtz.fromEpochMillis(1698235273400L),
151-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
152-
TimestampNtz.fromMillis(1698235273501L),
153-
TimestampNtz.fromMillis(1698235273501L, 8000),
154-
new byte[] {5, 6, 7, 8},
155-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
156-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
157-
BinaryString.fromString("abc"),
158-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
159-
row(
160-
true,
161-
(byte) 100,
162-
(short) 200,
163-
5,
164-
5 + 400L,
165-
500.1f,
166-
600.0d,
167-
"v2",
168-
Decimal.fromUnscaledLong(900, 5, 2),
169-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
170-
TimestampLtz.fromEpochMillis(1698235273400L),
171-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
172-
TimestampNtz.fromMillis(1698235273501L),
173-
TimestampNtz.fromMillis(1698235273501L, 8000),
174-
new byte[] {5, 6, 7, 8},
175-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
176-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
177-
BinaryString.fromString("abc"),
178-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
179-
row(
180-
true,
181-
(byte) 100,
182-
(short) 200,
183-
6,
184-
6 + 400L,
185-
500.1f,
186-
600.0d,
187-
"v3",
188-
Decimal.fromUnscaledLong(900, 5, 2),
189-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
190-
TimestampLtz.fromEpochMillis(1698235273400L),
191-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
192-
TimestampNtz.fromMillis(1698235273501L),
193-
TimestampNtz.fromMillis(1698235273501L, 8000),
194-
new byte[] {5, 6, 7, 8},
195-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
196-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
197-
BinaryString.fromString("abc"),
198-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
199-
writeRows(t1, rows, false);
200-
73+
writeFullTypeRow(t1, 4, 6);
20174
waitUntilSnapshot(t1Id, 1, 1);
202-
203-
// check the status of replica after synced
20475
assertReplicaStatus(t1Bucket, 6);
20576

206-
// write another file
207-
rows =
208-
Arrays.asList(
209-
row(
210-
true,
211-
(byte) 100,
212-
(short) 200,
213-
7,
214-
7 + 400L,
215-
500.1f,
216-
600.0d,
217-
"v1",
218-
Decimal.fromUnscaledLong(900, 5, 2),
219-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
220-
TimestampLtz.fromEpochMillis(1698235273400L),
221-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
222-
TimestampNtz.fromMillis(1698235273501L),
223-
TimestampNtz.fromMillis(1698235273501L, 8000),
224-
new byte[] {5, 6, 7, 8},
225-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
226-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
227-
BinaryString.fromString("abc"),
228-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
229-
row(
230-
true,
231-
(byte) 100,
232-
(short) 200,
233-
8,
234-
8 + 400L,
235-
500.1f,
236-
600.0d,
237-
"v2",
238-
Decimal.fromUnscaledLong(900, 5, 2),
239-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
240-
TimestampLtz.fromEpochMillis(1698235273400L),
241-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
242-
TimestampNtz.fromMillis(1698235273501L),
243-
TimestampNtz.fromMillis(1698235273501L, 8000),
244-
new byte[] {5, 6, 7, 8},
245-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
246-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
247-
BinaryString.fromString("abc"),
248-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
249-
row(
250-
true,
251-
(byte) 100,
252-
(short) 200,
253-
9,
254-
9 + 400L,
255-
500.1f,
256-
600.0d,
257-
"v3",
258-
Decimal.fromUnscaledLong(900, 5, 2),
259-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
260-
TimestampLtz.fromEpochMillis(1698235273400L),
261-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
262-
TimestampNtz.fromMillis(1698235273501L),
263-
TimestampNtz.fromMillis(1698235273501L, 8000),
264-
new byte[] {5, 6, 7, 8},
265-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
266-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
267-
BinaryString.fromString("abc"),
268-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
269-
writeRows(t1, rows, false);
270-
77+
writeFullTypeRow(t1, 7, 9);
27178
waitUntilSnapshot(t1Id, 1, 2);
272-
273-
// check the status of replica after synced
27479
assertReplicaStatus(t1Bucket, 9);
27580

276-
checkFileInIcebergTable(t1, 3);
277-
278-
rows =
279-
Arrays.asList(
280-
row(
281-
true,
282-
(byte) 100,
283-
(short) 200,
284-
10,
285-
10 + 400L,
286-
500.1f,
287-
600.0d,
288-
"v1",
289-
Decimal.fromUnscaledLong(900, 5, 2),
290-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
291-
TimestampLtz.fromEpochMillis(1698235273400L),
292-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
293-
TimestampNtz.fromMillis(1698235273501L),
294-
TimestampNtz.fromMillis(1698235273501L, 8000),
295-
new byte[] {5, 6, 7, 8},
296-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
297-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
298-
BinaryString.fromString("abc"),
299-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
300-
row(
301-
true,
302-
(byte) 100,
303-
(short) 200,
304-
11,
305-
11 + 400L,
306-
500.1f,
307-
600.0d,
308-
"v2",
309-
Decimal.fromUnscaledLong(900, 5, 2),
310-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
311-
TimestampLtz.fromEpochMillis(1698235273400L),
312-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
313-
TimestampNtz.fromMillis(1698235273501L),
314-
TimestampNtz.fromMillis(1698235273501L, 8000),
315-
new byte[] {5, 6, 7, 8},
316-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
317-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
318-
BinaryString.fromString("abc"),
319-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
320-
row(
321-
true,
322-
(byte) 100,
323-
(short) 200,
324-
12,
325-
12 + 400L,
326-
500.1f,
327-
600.0d,
328-
"v3",
329-
Decimal.fromUnscaledLong(900, 5, 2),
330-
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
331-
TimestampLtz.fromEpochMillis(1698235273400L),
332-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
333-
TimestampNtz.fromMillis(1698235273501L),
334-
TimestampNtz.fromMillis(1698235273501L, 8000),
335-
new byte[] {5, 6, 7, 8},
336-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
337-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
338-
BinaryString.fromString("abc"),
339-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
340-
writeRows(t1, rows, false);
81+
checkFileCountInIcebergTable(t1, 3);
34182

83+
writeFullTypeRow(t1, 10, 12);
34284
waitUntilSnapshot(t1Id, 1, 3);
343-
344-
// check the status of replica after synced
34585
assertReplicaStatus(t1Bucket, 12);
34686

347-
checkFileInIcebergTable(t1, 2);
87+
checkFileCountInIcebergTable(t1, 2);
34888
} finally {
34989
jobClient.cancel().get();
35090
}
35191
}
92+
93+
private void writeFullTypeRow(TablePath tablePath, int from, int to) throws Exception {
94+
List<InternalRow> rows = new ArrayList<>();
95+
for (int i = from; i <= to; i++) {
96+
rows.add(
97+
row(
98+
true,
99+
(byte) 100,
100+
(short) 200,
101+
i,
102+
i + 400L,
103+
500.1f,
104+
600.0d,
105+
"v1",
106+
Decimal.fromUnscaledLong(900, 5, 2),
107+
Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0),
108+
TimestampLtz.fromEpochMillis(1698235273400L),
109+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
110+
TimestampNtz.fromMillis(1698235273501L),
111+
TimestampNtz.fromMillis(1698235273501L, 8000),
112+
new byte[] {5, 6, 7, 8},
113+
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
114+
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
115+
BinaryString.fromString("abc"),
116+
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
117+
}
118+
writeRows(tablePath, rows, false);
119+
}
352120
}

0 commit comments

Comments
 (0)