Skip to content

Commit 7452a52

Browse files
authored
[client] Introduce util helpers for conversion between POJO and InternalRow (#1378)
1 parent e7d92ce commit 7452a52

File tree

7 files changed

+1414
-0
lines changed

7 files changed

+1414
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.converter;
19+
20+
import org.apache.fluss.row.BinaryString;
21+
import org.apache.fluss.types.DataType;
22+
import org.apache.fluss.types.DataTypeRoot;
23+
import org.apache.fluss.types.RowType;
24+
25+
import java.math.BigDecimal;
26+
import java.util.Arrays;
27+
import java.util.HashMap;
28+
import java.util.HashSet;
29+
import java.util.Map;
30+
import java.util.Set;
31+
32+
import static org.apache.fluss.client.converter.RowToPojoConverter.charLengthExceptionMessage;
33+
34+
/**
35+
* Internal shared utilities for POJO and Fluss InternalRow conversions.
36+
*
37+
* <p>Provides validation helpers and common functions used by PojoToRowConverter and
38+
* RowToPojoConverter (e.g., supported Java types per Fluss DataType, projection/table validation,
39+
* and text conversion helpers).
40+
*/
41+
final class ConverterCommons {
42+
43+
static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = createSupportedTypes();
44+
45+
private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
46+
Map<DataTypeRoot, Set<Class<?>>> map = new HashMap<>();
47+
map.put(DataTypeRoot.BOOLEAN, setOf(Boolean.class));
48+
map.put(DataTypeRoot.TINYINT, setOf(Byte.class));
49+
map.put(DataTypeRoot.SMALLINT, setOf(Short.class));
50+
map.put(DataTypeRoot.INTEGER, setOf(Integer.class));
51+
map.put(DataTypeRoot.BIGINT, setOf(Long.class));
52+
map.put(DataTypeRoot.FLOAT, setOf(Float.class));
53+
map.put(DataTypeRoot.DOUBLE, setOf(Double.class));
54+
map.put(DataTypeRoot.CHAR, setOf(String.class, Character.class));
55+
map.put(DataTypeRoot.STRING, setOf(String.class, Character.class));
56+
map.put(DataTypeRoot.BINARY, setOf(byte[].class));
57+
map.put(DataTypeRoot.BYTES, setOf(byte[].class));
58+
map.put(DataTypeRoot.DECIMAL, setOf(BigDecimal.class));
59+
map.put(DataTypeRoot.DATE, setOf(java.time.LocalDate.class));
60+
map.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, setOf(java.time.LocalTime.class));
61+
map.put(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, setOf(java.time.LocalDateTime.class));
62+
map.put(
63+
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
64+
setOf(java.time.Instant.class, java.time.OffsetDateTime.class));
65+
return map;
66+
}
67+
68+
static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema) {
69+
Set<String> pojoNames = pojoType.getProperties().keySet();
70+
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
71+
if (!pojoNames.equals(tableNames)) {
72+
throw new IllegalArgumentException(
73+
String.format(
74+
"POJO fields %s must exactly match table schema fields %s.",
75+
pojoNames, tableNames));
76+
}
77+
for (int i = 0; i < tableSchema.getFieldCount(); i++) {
78+
String name = tableSchema.getFieldNames().get(i);
79+
DataType dt = tableSchema.getTypeAt(i);
80+
PojoType.Property prop = pojoType.getProperty(name);
81+
validateCompatibility(dt, prop);
82+
}
83+
}
84+
85+
static void validateProjectionSubset(RowType projection, RowType tableSchema) {
86+
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
87+
for (String n : projection.getFieldNames()) {
88+
if (!tableNames.contains(n)) {
89+
throw new IllegalArgumentException(
90+
"Projection field '" + n + "' is not part of table schema.");
91+
}
92+
}
93+
}
94+
95+
static void validateCompatibility(DataType fieldType, PojoType.Property prop) {
96+
Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
97+
Class<?> actual = prop.type;
98+
if (supported == null) {
99+
throw new UnsupportedOperationException(
100+
String.format(
101+
"Unsupported field type %s for field %s.",
102+
fieldType.getTypeRoot(), prop.name));
103+
}
104+
if (!supported.contains(actual)) {
105+
throw new IllegalArgumentException(
106+
String.format(
107+
"Field '%s' in POJO has Java type %s which is incompatible with Fluss type %s. Supported Java types: %s",
108+
prop.name, actual.getName(), fieldType.getTypeRoot(), supported));
109+
}
110+
}
111+
112+
static BinaryString toBinaryStringForText(Object v, String fieldName, DataTypeRoot root) {
113+
final String s = String.valueOf(v);
114+
if (root == DataTypeRoot.CHAR && s.length() != 1) {
115+
throw new IllegalArgumentException(charLengthExceptionMessage(fieldName, s.length()));
116+
}
117+
return BinaryString.fromString(s);
118+
}
119+
120+
static Set<Class<?>> setOf(Class<?>... javaTypes) {
121+
return new HashSet<>(Arrays.asList(javaTypes));
122+
}
123+
}
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.converter;
19+
20+
import org.apache.fluss.row.BinaryString;
21+
import org.apache.fluss.row.Decimal;
22+
import org.apache.fluss.row.GenericRow;
23+
import org.apache.fluss.row.TimestampLtz;
24+
import org.apache.fluss.row.TimestampNtz;
25+
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.types.DecimalType;
27+
import org.apache.fluss.types.RowType;
28+
29+
import javax.annotation.Nullable;
30+
31+
import java.math.BigDecimal;
32+
import java.time.Instant;
33+
import java.time.LocalDate;
34+
import java.time.LocalDateTime;
35+
import java.time.LocalTime;
36+
import java.time.OffsetDateTime;
37+
import java.util.List;
38+
39+
/**
40+
* Converter for writer path: converts POJO instances to Fluss InternalRow according to a (possibly
41+
* projected) RowType. Validation is done against the full table schema.
42+
*/
43+
public final class PojoToRowConverter<T> {
44+
45+
private final PojoType<T> pojoType;
46+
private final RowType tableSchema;
47+
private final RowType projection;
48+
private final List<String> projectionFieldNames;
49+
private final FieldToRow[] fieldConverters; // index corresponds to projection position
50+
51+
private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, RowType projection) {
52+
this.pojoType = pojoType;
53+
this.tableSchema = tableSchema;
54+
this.projection = projection;
55+
this.projectionFieldNames = projection.getFieldNames();
56+
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
57+
ConverterCommons.validateProjectionSubset(projection, tableSchema);
58+
this.fieldConverters = createFieldConverters();
59+
}
60+
61+
public static <T> PojoToRowConverter<T> of(
62+
Class<T> pojoClass, RowType tableSchema, RowType projection) {
63+
return new PojoToRowConverter<>(PojoType.of(pojoClass), tableSchema, projection);
64+
}
65+
66+
public GenericRow toRow(@Nullable T pojo) {
67+
if (pojo == null) {
68+
return null;
69+
}
70+
GenericRow row = new GenericRow(projection.getFieldCount());
71+
for (int i = 0; i < fieldConverters.length; i++) {
72+
Object v;
73+
try {
74+
v = fieldConverters[i].readAndConvert(pojo);
75+
} catch (RuntimeException re) {
76+
throw re;
77+
} catch (Exception e) {
78+
throw new IllegalStateException(
79+
"Failed to access field '"
80+
+ projectionFieldNames.get(i)
81+
+ "' from POJO "
82+
+ pojoType.getPojoClass().getName(),
83+
e);
84+
}
85+
row.setField(i, v);
86+
}
87+
return row;
88+
}
89+
90+
private FieldToRow[] createFieldConverters() {
91+
FieldToRow[] arr = new FieldToRow[projection.getFieldCount()];
92+
for (int i = 0; i < projection.getFieldCount(); i++) {
93+
String fieldName = projectionFieldNames.get(i);
94+
DataType fieldType = projection.getTypeAt(i);
95+
PojoType.Property prop = requireProperty(fieldName);
96+
ConverterCommons.validateCompatibility(fieldType, prop);
97+
arr[i] = createFieldConverter(prop, fieldType);
98+
}
99+
return arr;
100+
}
101+
102+
private PojoType.Property requireProperty(String fieldName) {
103+
PojoType.Property p = pojoType.getProperty(fieldName);
104+
if (p == null) {
105+
throw new IllegalArgumentException(
106+
"Field '"
107+
+ fieldName
108+
+ "' not found in POJO class "
109+
+ pojoType.getPojoClass().getName()
110+
+ ".");
111+
}
112+
return p;
113+
}
114+
115+
private static FieldToRow createFieldConverter(PojoType.Property prop, DataType fieldType) {
116+
switch (fieldType.getTypeRoot()) {
117+
case BOOLEAN:
118+
case TINYINT:
119+
case SMALLINT:
120+
case INTEGER:
121+
case BIGINT:
122+
case FLOAT:
123+
case DOUBLE:
124+
case BINARY:
125+
case BYTES:
126+
return prop::read;
127+
case CHAR:
128+
case STRING:
129+
return (obj) -> convertTextValue(fieldType, prop, prop.read(obj));
130+
case DECIMAL:
131+
return (obj) -> convertDecimalValue((DecimalType) fieldType, prop, prop.read(obj));
132+
case DATE:
133+
return (obj) -> convertDateValue(prop, prop.read(obj));
134+
case TIME_WITHOUT_TIME_ZONE:
135+
return (obj) -> convertTimeValue(prop, prop.read(obj));
136+
case TIMESTAMP_WITHOUT_TIME_ZONE:
137+
return (obj) -> convertTimestampNtzValue(prop, prop.read(obj));
138+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
139+
return (obj) -> convertTimestampLtzValue(prop, prop.read(obj));
140+
default:
141+
throw new UnsupportedOperationException(
142+
String.format(
143+
"Unsupported field type %s for field %s.",
144+
fieldType.getTypeRoot(), prop.name));
145+
}
146+
}
147+
148+
/**
149+
* Converts a text value (String or Character) from a POJO property to Fluss BinaryString.
150+
*
151+
* <p>For CHAR columns, enforces that the text has exactly one character. Nulls are passed
152+
* through.
153+
*/
154+
private static @Nullable BinaryString convertTextValue(
155+
DataType fieldType, PojoType.Property prop, @Nullable Object v) {
156+
if (v == null) {
157+
return null;
158+
}
159+
return ConverterCommons.toBinaryStringForText(v, prop.name, fieldType.getTypeRoot());
160+
}
161+
162+
/** Converts a BigDecimal POJO property to Fluss Decimal respecting precision and scale. */
163+
private static @Nullable Decimal convertDecimalValue(
164+
DecimalType decimalType, PojoType.Property prop, @Nullable Object v) {
165+
if (v == null) {
166+
return null;
167+
}
168+
if (!(v instanceof BigDecimal)) {
169+
throw new IllegalArgumentException(
170+
String.format(
171+
"Field %s is not a BigDecimal. Cannot convert to Decimal.", prop.name));
172+
}
173+
return Decimal.fromBigDecimal(
174+
(BigDecimal) v, decimalType.getPrecision(), decimalType.getScale());
175+
}
176+
177+
/** Converts a LocalDate POJO property to number of days since epoch. */
178+
private static @Nullable Integer convertDateValue(PojoType.Property prop, @Nullable Object v) {
179+
if (v == null) {
180+
return null;
181+
}
182+
if (!(v instanceof LocalDate)) {
183+
throw new IllegalArgumentException(
184+
String.format(
185+
"Field %s is not a LocalDate. Cannot convert to int days.", prop.name));
186+
}
187+
return (int) ((LocalDate) v).toEpochDay();
188+
}
189+
190+
/** Converts a LocalTime POJO property to milliseconds of day. */
191+
private static @Nullable Integer convertTimeValue(PojoType.Property prop, @Nullable Object v) {
192+
if (v == null) {
193+
return null;
194+
}
195+
if (!(v instanceof LocalTime)) {
196+
throw new IllegalArgumentException(
197+
String.format(
198+
"Field %s is not a LocalTime. Cannot convert to int millis.",
199+
prop.name));
200+
}
201+
LocalTime t = (LocalTime) v;
202+
return (int) (t.toNanoOfDay() / 1_000_000);
203+
}
204+
205+
/** Converts a LocalDateTime POJO property to Fluss TimestampNtz. */
206+
private static @Nullable TimestampNtz convertTimestampNtzValue(
207+
PojoType.Property prop, @Nullable Object v) {
208+
if (v == null) {
209+
return null;
210+
}
211+
if (!(v instanceof LocalDateTime)) {
212+
throw new IllegalArgumentException(
213+
String.format(
214+
"Field %s is not a LocalDateTime. Cannot convert to TimestampNtz.",
215+
prop.name));
216+
}
217+
return TimestampNtz.fromLocalDateTime((LocalDateTime) v);
218+
}
219+
220+
/** Converts an Instant or OffsetDateTime POJO property to Fluss TimestampLtz (UTC based). */
221+
private static @Nullable TimestampLtz convertTimestampLtzValue(
222+
PojoType.Property prop, @Nullable Object v) {
223+
if (v == null) {
224+
return null;
225+
}
226+
if (v instanceof Instant) {
227+
return TimestampLtz.fromInstant((Instant) v);
228+
} else if (v instanceof OffsetDateTime) {
229+
return TimestampLtz.fromInstant(((OffsetDateTime) v).toInstant());
230+
}
231+
throw new IllegalArgumentException(
232+
String.format(
233+
"Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz.",
234+
prop.name));
235+
}
236+
237+
private interface FieldToRow {
238+
Object readAndConvert(Object pojo) throws Exception;
239+
}
240+
}

0 commit comments

Comments
 (0)