Commit 2f1d0f1

nit
1 parent a3c4796 commit 2f1d0f1

2 files changed: +249 −1 lines changed


fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 235 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.iceberg.maintenance;
 
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
@@ -26,6 +27,13 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.TypeUtils;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -51,12 +59,17 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.utils.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;

@@ -76,6 +89,227 @@ protected static void beforeAll() {
 
     @Test
     void testCompaction() throws Exception {
+        // create a pk table, write some records and wait until snapshot finished
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+        long t1Id = createPkTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        // write records
+        List<InternalRow> rows =
+                Arrays.asList(
+                        row(
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                1,
+                                1 + 400L,
+                                500.1f,
+                                600.0d,
+                                "v1",
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                BinaryString.fromString("abc"),
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                        row(
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                2,
+                                2 + 400L,
+                                500.1f,
+                                600.0d,
+                                "v2",
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                BinaryString.fromString("abc"),
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                        row(
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                3,
+                                3 + 400L,
+                                500.1f,
+                                600.0d,
+                                "v3",
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                BinaryString.fromString("abc"),
+                                new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+        writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+
+            checkFileInIcebergTable(t1, 1);
+
+            // write another file
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    5,
+                                    5 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    6,
+                                    6 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v3",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+
+            waitUntilSnapshot(t1Id, 1, 1);
+
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 6);
+
+            // write another file
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    7,
+                                    7 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    8,
+                                    8 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    9,
+                                    9 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v3",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+
+            waitUntilSnapshot(t1Id, 1, 2);
+
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 9);
+
+            checkFileInIcebergTable(t1, 3);
+        } finally {
+            jobClient.cancel().get();
+        }
     }
 }
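
The three write batches above differ only in the primary-key id (1–9) and the derived long column; within each batch the value column cycles through "v1"–"v3" and every other field is a fixed literal. A minimal sketch of a helper the test could use to avoid repeating the row literal — rowWithIdAndValue is a hypothetical name, not part of this commit:

    // Hypothetical helper (not in this commit): builds one row of the pk table,
    // varying only the key id, the derived long column, and the value string.
    private static InternalRow rowWithIdAndValue(int id, String value) {
        return row(
                true,
                (byte) 100,
                (short) 200,
                id,
                id + 400L,
                500.1f,
                600.0d,
                value,
                Decimal.fromUnscaledLong(900, 5, 2),
                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
                TimestampLtz.fromEpochMillis(1698235273400L),
                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
                TimestampNtz.fromMillis(1698235273501L),
                TimestampNtz.fromMillis(1698235273501L, 8000),
                new byte[] {5, 6, 7, 8},
                TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
                TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
                BinaryString.fromString("abc"),
                new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
    }

    // Each batch in testCompaction would then collapse to a single statement, e.g.:
    // rows = Arrays.asList(
    //         rowWithIdAndValue(4, "v1"), rowWithIdAndValue(5, "v2"), rowWithIdAndValue(6, "v3"));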

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 14 additions & 0 deletions
@@ -45,13 +45,15 @@
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.parquet.Parquet;
 import org.junit.jupiter.api.AfterAll;
@@ -60,6 +62,7 @@
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.file.Files;
@@ -383,6 +386,17 @@ protected void checkDataInIcebergAppendOnlyTable(
         }
     }
 
+    protected void checkFileInIcebergTable(TablePath tablePath, int expectedFileCount) throws IOException {
+        org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
+        int count = 0;
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+            for (FileScanTask ignored : tasks) {
+                count++;
+            }
+        }
+        assertThat(count).isEqualTo(expectedFileCount);
+    }
+
     protected void checkDataInIcebergAppendOnlyPartitionedTable(
             TablePath tablePath,
             Map<String, String> partitionSpec,
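
The new helper counts live data files by planning a full table scan, which stays correct after a compaction rewrites files. For comparison, a sketch of an alternative — checkFileCountViaSnapshotSummary is a hypothetical method, not part of this commit — that reads the count from the current snapshot's "total-data-files" summary property, under the assumption that the table has a current snapshot whose summary is populated as Iceberg does on commit:

    // Sketch only, not in this commit: count data files from the snapshot summary
    // instead of planning a scan. Assumes the "total-data-files" property is present.
    protected void checkFileCountViaSnapshotSummary(TablePath tablePath, int expectedFileCount) {
        org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
        String totalDataFiles = table.currentSnapshot().summary().get("total-data-files");
        assertThat(Integer.parseInt(totalDataFiles)).isEqualTo(expectedFileCount);
    }

The scan-based version added by this commit avoids relying on summary bookkeeping and reflects exactly the files a reader would plan.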
