Skip to content

Commit 47a8437

Browse files
committed
split to pojotorow and rowtopojo converters
1 parent bbf2ca2 commit 47a8437

File tree

10 files changed

+1343
-1255
lines changed

10 files changed

+1343
-1255
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.fluss.client.converter;
18+
19+
import org.apache.fluss.row.BinaryString;
20+
import org.apache.fluss.types.DataType;
21+
import org.apache.fluss.types.DataTypeRoot;
22+
import org.apache.fluss.types.RowType;
23+
24+
import java.math.BigDecimal;
25+
import java.util.Arrays;
26+
import java.util.HashMap;
27+
import java.util.HashSet;
28+
import java.util.Map;
29+
import java.util.Set;
30+
31+
/**
32+
* Internal shared utilities for POJO and Fluss InternalRow conversions.
33+
*
34+
* <p>Provides validation helpers and common functions used by PojoToRowConverter and
35+
* RowToPojoConverter (e.g., supported Java types per Fluss DataType, projection/table validation,
36+
* and text conversion helpers).
37+
*/
38+
final class ConverterCommons {
39+
40+
static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = createSupportedTypes();
41+
42+
private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
43+
Map<DataTypeRoot, Set<Class<?>>> map = new HashMap<>();
44+
map.put(DataTypeRoot.BOOLEAN, setOf(Boolean.class));
45+
map.put(DataTypeRoot.TINYINT, setOf(Byte.class));
46+
map.put(DataTypeRoot.SMALLINT, setOf(Short.class));
47+
map.put(DataTypeRoot.INTEGER, setOf(Integer.class));
48+
map.put(DataTypeRoot.BIGINT, setOf(Long.class));
49+
map.put(DataTypeRoot.FLOAT, setOf(Float.class));
50+
map.put(DataTypeRoot.DOUBLE, setOf(Double.class));
51+
map.put(DataTypeRoot.CHAR, setOf(String.class, Character.class));
52+
map.put(DataTypeRoot.STRING, setOf(String.class, Character.class));
53+
map.put(DataTypeRoot.BINARY, setOf(byte[].class));
54+
map.put(DataTypeRoot.BYTES, setOf(byte[].class));
55+
map.put(DataTypeRoot.DECIMAL, setOf(BigDecimal.class));
56+
map.put(DataTypeRoot.DATE, setOf(java.time.LocalDate.class));
57+
map.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, setOf(java.time.LocalTime.class));
58+
map.put(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, setOf(java.time.LocalDateTime.class));
59+
map.put(
60+
DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
61+
setOf(java.time.Instant.class, java.time.OffsetDateTime.class));
62+
return map;
63+
}
64+
65+
static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema) {
66+
Set<String> pojoNames = pojoType.getProperties().keySet();
67+
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
68+
if (!pojoNames.equals(tableNames)) {
69+
throw new IllegalArgumentException(
70+
String.format(
71+
"POJO fields %s must exactly match table schema fields %s.",
72+
pojoNames, tableNames));
73+
}
74+
for (int i = 0; i < tableSchema.getFieldCount(); i++) {
75+
String name = tableSchema.getFieldNames().get(i);
76+
DataType dt = tableSchema.getTypeAt(i);
77+
PojoType.Property prop = pojoType.getProperty(name);
78+
validateCompatibility(dt, prop);
79+
}
80+
}
81+
82+
static void validateProjectionSubset(RowType projection, RowType tableSchema) {
83+
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
84+
for (String n : projection.getFieldNames()) {
85+
if (!tableNames.contains(n)) {
86+
throw new IllegalArgumentException(
87+
"Projection field '" + n + "' is not part of table schema.");
88+
}
89+
}
90+
}
91+
92+
static void validateCompatibility(DataType fieldType, PojoType.Property prop) {
93+
Set<Class<?>> supported = SUPPORTED_TYPES.get(fieldType.getTypeRoot());
94+
Class<?> actual = prop.type;
95+
if (supported == null) {
96+
throw new UnsupportedOperationException(
97+
String.format(
98+
"Unsupported field type %s for field %s.",
99+
fieldType.getTypeRoot(), prop.name));
100+
}
101+
if (!supported.contains(actual)) {
102+
throw new IllegalArgumentException(
103+
String.format(
104+
"Field '%s' in POJO has Java type %s which is incompatible with Fluss type %s. Supported Java types: %s",
105+
prop.name, actual.getName(), fieldType.getTypeRoot(), supported));
106+
}
107+
}
108+
109+
static BinaryString toBinaryStringForText(Object v, String fieldName, DataTypeRoot root) {
110+
final String s =
111+
(v instanceof Character) ? String.valueOf((Character) v) : String.valueOf(v);
112+
if (root == DataTypeRoot.CHAR && s.length() != 1) {
113+
throw new IllegalArgumentException(
114+
String.format(
115+
"Field %s expects exactly one character for CHAR type, got length %d.",
116+
fieldName, s.length()));
117+
}
118+
return BinaryString.fromString(s);
119+
}
120+
121+
static Set<Class<?>> setOf(Class<?>... javaTypes) {
122+
return new HashSet<>(Arrays.asList(javaTypes));
123+
}
124+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.fluss.client.converter;
18+
19+
import org.apache.fluss.types.RowType;
20+
21+
/** Public facade for creating POJO converters. */
22+
public final class PojoConverters {
23+
24+
private PojoConverters() {}
25+
26+
public static <T> PojoToRowConverter<T> writerOf(
27+
Class<T> pojoClass, RowType tableSchema, RowType projection) {
28+
return PojoToRowConverter.of(pojoClass, tableSchema, projection);
29+
}
30+
31+
public static <T> RowToPojoConverter<T> readerOf(
32+
Class<T> pojoClass, RowType tableSchema, RowType projection) {
33+
return RowToPojoConverter.of(pojoClass, tableSchema, projection);
34+
}
35+
}
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.fluss.client.converter;
18+
19+
import org.apache.fluss.row.Decimal;
20+
import org.apache.fluss.row.GenericRow;
21+
import org.apache.fluss.row.TimestampLtz;
22+
import org.apache.fluss.row.TimestampNtz;
23+
import org.apache.fluss.types.DataType;
24+
import org.apache.fluss.types.DecimalType;
25+
import org.apache.fluss.types.RowType;
26+
27+
import javax.annotation.Nullable;
28+
29+
import java.math.BigDecimal;
30+
import java.time.Instant;
31+
import java.time.LocalDate;
32+
import java.time.LocalDateTime;
33+
import java.time.LocalTime;
34+
import java.time.OffsetDateTime;
35+
36+
/**
37+
* Converter for writer path: converts POJO instances to Fluss InternalRow according to a (possibly
38+
* projected) RowType. Validation is done against the full table schema.
39+
*/
40+
public final class PojoToRowConverter<T> {
41+
42+
private final PojoType<T> pojoType;
43+
private final RowType tableSchema;
44+
private final RowType projection;
45+
private final FieldToRow[] fieldConverters; // index corresponds to projection position
46+
47+
private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, RowType projection) {
48+
this.pojoType = pojoType;
49+
this.tableSchema = tableSchema;
50+
this.projection = projection;
51+
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
52+
ConverterCommons.validateProjectionSubset(projection, tableSchema);
53+
this.fieldConverters = createFieldConverters();
54+
}
55+
56+
public static <T> PojoToRowConverter<T> of(
57+
Class<T> pojoClass, RowType tableSchema, RowType projection) {
58+
return new PojoToRowConverter<>(PojoType.of(pojoClass), tableSchema, projection);
59+
}
60+
61+
public GenericRow toRow(@Nullable T pojo) {
62+
if (pojo == null) {
63+
return null;
64+
}
65+
GenericRow row = new GenericRow(projection.getFieldCount());
66+
for (int i = 0; i < fieldConverters.length; i++) {
67+
Object v;
68+
try {
69+
v = fieldConverters[i].readAndConvert(pojo);
70+
} catch (RuntimeException re) {
71+
throw re;
72+
} catch (Exception e) {
73+
throw new IllegalStateException(
74+
"Failed to access field '"
75+
+ projection.getFieldNames().get(i)
76+
+ "' from POJO "
77+
+ pojoType.getPojoClass().getName(),
78+
e);
79+
}
80+
row.setField(i, v);
81+
}
82+
return row;
83+
}
84+
85+
private FieldToRow[] createFieldConverters() {
86+
FieldToRow[] arr = new FieldToRow[projection.getFieldCount()];
87+
for (int i = 0; i < projection.getFieldCount(); i++) {
88+
String fieldName = projection.getFieldNames().get(i);
89+
DataType fieldType = projection.getTypeAt(i);
90+
PojoType.Property prop = requireProperty(fieldName);
91+
ConverterCommons.validateCompatibility(fieldType, prop);
92+
arr[i] = createFieldConverter(prop, fieldType);
93+
}
94+
return arr;
95+
}
96+
97+
private PojoType.Property requireProperty(String fieldName) {
98+
PojoType.Property p = pojoType.getProperty(fieldName);
99+
if (p == null) {
100+
throw new IllegalArgumentException(
101+
"Field '"
102+
+ fieldName
103+
+ "' not found in POJO class "
104+
+ pojoType.getPojoClass().getName()
105+
+ ".");
106+
}
107+
return p;
108+
}
109+
110+
private static FieldToRow createFieldConverter(PojoType.Property prop, DataType fieldType) {
111+
switch (fieldType.getTypeRoot()) {
112+
case BOOLEAN:
113+
case TINYINT:
114+
case SMALLINT:
115+
case INTEGER:
116+
case BIGINT:
117+
case FLOAT:
118+
case DOUBLE:
119+
case BINARY:
120+
case BYTES:
121+
return (obj) -> prop.read(obj);
122+
case CHAR:
123+
case STRING:
124+
return (obj) -> {
125+
Object v = prop.read(obj);
126+
if (v == null) return null;
127+
return ConverterCommons.toBinaryStringForText(
128+
v, prop.name, fieldType.getTypeRoot());
129+
};
130+
case DECIMAL:
131+
return (obj) -> {
132+
Object v = prop.read(obj);
133+
if (v == null) return null;
134+
if (!(v instanceof BigDecimal)) {
135+
throw new IllegalArgumentException(
136+
String.format(
137+
"Field %s is not a BigDecimal. Cannot convert to DecimalData.",
138+
prop.name));
139+
}
140+
DecimalType decimalType = (DecimalType) fieldType;
141+
return Decimal.fromBigDecimal(
142+
(BigDecimal) v, decimalType.getPrecision(), decimalType.getScale());
143+
};
144+
case DATE:
145+
return (obj) -> {
146+
Object v = prop.read(obj);
147+
if (v == null) return null;
148+
if (!(v instanceof LocalDate)) {
149+
throw new IllegalArgumentException(
150+
String.format(
151+
"Field %s is not a LocalDate. Cannot convert to int days.",
152+
prop.name));
153+
}
154+
return (int) ((LocalDate) v).toEpochDay();
155+
};
156+
case TIME_WITHOUT_TIME_ZONE:
157+
return (obj) -> {
158+
Object v = prop.read(obj);
159+
if (v == null) return null;
160+
if (!(v instanceof LocalTime)) {
161+
throw new IllegalArgumentException(
162+
String.format(
163+
"Field %s is not a LocalTime. Cannot convert to int millis.",
164+
prop.name));
165+
}
166+
LocalTime t = (LocalTime) v;
167+
return (int) (t.toNanoOfDay() / 1_000_000);
168+
};
169+
case TIMESTAMP_WITHOUT_TIME_ZONE:
170+
return (obj) -> {
171+
Object v = prop.read(obj);
172+
if (v == null) return null;
173+
if (!(v instanceof LocalDateTime)) {
174+
throw new IllegalArgumentException(
175+
String.format(
176+
"Field %s is not a LocalDateTime. Cannot convert to TimestampNtz.",
177+
prop.name));
178+
}
179+
return TimestampNtz.fromLocalDateTime((LocalDateTime) v);
180+
};
181+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
182+
return (obj) -> {
183+
Object v = prop.read(obj);
184+
if (v == null) return null;
185+
if (v instanceof Instant) {
186+
return TimestampLtz.fromInstant((Instant) v);
187+
} else if (v instanceof OffsetDateTime) {
188+
return TimestampLtz.fromInstant(((OffsetDateTime) v).toInstant());
189+
}
190+
throw new IllegalArgumentException(
191+
String.format(
192+
"Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz.",
193+
prop.name));
194+
};
195+
default:
196+
throw new UnsupportedOperationException(
197+
String.format(
198+
"Unsupported field type %s for field %s.",
199+
fieldType.getTypeRoot(), prop.name));
200+
}
201+
}
202+
203+
private interface FieldToRow {
204+
Object readAndConvert(Object pojo) throws Exception;
205+
}
206+
}

0 commit comments

Comments
 (0)