Commit 5a7933c

[lake/iceberg] Introduce IcebergRecordAsFlussRow for iceberg union read (#1672)

1 parent 517cf70 commit 5a7933c
7 files changed: +309, -8 lines changed

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 2 additions & 1 deletion
@@ -68,13 +68,14 @@
         <version>${iceberg.version}</version>
     </dependency>

-    <!-- test dependency -->
     <dependency>
         <groupId>org.apache.fluss</groupId>
         <artifactId>fluss-common</artifactId>
         <version>${project.version}</version>
         <scope>provided</scope>
     </dependency>
+
+    <!-- test dependency -->
     <dependency>
         <groupId>org.apache.fluss</groupId>
         <artifactId>fluss-flink-common</artifactId>

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ public class IcebergLakeCatalog implements LakeCatalog {

     public static final String ICEBERG_CATALOG_DEFAULT_NAME = "fluss-iceberg-catalog";

-    private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();
+    public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();

     static {
         // We need __bucket system column to filter out the given bucket
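
Widening SYSTEM_COLUMNS to public lets the new read-path adapter below derive its field count from the same map the catalog uses when appending lake columns. For orientation, a minimal sketch of what the static registration presumably contains, inferred from the three system columns declared in the test schema later in this commit; the exact Iceberg Type instances are an assumption, as the diff only shows the first line of the block:

    import org.apache.iceberg.types.Types;

    static {
        // We need __bucket system column to filter out the given bucket
        SYSTEM_COLUMNS.put("__bucket", Types.IntegerType.get());
        SYSTEM_COLUMNS.put("__offset", Types.LongType.get());
        SYSTEM_COLUMNS.put("__timestamp", Types.TimestampType.withZone());
    }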
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java

Lines changed: 149 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.lake.iceberg.source;

import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.utils.BytesUtils;

import org.apache.iceberg.data.Record;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;

import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;

/** Adapter for Iceberg Record as fluss row. */
public class IcebergRecordAsFlussRow implements InternalRow {

    private Record icebergRecord;

    public IcebergRecordAsFlussRow() {}

    public void setIcebergRecord(Record icebergRecord) {
        this.icebergRecord = icebergRecord;
    }

    @Override
    public int getFieldCount() {
        return icebergRecord.struct().fields().size() - SYSTEM_COLUMNS.size();
    }

    @Override
    public boolean isNullAt(int pos) {
        return icebergRecord.get(pos) == null;
    }

    @Override
    public boolean getBoolean(int pos) {
        return (boolean) icebergRecord.get(pos);
    }

    @Override
    public byte getByte(int pos) {
        Object value = icebergRecord.get(pos);
        // Iceberg stores TINYINT as Integer, need to cast to byte
        return ((Integer) value).byteValue();
    }

    @Override
    public short getShort(int pos) {
        Object value = icebergRecord.get(pos);
        // Iceberg stores SMALLINT as Integer, need to cast to short
        return ((Integer) value).shortValue();
    }

    @Override
    public int getInt(int pos) {
        Object value = icebergRecord.get(pos);
        return (Integer) value;
    }

    @Override
    public long getLong(int pos) {
        Object value = icebergRecord.get(pos);
        return (Long) value;
    }

    @Override
    public float getFloat(int pos) {
        Object value = icebergRecord.get(pos);
        return (float) value;
    }

    @Override
    public double getDouble(int pos) {
        Object value = icebergRecord.get(pos);
        return (double) value;
    }

    @Override
    public BinaryString getChar(int pos, int length) {
        String value = (String) icebergRecord.get(pos);
        return BinaryString.fromBytes(value.getBytes());
    }

    @Override
    public BinaryString getString(int pos) {
        String value = (String) icebergRecord.get(pos);
        return BinaryString.fromBytes(value.getBytes());
    }

    @Override
    public Decimal getDecimal(int pos, int precision, int scale) {
        BigDecimal bigDecimal = (BigDecimal) icebergRecord.get(pos);
        return Decimal.fromBigDecimal(bigDecimal, precision, scale);
    }

    @Override
    public TimestampNtz getTimestampNtz(int pos, int precision) {
        Object value = icebergRecord.get(pos);
        if (value == null) {
            throw new IllegalStateException("Value at position " + pos + " is null");
        }
        LocalDateTime localDateTime = (LocalDateTime) value;
        return TimestampNtz.fromLocalDateTime(localDateTime);
    }

    @Override
    public TimestampLtz getTimestampLtz(int pos, int precision) {
        Object value = icebergRecord.get(pos);
        OffsetDateTime offsetDateTime = (OffsetDateTime) value;
        return TimestampLtz.fromInstant(offsetDateTime.toInstant());
    }

    @Override
    public byte[] getBinary(int pos, int length) {
        Object value = icebergRecord.get(pos);
        ByteBuffer byteBuffer = (ByteBuffer) value;
        return BytesUtils.toArray(byteBuffer);
    }

    @Override
    public byte[] getBytes(int pos) {
        Object value = icebergRecord.get(pos);
        ByteBuffer byteBuffer = (ByteBuffer) value;
        return BytesUtils.toArray(byteBuffer);
    }
}
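
A hedged usage sketch of the adapter: the setter allows one instance to be reused across records, so the hot read path avoids a per-record allocation. The scan loop, the icebergRecords variable, and the column positions below are illustrative and not part of this commit:

    IcebergRecordAsFlussRow row = new IcebergRecordAsFlussRow();
    for (Record icebergRecord : icebergRecords) { // hypothetical scan output
        row.setIcebergRecord(icebergRecord);      // reuse the single adapter instance
        long id = row.getLong(0);                 // positions follow the Iceberg schema order
        String name = row.isNullAt(1) ? null : row.getString(1).toString();
    }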
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java

Lines changed: 150 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.lake.iceberg.source;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for {@link IcebergRecordAsFlussRow}. */
class IcebergRecordAsFlussRowTest {

    private IcebergRecordAsFlussRow icebergRecordAsFlussRow;
    private Record record;

    @BeforeEach
    void setUp() {
        icebergRecordAsFlussRow = new IcebergRecordAsFlussRow();

        // Create a schema with various data types
        Schema schema =
                new Schema(
                        required(1, "id", Types.LongType.get()),
                        optional(2, "name", Types.StringType.get()),
                        optional(3, "age", Types.IntegerType.get()),
                        optional(4, "salary", Types.DoubleType.get()),
                        optional(5, "is_active", Types.BooleanType.get()),
                        optional(6, "tiny_int", Types.IntegerType.get()),
                        optional(7, "small_int", Types.IntegerType.get()),
                        optional(8, "float_val", Types.FloatType.get()),
                        optional(9, "decimal_val", Types.DecimalType.of(10, 2)),
                        optional(10, "timestamp_ntz", Types.TimestampType.withoutZone()),
                        optional(11, "timestamp_ltz", Types.TimestampType.withZone()),
                        optional(12, "binary_data", Types.BinaryType.get()),
                        optional(13, "char_data", Types.StringType.get()),
                        // System columns
                        required(14, "__bucket", Types.IntegerType.get()),
                        required(15, "__offset", Types.LongType.get()),
                        required(16, "__timestamp", Types.TimestampType.withZone()));

        record = GenericRecord.create(schema);
    }

    @Test
    void testGetFieldCount() {
        // Set up record with data
        record.setField("id", 1L);
        record.setField("name", "John");
        record.setField("age", 30);
        record.setField("__bucket", 1);
        record.setField("__offset", 100L);
        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));

        icebergRecordAsFlussRow.setIcebergRecord(record);

        // Should return count excluding system columns (3 system columns)
        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
    }

    @Test
    void testIsNullAt() {
        record.setField("id", 1L);
        record.setField("name", null); // null value
        record.setField("age", 30);

        icebergRecordAsFlussRow.setIcebergRecord(record);

        assertThat(icebergRecordAsFlussRow.isNullAt(0)).isFalse(); // id
        assertThat(icebergRecordAsFlussRow.isNullAt(1)).isTrue(); // name
        assertThat(icebergRecordAsFlussRow.isNullAt(2)).isFalse(); // age
    }

    @Test
    void testAllDataTypes() {
        // Set up all data types with test values
        record.setField("id", 12345L);
        record.setField("name", "John Doe");
        record.setField("age", 30);
        record.setField("salary", 50000.50);
        record.setField("is_active", true);
        record.setField("tiny_int", 127);
        record.setField("small_int", 32767);
        record.setField("float_val", 3.14f);
        record.setField("decimal_val", new BigDecimal("123.45"));
        record.setField("timestamp_ntz", LocalDateTime.of(2023, 12, 25, 10, 30, 45));
        record.setField(
                "timestamp_ltz", OffsetDateTime.of(2023, 12, 25, 10, 30, 45, 0, ZoneOffset.UTC));
        record.setField("binary_data", ByteBuffer.wrap("Hello World".getBytes()));
        record.setField("char_data", "Hello");
        // System columns
        record.setField("__bucket", 1);
        record.setField("__offset", 100L);
        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));

        icebergRecordAsFlussRow.setIcebergRecord(record);

        // Test all data type conversions
        assertThat(icebergRecordAsFlussRow.getLong(0)).isEqualTo(12345L); // id
        assertThat(icebergRecordAsFlussRow.getString(1).toString()).isEqualTo("John Doe"); // name
        assertThat(icebergRecordAsFlussRow.getInt(2)).isEqualTo(30); // age
        assertThat(icebergRecordAsFlussRow.getDouble(3)).isEqualTo(50000.50); // salary
        assertThat(icebergRecordAsFlussRow.getBoolean(4)).isTrue(); // is_active
        assertThat(icebergRecordAsFlussRow.getByte(5)).isEqualTo((byte) 127); // tiny_int
        assertThat(icebergRecordAsFlussRow.getShort(6)).isEqualTo((short) 32767); // small_int
        assertThat(icebergRecordAsFlussRow.getFloat(7)).isEqualTo(3.14f); // float_val
        assertThat(icebergRecordAsFlussRow.getDecimal(8, 10, 2).toBigDecimal())
                .isEqualTo(new BigDecimal("123.45")); // decimal_val
        assertThat(icebergRecordAsFlussRow.getTimestampNtz(9, 3).toLocalDateTime())
                .isEqualTo(LocalDateTime.of(2023, 12, 25, 10, 30, 45)); // timestamp_ntz
        assertThat(icebergRecordAsFlussRow.getTimestampLtz(10, 3).toInstant())
                .isEqualTo(
                        OffsetDateTime.of(2023, 12, 25, 10, 30, 45, 0, ZoneOffset.UTC)
                                .toInstant()); // timestamp_ltz
        assertThat(icebergRecordAsFlussRow.getBytes(11))
                .isEqualTo("Hello World".getBytes()); // binary_data
        assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString())
                .isEqualTo("Hello"); // char_data

        // Test field count (excluding system columns)
        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
    }
}
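
One detail the testAllDataTypes case pins down: Iceberg has no TINYINT or SMALLINT types, so generic records surface those values as Integer, and the adapter must narrow via byteValue()/shortValue(). A standalone illustration of why a direct cast would fail (hypothetical snippet, not from the commit):

    Object value = Integer.valueOf(127);    // what an Iceberg Record returns for a TINYINT column
    // byte b = (byte) value;               // would throw ClassCastException: Integer is not Byte
    byte b = ((Integer) value).byteValue(); // the narrowing conversion the adapter performs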

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@
 /** A Paimon implementation of {@link LakeCatalog}. */
 public class PaimonLakeCatalog implements LakeCatalog {

-    private static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new LinkedHashMap<>();
+    public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new LinkedHashMap<>();

     static {
         // We need __bucket system column to filter out the given bucket

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 3 additions & 4 deletions
@@ -25,14 +25,13 @@
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;

+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
 import static org.apache.fluss.utils.Preconditions.checkState;

 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {

-    // Lake table for paimon will append three system columns: __bucket, __offset,__timestamp
-    private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
@@ -47,7 +46,7 @@ public void setFlussRecord(LogRecord logRecord) {
         this.internalRow = logRecord.getRow();
         this.originRowFieldCount = internalRow.getFieldCount();
         checkState(
-                originRowFieldCount == tableRowType.getFieldCount() - LAKE_PAIMON_SYSTEM_COLUMNS,
+                originRowFieldCount == tableRowType.getFieldCount() - SYSTEM_COLUMNS.size(),
                 "The paimon table fields count must equals to LogRecord's fields count.");
     }

@@ -56,7 +55,7 @@ public int getFieldCount() {
         return
                 // business (including partitions) + system (three system fields: bucket, offset,
                 // timestamp)
-                originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
+                originRowFieldCount + SYSTEM_COLUMNS.size();
     }

     @Override
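
The hard-coded constant 3 gives way to SYSTEM_COLUMNS.size(), so the write path counts the same columns the catalog actually registers. A worked sketch of the arithmetic, using the 13 business fields from the Iceberg test schema above (counts are illustrative):

    int businessFields = 13;                                   // user columns, incl. partitions
    int storedFields = businessFields + SYSTEM_COLUMNS.size(); // 13 + 3 = 16 fields in the lake table
    // setFlussRecord() checks exactly this relationship before writing.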

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java

Lines changed: 3 additions & 1 deletion
@@ -25,6 +25,8 @@

 import org.apache.paimon.data.Timestamp;

+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
+
 /** Adapter for paimon row as fluss row. */
 public class PaimonRowAsFlussRow implements InternalRow {

@@ -43,7 +45,7 @@ public PaimonRowAsFlussRow replaceRow(org.apache.paimon.data.InternalRow paimonR

     @Override
     public int getFieldCount() {
-        return paimonRow.getFieldCount();
+        return paimonRow.getFieldCount() - SYSTEM_COLUMNS.size();
     }

     @Override
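
With this change the Paimon read-path adapter mirrors the new Iceberg one: both report only the business fields for union read instead of leaking the appended lake columns. A hedged sketch of the symmetry (paimonRow is hypothetical; counts follow the example above):

    int stored = paimonRow.getFieldCount();       // e.g. 16 fields as persisted in the lake
    int visible = stored - SYSTEM_COLUMNS.size(); // 16 - 3 = 13 fields exposed to Fluss readers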
