Skip to content

Commit f731fc6

Browse files
authored
[client] Add Pojo API support for read and write operations (#1992)
1 parent b9b92d4 commit f731fc6

30 files changed

+1531
-47
lines changed

fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,24 @@ static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema)
8383
}
8484
}
8585

86+
static void validatePojoMatchesProjection(PojoType<?> pojoType, RowType projection) {
87+
Set<String> pojoNames = pojoType.getProperties().keySet();
88+
List<String> fieldNames = projection.getFieldNames();
89+
if (!pojoNames.containsAll(fieldNames)) {
90+
throw new IllegalArgumentException(
91+
String.format(
92+
"POJO fields %s must contain all projection fields %s. "
93+
+ "For full-table writes, POJO fields must exactly match table schema fields.",
94+
pojoNames, fieldNames));
95+
}
96+
for (int i = 0; i < projection.getFieldCount(); i++) {
97+
String name = fieldNames.get(i);
98+
DataType dt = projection.getTypeAt(i);
99+
PojoType.Property prop = pojoType.getProperty(name);
100+
validateCompatibility(dt, prop);
101+
}
102+
}
103+
86104
static void validateProjectionSubset(RowType projection, RowType tableSchema) {
87105
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
88106
for (String n : projection.getFieldNames()) {

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import javax.annotation.Nullable;
3131

3232
import java.math.BigDecimal;
33+
import java.math.RoundingMode;
3334
import java.time.Instant;
3435
import java.time.LocalDate;
3536
import java.time.LocalDateTime;
@@ -54,7 +55,9 @@ private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, RowType pr
5455
this.tableSchema = tableSchema;
5556
this.projection = projection;
5657
this.projectionFieldNames = projection.getFieldNames();
57-
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
58+
// For writer path, allow POJO to be a superset of the projection. It must contain all
59+
// projected fields.
60+
ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
5861
ConverterCommons.validateProjectionSubset(projection, tableSchema);
5962
this.fieldConverters = createFieldConverters();
6063
}
@@ -177,8 +180,22 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
177180
String.format(
178181
"Field %s is not a BigDecimal. Cannot convert to Decimal.", prop.name));
179182
}
180-
return Decimal.fromBigDecimal(
181-
(BigDecimal) v, decimalType.getPrecision(), decimalType.getScale());
183+
final int precision = decimalType.getPrecision();
184+
final int scale = decimalType.getScale();
185+
186+
// Scale with a deterministic rounding mode to avoid ArithmeticException when rounding is
187+
// needed.
188+
BigDecimal bd = (BigDecimal) v;
189+
BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
190+
191+
if (scaled.precision() > precision) {
192+
throw new IllegalArgumentException(
193+
String.format(
194+
"Decimal value for field %s exceeds precision %d after scaling to %d: %s",
195+
prop.name, precision, scale, scaled));
196+
}
197+
198+
return Decimal.fromBigDecimal(scaled, precision, scale);
182199
}
183200

184201
/** Converts a LocalDate POJO property to number of days since epoch. */

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ final class PojoType<T> {
3737
private final Class<T> pojoClass;
3838
private final Constructor<T> defaultConstructor;
3939
private final Map<String, Property> properties; // property name -> property
40+
private static final Map<Class<?>, Class<?>> PRIMITIVE_TO_BOXED = createPrimitiveToBoxedMap();
4041

4142
private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String, Property> props) {
4243
this.pojoClass = pojoClass;
@@ -73,12 +74,15 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
7374
for (Map.Entry<String, Field> e : allFields.entrySet()) {
7475
String name = e.getKey();
7576
Field field = e.getValue();
77+
// Enforce nullable fields: primitives are not allowed in POJO definitions.
7678
if (field.getType().isPrimitive()) {
7779
throw new IllegalArgumentException(
7880
String.format(
7981
"POJO class %s has primitive field '%s' of type %s. Primitive types are not allowed; all fields must be nullable (use wrapper types).",
8082
pojoClass.getName(), name, field.getType().getName()));
8183
}
84+
// use boxed type as effective type
85+
Class<?> effectiveType = boxIfPrimitive(field.getType());
8286
boolean publicField = Modifier.isPublic(field.getModifiers());
8387
Method getter = getters.get(name);
8488
Method setter = setters.get(name);
@@ -94,8 +98,7 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
9498
}
9599
props.put(
96100
name,
97-
new Property(
98-
name, field.getType(), publicField ? field : null, getter, setter));
101+
new Property(name, effectiveType, publicField ? field : null, getter, setter));
99102
}
100103

101104
return new PojoType<>(pojoClass, ctor, props);
@@ -235,6 +238,29 @@ private static String decapitalize(String s) {
235238
return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
236239
}
237240

241+
private static Map<Class<?>, Class<?>> createPrimitiveToBoxedMap() {
242+
Map<Class<?>, Class<?>> map = new HashMap<>();
243+
map.put(boolean.class, Boolean.class);
244+
map.put(byte.class, Byte.class);
245+
map.put(short.class, Short.class);
246+
map.put(int.class, Integer.class);
247+
map.put(long.class, Long.class);
248+
map.put(float.class, Float.class);
249+
map.put(double.class, Double.class);
250+
map.put(char.class, Character.class);
251+
// void shouldn't appear as a field type, but handle defensively
252+
map.put(void.class, Void.class);
253+
return map;
254+
}
255+
256+
private static Class<?> boxIfPrimitive(Class<?> type) {
257+
if (!type.isPrimitive()) {
258+
return type;
259+
}
260+
Class<?> boxed = PRIMITIVE_TO_BOXED.get(type);
261+
return boxed != null ? boxed : type;
262+
}
263+
238264
static final class Property {
239265
final String name;
240266
final Class<?> type;

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,37 @@
2424

2525
/**
2626
* Used to configure and create a {@link Lookuper} to lookup rows of a primary key table. The built
27-
* Lookuper can be a primary key lookuper that lookups by the primary key, or a prefix key lookup
28-
* that lookups by the prefix key of the primary key.
27+
* lookuper can lookup by the full primary key, or by a prefix of the primary key when configured
28+
* via {@link #lookupBy}.
2929
*
3030
* <p>{@link Lookup} objects are immutable and can be shared between threads. Refinement methods,
31-
* like {@link #lookupBy}, create new Lookup instances.
31+
* like {@link #lookupBy}, create new {@code Lookup} instances.
3232
*
33-
* <p>Example1: Create a Primary Key Lookuper. Given a table with primary key column [k STRING].
33+
* <p>Examples
34+
*
35+
* <p>Example 1: Primary Key Lookuper using an InternalRow key. Given a table with primary key
36+
* column [k STRING]:
3437
*
3538
* <pre>{@code
3639
* Lookuper lookuper = table.newLookup().createLookuper();
3740
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of("key1"));
38-
* resultFuture.get().getRows().forEach(row -> {
39-
* System.out.println(row);
40-
* });
41+
* resultFuture.get().getRowList().forEach(System.out::println);
4142
* }</pre>
4243
*
43-
* <p>Example2: Create a Prefix Key Lookuper. Given a table with primary key column [a INT, b
44-
* STRING, c BIGINT] and bucket key [a, b].
44+
* <p>Example 2: Prefix Key Lookuper using an InternalRow key. Given a table with primary key
45+
* columns [a INT, b STRING, c BIGINT] and bucket key [a, b]:
4546
*
4647
* <pre>{@code
4748
* Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
4849
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of(1, "b1"));
49-
* resultFuture.get().getRows().forEach(row -> {
50-
* System.out.println(row);
51-
* });
50+
* resultFuture.get().getRowList().forEach(System.out::println);
51+
* }</pre>
52+
*
53+
* <p>Example 3: Using a POJO key (conversion handled internally):
54+
*
55+
* <pre>{@code
56+
* TypedLookuper<MyKeyPojo> lookuper = table.newLookup().createTypedLookuper(MyKeyPojo.class);
57+
* LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
5258
* }</pre>
5359
*
5460
* @since 0.6
@@ -96,4 +102,13 @@ default Lookup lookupBy(String... lookupColumnNames) {
96102
* @return the lookuper
97103
*/
98104
Lookuper createLookuper();
105+
106+
/**
107+
* Creates a {@link TypedLookuper} instance to lookup rows of a primary key table using POJOs.
108+
*
109+
* @param pojoClass the class of the POJO
110+
* @param <T> the type of the POJO
111+
* @return the typed lookuper
112+
*/
113+
<T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
99114
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.util.Objects;
2828

2929
/**
30-
* The result of {@link Lookuper#lookup(InternalRow)}.
30+
* The result of a lookup operation performed by a {@link Lookuper}. It carries zero, one, or many
31+
* {@link org.apache.fluss.row.InternalRow} values depending on whether the underlying lookup is a
32+
* primary-key lookup (at most one) or a prefix-key lookup (zero or more).
3133
*
3234
* @since 0.1
3335
*/

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@
2525
import java.util.concurrent.CompletableFuture;
2626

2727
/**
28-
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The
29-
* lookuper has retriable ability to handle transient errors during lookup operations which is
30-
* configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
28+
* A lookuper performs key-based lookups against a primary key table, using either the full primary
29+
* key or a prefix of the primary key (when configured via {@code Lookup#lookupBy}).
3130
*
32-
* <p>Note: Lookuper instances are not thread-safe.
31+
* <p>Usage examples:
32+
*
33+
* <pre>{@code
34+
* // Row-based key (InternalRow)
35+
* Lookuper lookuper = table.newLookup().createLookuper();
36+
* LookupResult res = lookuper.lookup(keyRow).get();
37+
* }</pre>
3338
*
3439
* @since 0.6
3540
*/
@@ -44,7 +49,7 @@ public interface Lookuper {
4449
* {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
4550
* Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
4651
*
47-
* @param lookupKey the lookup key.
52+
* @param lookupKey the lookup key
4853
* @return the result of lookup.
4954
*/
5055
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
* of the primary key.
4747
*/
4848
@NotThreadSafe
49-
class PrefixKeyLookuper extends AbstractLookuper {
49+
class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {
5050

5151
/** Extract bucket key from prefix lookup key row. */
5252
private final KeyEncoder bucketKeyEncoder;

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
/** An implementation of {@link Lookuper} that lookups by primary key. */
4242
@NotThreadSafe
43-
class PrimaryKeyLookuper extends AbstractLookuper {
43+
class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper {
4444

4545
private final KeyEncoder primaryKeyEncoder;
4646

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,9 @@ public Lookuper createLookuper() {
7171
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
7272
}
7373
}
74+
75+
@Override
76+
public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
77+
return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames, pojoClass);
78+
}
7479
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
24+
/**
25+
* A typed lookuper performs key-based lookups against a primary key table using POJOs.
26+
*
27+
* @param <T> the type of the lookup key
28+
* @since 0.6
29+
*/
30+
@PublicEvolving
31+
public interface TypedLookuper<T> {
32+
33+
/**
34+
* Lookups certain row from the given lookup key.
35+
*
36+
* @param lookupKey the lookup key
37+
* @return the result of lookup.
38+
*/
39+
CompletableFuture<LookupResult> lookup(T lookupKey);
40+
}

0 commit comments

Comments
 (0)