Skip to content

Commit 56bef2d

Browse files
authored
[iceberg] minor improvements (apache#2015)
1 parent 05a5db9 commit 56bef2d

File tree

4 files changed

+22
-23
lines changed

4 files changed

+22
-23
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,13 @@ private long commit(SnapshotUpdate<?> snapshotUpdate, Map<String, String> snapsh
212212
public void abort(IcebergCommittable committable) {
213213
List<String> dataFilesToDelete =
214214
committable.getDataFiles().stream()
215-
.map(file -> file.path().toString())
215+
.map(ContentFile::location)
216216
.collect(Collectors.toList());
217217
CatalogUtil.deleteFiles(icebergTable.io(), dataFilesToDelete, "data file", true);
218218

219219
List<String> deleteFilesToDelete =
220220
committable.getDeleteFiles().stream()
221-
.map(file -> file.path().toString())
221+
.map(ContentFile::location)
222222
.collect(Collectors.toList());
223223
CatalogUtil.deleteFiles(icebergTable.io(), deleteFilesToDelete, "delete file", true);
224224
}

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.iceberg.types.Types;
3232
import org.junit.jupiter.api.Test;
3333

34-
import java.io.IOException;
3534
import java.math.BigDecimal;
3635
import java.nio.ByteBuffer;
3736
import java.nio.ByteOrder;
@@ -44,7 +43,7 @@
4443
class IcebergBucketingFunctionTest {
4544

4645
@Test
47-
void testIntegerHash() throws IOException {
46+
void testIntegerHash() {
4847
int testValue = 42;
4948
int bucketNum = 10;
5049

@@ -69,7 +68,7 @@ void testIntegerHash() throws IOException {
6968
}
7069

7170
@Test
72-
void testLongHash() throws IOException {
71+
void testLongHash() {
7372
long testValue = 1234567890123456789L;
7473
int bucketNum = 10;
7574

@@ -94,7 +93,7 @@ void testLongHash() throws IOException {
9493
}
9594

9695
@Test
97-
void testStringHash() throws IOException {
96+
void testStringHash() {
9897
String testValue = "Hello Iceberg, Fluss this side!";
9998
int bucketNum = 10;
10099

@@ -120,7 +119,7 @@ void testStringHash() throws IOException {
120119
}
121120

122121
@Test
123-
void testDecimalHash() throws IOException {
122+
void testDecimalHash() {
124123
BigDecimal testValue = new BigDecimal("123.45");
125124
Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
126125
int bucketNum = 10;
@@ -148,7 +147,7 @@ void testDecimalHash() throws IOException {
148147
}
149148

150149
@Test
151-
void testTimestampEncodingHash() throws IOException {
150+
void testTimestampEncodingHash() {
152151
// Iceberg expects microseconds for TIMESTAMP type
153152
long millis = 1698235273182L;
154153
int nanos = 123456;
@@ -179,7 +178,7 @@ void testTimestampEncodingHash() throws IOException {
179178
}
180179

181180
@Test
182-
void testDateHash() throws IOException {
181+
void testDateHash() {
183182
int dateValue = 19655;
184183
int bucketNum = 10;
185184

@@ -204,7 +203,7 @@ void testDateHash() throws IOException {
204203
}
205204

206205
@Test
207-
void testTimeHashing() throws IOException {
206+
void testTimeHashing() {
208207
// Fluss stores time as int (milliseconds since midnight)
209208
int timeMillis = 34200000;
210209
long timeMicros = timeMillis * 1000L; // Convert to microseconds for Iceberg
@@ -231,7 +230,7 @@ void testTimeHashing() throws IOException {
231230
}
232231

233232
@Test
234-
void testBinaryEncoding() throws IOException {
233+
void testBinaryEncoding() {
235234
byte[] testValue = "Hello i only understand binary data".getBytes();
236235
int bucketNum = 10;
237236

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -153,16 +153,16 @@ void testReadLogTable(boolean isPartitioned) throws Exception {
153153
List<Row> projectExpect = new ArrayList<>();
154154
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
155155
for (FileScanTask task : fileScanTasks) {
156-
org.apache.iceberg.io.CloseableIterator<Record> iterator =
157-
reader.open(task).iterator();
158-
IcebergRecordAsFlussRow recordAsFlussRow = new IcebergRecordAsFlussRow();
159-
projectExpect.addAll(
160-
convertToFlinkRow(
161-
projectFieldGetters,
162-
TransformingCloseableIterator.transform(
163-
CloseableIterator.wrap(iterator),
164-
recordAsFlussRow::replaceIcebergRecord)));
165-
iterator.close();
156+
try (org.apache.iceberg.io.CloseableIterator<Record> iterator =
157+
reader.open(task).iterator()) {
158+
IcebergRecordAsFlussRow recordAsFlussRow = new IcebergRecordAsFlussRow();
159+
projectExpect.addAll(
160+
convertToFlinkRow(
161+
projectFieldGetters,
162+
TransformingCloseableIterator.transform(
163+
CloseableIterator.wrap(iterator),
164+
recordAsFlussRow::replaceIcebergRecord)));
165+
}
166166
}
167167
}
168168
assertThat(projectActual).containsExactlyInAnyOrderElementsOf(projectExpect);

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ private CloseableIterator<Record> getIcebergRows(
437437

438438
for (DataFile file : files) {
439439
Iterable<Record> iterable =
440-
Parquet.read(table.io().newInputFile(file.path().toString()))
440+
Parquet.read(table.io().newInputFile(file.location()))
441441
.project(table.schema())
442442
.createReaderFunc(
443443
fileSchema ->
@@ -471,7 +471,7 @@ private TableScan filterByPartition(TableScan tableScan, Map<String, String> par
471471
}
472472

473473
protected void checkSnapshotPropertyInIceberg(
474-
TablePath tablePath, Map<String, String> expectedProperties) throws Exception {
474+
TablePath tablePath, Map<String, String> expectedProperties) {
475475
org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
476476
Snapshot snapshot = table.currentSnapshot();
477477
assertThat(snapshot.summary()).containsAllEntriesOf(expectedProperties);

0 commit comments

Comments
 (0)