Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
98ecaa7
Introduce Generics and Typed Classes
polyzos Nov 17, 2025
dceb71e
fix checkstyle violation
polyzos Nov 17, 2025
47e50f8
fix checkstyle violation
polyzos Nov 17, 2025
a24c32a
update tests
polyzos Nov 17, 2025
7dc648d
add end2end tests
polyzos Nov 17, 2025
4dc25d6
add required parameterized types
polyzos Nov 17, 2025
f291a0f
fix checkstyle violation
polyzos Nov 17, 2025
8297380
add missing types to flink module
polyzos Nov 17, 2025
2a8a2b5
patch tests
polyzos Nov 17, 2025
eabb51d
improve test coverage
polyzos Nov 17, 2025
367461d
fix checkstyle violation
polyzos Nov 17, 2025
e06df33
instantiate converters once
polyzos Nov 18, 2025
040fa90
Introduce Generics and Typed Classes
polyzos Nov 17, 2025
f07a84b
fix checkstyle violation
polyzos Nov 17, 2025
a7cad6d
fix checkstyle violation
polyzos Nov 17, 2025
b22defd
update tests
polyzos Nov 17, 2025
950ed3b
add end2end tests
polyzos Nov 17, 2025
1fdbdfe
add required parameterized types
polyzos Nov 17, 2025
5393618
fix checkstyle violation
polyzos Nov 17, 2025
b579655
patch tests
polyzos Nov 17, 2025
3d0e607
improve test coverage
polyzos Nov 17, 2025
5f9a2b1
fix checkstyle violation
polyzos Nov 17, 2025
421a70e
instantiate converters once
polyzos Nov 18, 2025
5a891c0
revert message
polyzos Nov 25, 2025
e063083
fix test
polyzos Nov 25, 2025
3ea717c
fix checkstyle
polyzos Nov 28, 2025
7dc4879
refactor to typed apis
polyzos Dec 22, 2025
8b8a33d
add TypedScanRecords
polyzos Dec 22, 2025
1f2e546
revert unneeded changes
polyzos Dec 22, 2025
c1975b3
address comment for the lookuper
polyzos Dec 22, 2025
084cc4c
delete unneeded test
polyzos Dec 22, 2025
604e6f6
make the api consistent
polyzos Dec 22, 2025
99b0d06
address comments
polyzos Dec 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema)
}
}

static void validatePojoMatchesProjection(PojoType<?> pojoType, RowType projection) {
Set<String> pojoNames = pojoType.getProperties().keySet();
List<String> fieldNames = projection.getFieldNames();
if (!pojoNames.containsAll(fieldNames)) {
throw new IllegalArgumentException(
String.format(
"POJO fields %s must contain all projection fields %s. "
+ "For full-table writes, POJO fields must exactly match table schema fields.",
pojoNames, fieldNames));
}
for (int i = 0; i < projection.getFieldCount(); i++) {
String name = fieldNames.get(i);
DataType dt = projection.getTypeAt(i);
PojoType.Property prop = pojoType.getProperty(name);
validateCompatibility(dt, prop);
}
}

static void validateProjectionSubset(RowType projection, RowType tableSchema) {
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
for (String n : projection.getFieldNames()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -54,7 +55,9 @@ private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, RowType pr
this.tableSchema = tableSchema;
this.projection = projection;
this.projectionFieldNames = projection.getFieldNames();
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
// For writer path, allow POJO to be a superset of the projection. It must contain all
// projected fields.
ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
ConverterCommons.validateProjectionSubset(projection, tableSchema);
this.fieldConverters = createFieldConverters();
}
Expand Down Expand Up @@ -177,8 +180,22 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
String.format(
"Field %s is not a BigDecimal. Cannot convert to Decimal.", prop.name));
}
return Decimal.fromBigDecimal(
(BigDecimal) v, decimalType.getPrecision(), decimalType.getScale());
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();

// Scale with a deterministic rounding mode to avoid ArithmeticException when rounding is
// needed.
BigDecimal bd = (BigDecimal) v;
BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);

if (scaled.precision() > precision) {
throw new IllegalArgumentException(
String.format(
"Decimal value for field %s exceeds precision %d after scaling to %d: %s",
prop.name, precision, scale, scaled));
}

return Decimal.fromBigDecimal(scaled, precision, scale);
}

/** Converts a LocalDate POJO property to number of days since epoch. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ final class PojoType<T> {
private final Class<T> pojoClass;
private final Constructor<T> defaultConstructor;
private final Map<String, Property> properties; // property name -> property
private static final Map<Class<?>, Class<?>> PRIMITIVE_TO_BOXED = createPrimitiveToBoxedMap();

private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String, Property> props) {
this.pojoClass = pojoClass;
Expand Down Expand Up @@ -73,12 +74,15 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
for (Map.Entry<String, Field> e : allFields.entrySet()) {
String name = e.getKey();
Field field = e.getValue();
// Enforce nullable fields: primitives are not allowed in POJO definitions.
if (field.getType().isPrimitive()) {
throw new IllegalArgumentException(
String.format(
"POJO class %s has primitive field '%s' of type %s. Primitive types are not allowed; all fields must be nullable (use wrapper types).",
pojoClass.getName(), name, field.getType().getName()));
}
// use boxed type as effective type
Class<?> effectiveType = boxIfPrimitive(field.getType());
boolean publicField = Modifier.isPublic(field.getModifiers());
Method getter = getters.get(name);
Method setter = setters.get(name);
Expand All @@ -94,8 +98,7 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
}
props.put(
name,
new Property(
name, field.getType(), publicField ? field : null, getter, setter));
new Property(name, effectiveType, publicField ? field : null, getter, setter));
}

return new PojoType<>(pojoClass, ctor, props);
Expand Down Expand Up @@ -235,6 +238,29 @@ private static String decapitalize(String s) {
return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
}

private static Map<Class<?>, Class<?>> createPrimitiveToBoxedMap() {
Map<Class<?>, Class<?>> map = new HashMap<>();
map.put(boolean.class, Boolean.class);
map.put(byte.class, Byte.class);
map.put(short.class, Short.class);
map.put(int.class, Integer.class);
map.put(long.class, Long.class);
map.put(float.class, Float.class);
map.put(double.class, Double.class);
map.put(char.class, Character.class);
// void shouldn't appear as a field type, but handle defensively
map.put(void.class, Void.class);
return map;
}

private static Class<?> boxIfPrimitive(Class<?> type) {
if (!type.isPrimitive()) {
return type;
}
Class<?> boxed = PRIMITIVE_TO_BOXED.get(type);
return boxed != null ? boxed : type;
}

static final class Property {
final String name;
final Class<?> type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,37 @@

/**
* Used to configure and create a {@link Lookuper} to lookup rows of a primary key table. The built
* Lookuper can be a primary key lookuper that lookups by the primary key, or a prefix key lookup
* that lookups by the prefix key of the primary key.
* lookuper can lookup by the full primary key, or by a prefix of the primary key when configured
* via {@link #lookupBy}.
*
* <p>{@link Lookup} objects are immutable and can be shared between threads. Refinement methods,
* like {@link #lookupBy}, create new Lookup instances.
* like {@link #lookupBy}, create new {@code Lookup} instances.
*
* <p>Example1: Create a Primary Key Lookuper. Given a table with primary key column [k STRING].
* <p>Examples
*
* <p>Example 1: Primary Key Lookuper using an InternalRow key. Given a table with primary key
* column [k STRING]:
*
* <pre>{@code
* Lookuper lookuper = table.newLookup().createLookuper();
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of("key1"));
* resultFuture.get().getRows().forEach(row -> {
* System.out.println(row);
* });
* resultFuture.get().getRowList().forEach(System.out::println);
* }</pre>
*
* <p>Example2: Create a Prefix Key Lookuper. Given a table with primary key column [a INT, b
* STRING, c BIGINT] and bucket key [a, b].
* <p>Example 2: Prefix Key Lookuper using an InternalRow key. Given a table with primary key
* columns [a INT, b STRING, c BIGINT] and bucket key [a, b]:
*
* <pre>{@code
* Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of(1, "b1"));
* resultFuture.get().getRows().forEach(row -> {
* System.out.println(row);
* });
* resultFuture.get().getRowList().forEach(System.out::println);
* }</pre>
*
* <p>Example 3: Using a POJO key (conversion handled internally):
*
* <pre>{@code
* TypedLookuper<MyKeyPojo> lookuper = table.newLookup().createTypedLookuper(MyKeyPojo.class);
* LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
* }</pre>
*
* @since 0.6
Expand Down Expand Up @@ -96,4 +102,13 @@ default Lookup lookupBy(String... lookupColumnNames) {
* @return the lookuper
*/
Lookuper createLookuper();

/**
* Creates a {@link TypedLookuper} instance to lookup rows of a primary key table using POJOs.
*
* @param pojoClass the class of the POJO
* @param <T> the type of the POJO
* @return the typed lookuper
*/
<T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.Objects;

/**
* The result of {@link Lookuper#lookup(InternalRow)}.
* The result of a lookup operation performed by a {@link Lookuper}. It carries zero, one, or many
* {@link org.apache.fluss.row.InternalRow} values depending on whether the underlying lookup is a
* primary-key lookup (at most one) or a prefix-key lookup (zero or more).
*
* @since 0.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import java.util.concurrent.CompletableFuture;

/**
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The
* lookuper has retriable ability to handle transient errors during lookup operations which is
* configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
* A lookuper performs key-based lookups against a primary key table, using either the full primary
* key or a prefix of the primary key (when configured via {@code Lookup#lookupBy}).
*
* <p>Note: Lookuper instances are not thread-safe.
* <p>Usage examples:
*
* <pre>{@code
* // Row-based key (InternalRow)
* Lookuper lookuper = table.newLookup().createLookuper();
* LookupResult res = lookuper.lookup(keyRow).get();
* }</pre>
*
* @since 0.6
*/
Expand All @@ -44,7 +49,7 @@ public interface Lookuper {
* {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
* Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
*
* @param lookupKey the lookup key.
* @param lookupKey the lookup key
* @return the result of lookup.
*/
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* of the primary key.
*/
@NotThreadSafe
class PrefixKeyLookuper extends AbstractLookuper {
class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {

/** Extract bucket key from prefix lookup key row. */
private final KeyEncoder bucketKeyEncoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

/** An implementation of {@link Lookuper} that lookups by primary key. */
@NotThreadSafe
class PrimaryKeyLookuper extends AbstractLookuper {
class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper {

private final KeyEncoder primaryKeyEncoder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ public Lookuper createLookuper() {
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
}
}

@Override
public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames, pojoClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.lookup;

import org.apache.fluss.annotation.PublicEvolving;

import java.util.concurrent.CompletableFuture;

/**
* A typed lookuper performs key-based lookups against a primary key table using POJOs.
*
* @param <T> the type of the lookup key
* @since 0.6
*/
@PublicEvolving
public interface TypedLookuper<T> {

/**
* Lookups certain row from the given lookup key.
*
* @param lookupKey the lookup key
* @return the result of lookup.
*/
CompletableFuture<LookupResult> lookup(T lookupKey);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.apache.fluss.client.lookup;

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.fluss.client.converter.PojoToRowConverter;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Decorator for {@link Lookuper} that enables generic key lookup via {@link
* TypedLookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow} using existing
* converters based on table schema and active lookup columns, and directly delegates when the key
* is already an {@link InternalRow}.
*/
final class TypedLookuperImpl<K> implements TypedLookuper<K> {

private final Lookuper delegate;
private final TableInfo tableInfo;
@Nullable private final List<String> lookupColumnNames;
private final PojoToRowConverter<K> keyConv;

TypedLookuperImpl(
Lookuper delegate,
TableInfo tableInfo,
@Nullable List<String> lookupColumnNames,
Class<K> keyClass) {
this.delegate = delegate;
this.tableInfo = tableInfo;
this.lookupColumnNames = lookupColumnNames;
this.keyConv = createPojoToRowConverter(keyClass);
}

@Override
public CompletableFuture<LookupResult> lookup(K key) {
if (key == null) {
throw new IllegalArgumentException("key must not be null");
}
// Fast-path: already an InternalRow
if (key instanceof InternalRow) {
return delegate.lookup((InternalRow) key);
}

InternalRow keyRow = keyConv.toRow(key);
return delegate.lookup(keyRow);
}

private PojoToRowConverter<K> createPojoToRowConverter(Class<K> keyClass) {
RowType tableSchema = tableInfo.getRowType();
RowType keyProjection;
if (lookupColumnNames == null) {
keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
} else {
keyProjection = tableSchema.project(lookupColumnNames);
}
return PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
}
}
Loading