Skip to content

Commit f542e6f

Browse files
committed
nit
1 parent 9995f68 commit f542e6f

File tree

2 files changed

+73
-8
lines changed

2 files changed

+73
-8
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434

3535
import static org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
36+
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
3637

3738
/** Utility class for static conversions between Fluss and Iceberg types. */
3839
public class IcebergConversions {
@@ -78,14 +79,7 @@ public static Expression toFilterExpression(
7879
partition));
7980
}
8081
}
81-
expression =
82-
Expressions.and(
83-
expression,
84-
Expressions.equal(
85-
table.schema()
86-
.findColumnName(
87-
partitionFields.get(partitionIndex).sourceId()),
88-
bucket));
82+
expression = Expressions.and(expression, Expressions.equal(BUCKET_COLUMN_NAME, bucket));
8983
return expression;
9084
}
9185
}

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,77 @@ void testCompaction() throws Exception {
274274
assertReplicaStatus(t1Bucket, 9);
275275

276276
checkFileInIcebergTable(t1, 3);
277+
278+
rows =
279+
Arrays.asList(
280+
row(
281+
true,
282+
(byte) 100,
283+
(short) 200,
284+
10,
285+
10 + 400L,
286+
500.1f,
287+
600.0d,
288+
"v1",
289+
Decimal.fromUnscaledLong(900, 5, 2),
290+
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
291+
TimestampLtz.fromEpochMillis(1698235273400L),
292+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
293+
TimestampNtz.fromMillis(1698235273501L),
294+
TimestampNtz.fromMillis(1698235273501L, 8000),
295+
new byte[] {5, 6, 7, 8},
296+
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
297+
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
298+
BinaryString.fromString("abc"),
299+
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
300+
row(
301+
true,
302+
(byte) 100,
303+
(short) 200,
304+
11,
305+
11 + 400L,
306+
500.1f,
307+
600.0d,
308+
"v2",
309+
Decimal.fromUnscaledLong(900, 5, 2),
310+
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
311+
TimestampLtz.fromEpochMillis(1698235273400L),
312+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
313+
TimestampNtz.fromMillis(1698235273501L),
314+
TimestampNtz.fromMillis(1698235273501L, 8000),
315+
new byte[] {5, 6, 7, 8},
316+
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
317+
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
318+
BinaryString.fromString("abc"),
319+
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
320+
row(
321+
true,
322+
(byte) 100,
323+
(short) 200,
324+
12,
325+
12 + 400L,
326+
500.1f,
327+
600.0d,
328+
"v3",
329+
Decimal.fromUnscaledLong(900, 5, 2),
330+
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
331+
TimestampLtz.fromEpochMillis(1698235273400L),
332+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
333+
TimestampNtz.fromMillis(1698235273501L),
334+
TimestampNtz.fromMillis(1698235273501L, 8000),
335+
new byte[] {5, 6, 7, 8},
336+
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
337+
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
338+
BinaryString.fromString("abc"),
339+
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
340+
writeRows(t1, rows, false);
341+
342+
waitUntilSnapshot(t1Id, 1, 3);
343+
344+
// check the status of replica after synced
345+
assertReplicaStatus(t1Bucket, 12);
346+
347+
checkFileInIcebergTable(t1, 2);
277348
} finally {
278349
jobClient.cancel().get();
279350
}

0 commit comments

Comments
 (0)