diff --git a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java index 5f4f1b6c12..aeb36419a6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java @@ -83,6 +83,24 @@ static void validatePojoMatchesTable(PojoType pojoType, RowType tableSchema) } } + static void validatePojoMatchesProjection(PojoType pojoType, RowType projection) { + Set pojoNames = pojoType.getProperties().keySet(); + List fieldNames = projection.getFieldNames(); + if (!pojoNames.containsAll(fieldNames)) { + throw new IllegalArgumentException( + String.format( + "POJO fields %s must contain all projection fields %s. " + + "For full-table writes, POJO fields must exactly match table schema fields.", + pojoNames, fieldNames)); + } + for (int i = 0; i < projection.getFieldCount(); i++) { + String name = fieldNames.get(i); + DataType dt = projection.getTypeAt(i); + PojoType.Property prop = pojoType.getProperty(name); + validateCompatibility(dt, prop); + } + } + static void validateProjectionSubset(RowType projection, RowType tableSchema) { Set tableNames = new HashSet<>(tableSchema.getFieldNames()); for (String n : projection.getFieldNames()) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java index 4d75aa91a5..0f4f6d8a4e 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.math.BigDecimal; +import java.math.RoundingMode; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -54,7 +55,9 @@ private PojoToRowConverter(PojoType pojoType, RowType tableSchema, RowType pr this.tableSchema = tableSchema; this.projection = projection; this.projectionFieldNames = projection.getFieldNames(); - ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema); + // For writer path, allow POJO to be a superset of the projection. It must contain all + // projected fields. + ConverterCommons.validatePojoMatchesProjection(pojoType, projection); ConverterCommons.validateProjectionSubset(projection, tableSchema); this.fieldConverters = createFieldConverters(); } @@ -177,8 +180,22 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType String.format( "Field %s is not a BigDecimal. Cannot convert to Decimal.", prop.name)); } - return Decimal.fromBigDecimal( - (BigDecimal) v, decimalType.getPrecision(), decimalType.getScale()); + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + + // Scale with a deterministic rounding mode to avoid ArithmeticException when rounding is + // needed. + BigDecimal bd = (BigDecimal) v; + BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP); + + if (scaled.precision() > precision) { + throw new IllegalArgumentException( + String.format( + "Decimal value for field %s exceeds precision %d after scaling to %d: %s", + prop.name, precision, scale, scaled)); + } + + return Decimal.fromBigDecimal(scaled, precision, scale); } /** Converts a LocalDate POJO property to number of days since epoch. */ diff --git a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java index 7790b91438..de848f448f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java @@ -37,6 +37,7 @@ final class PojoType { private final Class pojoClass; private final Constructor defaultConstructor; private final Map properties; // property name -> property + private static final Map, Class> PRIMITIVE_TO_BOXED = createPrimitiveToBoxedMap(); private PojoType(Class pojoClass, Constructor ctor, Map props) { this.pojoClass = pojoClass; @@ -73,12 +74,15 @@ static PojoType of(Class pojoClass) { for (Map.Entry e : allFields.entrySet()) { String name = e.getKey(); Field field = e.getValue(); + // Enforce nullable fields: primitives are not allowed in POJO definitions. if (field.getType().isPrimitive()) { throw new IllegalArgumentException( String.format( "POJO class %s has primitive field '%s' of type %s. Primitive types are not allowed; all fields must be nullable (use wrapper types).", pojoClass.getName(), name, field.getType().getName())); } + // use boxed type as effective type + Class effectiveType = boxIfPrimitive(field.getType()); boolean publicField = Modifier.isPublic(field.getModifiers()); Method getter = getters.get(name); Method setter = setters.get(name); @@ -94,8 +98,7 @@ static PojoType of(Class pojoClass) { } props.put( name, - new Property( - name, field.getType(), publicField ? field : null, getter, setter)); + new Property(name, effectiveType, publicField ? field : null, getter, setter)); } return new PojoType<>(pojoClass, ctor, props); @@ -235,6 +238,29 @@ private static String decapitalize(String s) { return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1); } + private static Map, Class> createPrimitiveToBoxedMap() { + Map, Class> map = new HashMap<>(); + map.put(boolean.class, Boolean.class); + map.put(byte.class, Byte.class); + map.put(short.class, Short.class); + map.put(int.class, Integer.class); + map.put(long.class, Long.class); + map.put(float.class, Float.class); + map.put(double.class, Double.class); + map.put(char.class, Character.class); + // void shouldn't appear as a field type, but handle defensively + map.put(void.class, Void.class); + return map; + } + + private static Class boxIfPrimitive(Class type) { + if (!type.isPrimitive()) { + return type; + } + Class boxed = PRIMITIVE_TO_BOXED.get(type); + return boxed != null ? boxed : type; + } + static final class Property { final String name; final Class type; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java index 31faed2f77..e64685b9a0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java @@ -24,31 +24,37 @@ /** * Used to configure and create a {@link Lookuper} to lookup rows of a primary key table. The built - * Lookuper can be a primary key lookuper that lookups by the primary key, or a prefix key lookup - * that lookups by the prefix key of the primary key. + * lookuper can lookup by the full primary key, or by a prefix of the primary key when configured + * via {@link #lookupBy}. * *

{@link Lookup} objects are immutable and can be shared between threads. Refinement methods, - * like {@link #lookupBy}, create new Lookup instances. + * like {@link #lookupBy}, create new {@code Lookup} instances. * - *

Example1: Create a Primary Key Lookuper. Given a table with primary key column [k STRING]. + *

Examples + * + *

Example 1: Primary Key Lookuper using an InternalRow key. Given a table with primary key + * column [k STRING]: * *

{@code
  * Lookuper lookuper = table.newLookup().createLookuper();
  * CompletableFuture resultFuture = lookuper.lookup(GenericRow.of("key1"));
- * resultFuture.get().getRows().forEach(row -> {
- *    System.out.println(row);
- * });
+ * resultFuture.get().getRowList().forEach(System.out::println);
  * }
* - *

Example2: Create a Prefix Key Lookuper. Given a table with primary key column [a INT, b - * STRING, c BIGINT] and bucket key [a, b]. + *

Example 2: Prefix Key Lookuper using an InternalRow key. Given a table with primary key + * columns [a INT, b STRING, c BIGINT] and bucket key [a, b]: * *

{@code
  * Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
  * CompletableFuture resultFuture = lookuper.lookup(GenericRow.of(1, "b1"));
- * resultFuture.get().getRows().forEach(row -> {
- *   System.out.println(row);
- * });
+ * resultFuture.get().getRowList().forEach(System.out::println);
+ * }
+ * + *

Example 3: Using a POJO key (conversion handled internally): + * + *

{@code
+ * TypedLookuper lookuper = table.newLookup().createTypedLookuper(MyKeyPojo.class);
+ * LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
  * }
* * @since 0.6 @@ -96,4 +102,13 @@ default Lookup lookupBy(String... lookupColumnNames) { * @return the lookuper */ Lookuper createLookuper(); + + /** + * Creates a {@link TypedLookuper} instance to lookup rows of a primary key table using POJOs. + * + * @param pojoClass the class of the POJO + * @param the type of the POJO + * @return the typed lookuper + */ + TypedLookuper createTypedLookuper(Class pojoClass); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java index b4536640ed..83497d37cf 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java @@ -27,7 +27,9 @@ import java.util.Objects; /** - * The result of {@link Lookuper#lookup(InternalRow)}. + * The result of a lookup operation performed by a {@link Lookuper}. It carries zero, one, or many + * {@link org.apache.fluss.row.InternalRow} values depending on whether the underlying lookup is a + * primary-key lookup (at most one) or a prefix-key lookup (zero or more). * * @since 0.1 */ diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java index 37157d9b12..9998a2612a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java @@ -25,11 +25,16 @@ import java.util.concurrent.CompletableFuture; /** - * The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The - * lookuper has retriable ability to handle transient errors during lookup operations which is - * configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}. + * A lookuper performs key-based lookups against a primary key table, using either the full primary + * key or a prefix of the primary key (when configured via {@code Lookup#lookupBy}). * - *

Note: Lookuper instances are not thread-safe. + *

Usage examples: + * + *

{@code
+ * // Row-based key (InternalRow)
+ * Lookuper lookuper = table.newLookup().createLookuper();
+ * LookupResult res = lookuper.lookup(keyRow).get();
+ * }
* * @since 0.6 */ @@ -44,7 +49,7 @@ public interface Lookuper { * {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix * Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}). * - * @param lookupKey the lookup key. + * @param lookupKey the lookup key * @return the result of lookup. */ CompletableFuture lookup(InternalRow lookupKey); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java index a2d1029d25..a7b3e6545f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java @@ -46,7 +46,7 @@ * of the primary key. */ @NotThreadSafe -class PrefixKeyLookuper extends AbstractLookuper { +class PrefixKeyLookuper extends AbstractLookuper implements Lookuper { /** Extract bucket key from prefix lookup key row. */ private final KeyEncoder bucketKeyEncoder; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java index 6c32647636..d2cd949630 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java @@ -40,7 +40,7 @@ /** An implementation of {@link Lookuper} that lookups by primary key. */ @NotThreadSafe -class PrimaryKeyLookuper extends AbstractLookuper { +class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper { private final KeyEncoder primaryKeyEncoder; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java index 84c9bdb211..e0c92661f1 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java @@ -71,4 +71,9 @@ public Lookuper createLookuper() { tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames); } } + + @Override + public TypedLookuper createTypedLookuper(Class pojoClass) { + return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames, pojoClass); + } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java new file mode 100644 index 0000000000..2aa7624c82 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.lookup; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.concurrent.CompletableFuture; + +/** + * A typed lookuper performs key-based lookups against a primary key table using POJOs. + * + * @param the type of the lookup key + * @since 0.6 + */ +@PublicEvolving +public interface TypedLookuper { + + /** + * Lookups certain row from the given lookup key. + * + * @param lookupKey the lookup key + * @return the result of lookup. + */ + CompletableFuture lookup(T lookupKey); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java new file mode 100644 index 0000000000..c88eb15d13 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java @@ -0,0 +1,78 @@ +package org.apache.fluss.client.lookup; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.fluss.client.converter.PojoToRowConverter; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Decorator for {@link Lookuper} that enables generic key lookup via {@link + * TypedLookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow} using existing + * converters based on table schema and active lookup columns, and directly delegates when the key + * is already an {@link InternalRow}. + */ +final class TypedLookuperImpl implements TypedLookuper { + + private final Lookuper delegate; + private final TableInfo tableInfo; + @Nullable private final List lookupColumnNames; + private final PojoToRowConverter keyConv; + + TypedLookuperImpl( + Lookuper delegate, + TableInfo tableInfo, + @Nullable List lookupColumnNames, + Class keyClass) { + this.delegate = delegate; + this.tableInfo = tableInfo; + this.lookupColumnNames = lookupColumnNames; + this.keyConv = createPojoToRowConverter(keyClass); + } + + @Override + public CompletableFuture lookup(K key) { + if (key == null) { + throw new IllegalArgumentException("key must not be null"); + } + // Fast-path: already an InternalRow + if (key instanceof InternalRow) { + return delegate.lookup((InternalRow) key); + } + + InternalRow keyRow = keyConv.toRow(key); + return delegate.lookup(keyRow); + } + + private PojoToRowConverter createPojoToRowConverter(Class keyClass) { + RowType tableSchema = tableInfo.getRowType(); + RowType keyProjection; + if (lookupColumnNames == null) { + keyProjection = tableSchema.project(tableInfo.getPrimaryKeys()); + } else { + keyProjection = tableSchema.project(lookupColumnNames); + } + return PojoToRowConverter.of(keyClass, tableSchema, keyProjection); + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java index 97e37847f6..1c05bd3916 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.TypedLogScanner; import org.apache.fluss.metadata.TableBucket; import javax.annotation.Nullable; @@ -65,6 +66,13 @@ public interface Scan { */ LogScanner createLogScanner(); + /** + * Creates a {@link TypedLogScanner} to continuously read log data as POJOs of the given class. + * + *

Note: this API doesn't support pre-configured with {@link #limit(int)}. + */ + TypedLogScanner createTypedLogScanner(Class pojoClass); + /** * Creates a {@link BatchScanner} to read current data in the given table bucket for this scan. * diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java index 0ca1952cf6..0d1d28a0e2 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java @@ -25,6 +25,8 @@ import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.LogScannerImpl; +import org.apache.fluss.client.table.scanner.log.TypedLogScanner; +import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.metadata.SchemaGetter; @@ -113,6 +115,12 @@ public LogScanner createLogScanner() { schemaGetter); } + @Override + public TypedLogScanner createTypedLogScanner(Class pojoClass) { + LogScanner base = createLogScanner(); + return new TypedLogScannerImpl<>(base, pojoClass, tableInfo, projectedColumns); + } + @Override public BatchScanner createBatchScanner(TableBucket tableBucket) { if (limit == null) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java new file mode 100644 index 0000000000..5a2d3b8a3e --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TypedScanRecord.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.InternalRow; + +import java.util.Objects; + +/** + * A record produced by a table scanner which contains a typed value. + * + * @param The type of the value. + */ +@PublicEvolving +public class TypedScanRecord { + + private final ScanRecord scanRecord; + private final T value; + + public TypedScanRecord(ScanRecord scanRecord, T value) { + this.scanRecord = scanRecord; + this.value = value; + } + + /** The position of this record in the corresponding fluss table bucket. */ + public long logOffset() { + return scanRecord.logOffset(); + } + + /** The timestamp of this record. */ + public long timestamp() { + return scanRecord.timestamp(); + } + + /** The change type of this record. */ + public ChangeType getChangeType() { + return scanRecord.getChangeType(); + } + + /** Returns the record value. */ + public T getValue() { + return value; + } + + /** Returns the internal row of this record. */ + public InternalRow getRow() { + return scanRecord.getRow(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TypedScanRecord that = (TypedScanRecord) o; + return Objects.equals(scanRecord, that.scanRecord) && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(scanRecord, value); + } + + @Override + public String toString() { + return scanRecord.getChangeType().shortString() + value + "@" + scanRecord.logOffset(); + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java new file mode 100644 index 0000000000..a700ab4340 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.time.Duration; + +/** + * A typed scanner is used to scan log data as POJOs of specify table from Fluss. + * + * @param the type of the POJO + * @since 0.6 + */ +@PublicEvolving +public interface TypedLogScanner extends AutoCloseable { + + /** + * Poll log data from tablet server. + * + * @param timeout the timeout to poll. + * @return the result of poll. + */ + TypedScanRecords poll(Duration timeout); + + /** + * Subscribe to the given table bucket from beginning dynamically. If the table bucket is + * already subscribed, the start offset will be updated. + * + * @param bucket the table bucket to subscribe. + */ + void subscribeFromBeginning(int bucket); + + /** + * Subscribe to the given partitioned table bucket from beginning dynamically. If the table + * bucket is already subscribed, the start offset will be updated. + * + * @param partitionId the partition id of the table partition to subscribe. + * @param bucket the table bucket to subscribe. + */ + void subscribeFromBeginning(long partitionId, int bucket); + + /** + * Subscribe to the given table bucket in given offset dynamically. If the table bucket is + * already subscribed, the offset will be updated. + * + * @param bucket the table bucket to subscribe. + * @param offset the offset to start from. + */ + void subscribe(int bucket, long offset); + + /** + * Subscribe to the given partitioned table bucket in given offset dynamically. If the table + * bucket is already subscribed, the offset will be updated. + * + * @param partitionId the partition id of the table partition to subscribe. + * @param bucket the table bucket to subscribe. + * @param offset the offset to start from. + */ + void subscribe(long partitionId, int bucket, long offset); + + /** + * Unsubscribe from the given bucket of given partition dynamically. + * + * @param partitionId the partition id of the table partition to unsubscribe. + * @param bucket the table bucket to unsubscribe. + */ + void unsubscribe(long partitionId, int bucket); + + /** + * Wake up the log scanner in case the fetcher thread in log scanner is blocking in {@link + * #poll(Duration timeout)}. + */ + void wakeup(); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java new file mode 100644 index 0000000000..3b7ea32d4f --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScannerImpl.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.client.converter.RowToPojoConverter; +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.scanner.TypedScanRecord; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.RowType; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Adapter that converts {@link InternalRow} records from a {@link LogScanner} into POJOs of type T. + */ +public class TypedLogScannerImpl implements TypedLogScanner { + + private final LogScanner delegate; + private final RowToPojoConverter converter; + + public TypedLogScannerImpl( + LogScanner delegate, Class pojoClass, TableInfo tableInfo, int[] projectedColumns) { + this.delegate = delegate; + RowType tableSchema = tableInfo.getRowType(); + RowType projection = + projectedColumns == null ? tableSchema : tableSchema.project(projectedColumns); + this.converter = RowToPojoConverter.of(pojoClass, tableSchema, projection); + } + + @Override + public TypedScanRecords poll(Duration timeout) { + ScanRecords records = delegate.poll(timeout); + if (records == null || records.isEmpty()) { + return TypedScanRecords.empty(); + } + Map>> out = new HashMap<>(); + for (TableBucket bucket : records.buckets()) { + List list = records.records(bucket); + List> converted = new ArrayList<>(list.size()); + for (ScanRecord r : list) { + InternalRow row = r.getRow(); + T pojo = converter.fromRow(row); + converted.add(new TypedScanRecord<>(r, pojo)); + } + out.put(bucket, converted); + } + return new TypedScanRecords<>(out); + } + + @Override + public void subscribeFromBeginning(int bucket) { + delegate.subscribeFromBeginning(bucket); + } + + @Override + public void subscribeFromBeginning(long partitionId, int bucket) { + delegate.subscribeFromBeginning(partitionId, bucket); + } + + @Override + public void subscribe(int bucket, long offset) { + delegate.subscribe(bucket, offset); + } + + @Override + public void subscribe(long partitionId, int bucket, long offset) { + delegate.subscribe(partitionId, bucket, offset); + } + + @Override + public void unsubscribe(long partitionId, int bucket) { + delegate.unsubscribe(partitionId, bucket); + } + + @Override + public void wakeup() { + delegate.wakeup(); + } + + @Override + public void close() { + try { + delegate.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java new file mode 100644 index 0000000000..f69347802b --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedScanRecords.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.log; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.client.table.scanner.TypedScanRecord; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.AbstractIterator; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A container that holds the list {@link TypedScanRecord} per bucket for a particular table. There + * is one {@link TypedScanRecord} list for every bucket returned by a {@link + * TypedLogScanner#poll(java.time.Duration)} operation. + * + * @param the type of the POJO + * @since 0.6 + */ +@PublicEvolving +public class TypedScanRecords implements Iterable> { + + public static TypedScanRecords empty() { + return new TypedScanRecords<>(Collections.emptyMap()); + } + + private final Map>> records; + + public TypedScanRecords(Map>> records) { + this.records = records; + } + + /** + * Get just the records for the given bucketId. + * + * @param scanBucket The bucket to get records for + */ + public List> records(TableBucket scanBucket) { + List> recs = records.get(scanBucket); + if (recs == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(recs); + } + + /** + * Get the bucket ids which have records contained in this record set. + * + * @return the set of partitions with data in this record set (maybe empty if no data was + * returned) + */ + public Set buckets() { + return Collections.unmodifiableSet(records.keySet()); + } + + /** The number of records for all buckets. */ + public int count() { + int count = 0; + for (List> recs : records.values()) { + count += recs.size(); + } + return count; + } + + public boolean isEmpty() { + return records.isEmpty(); + } + + @Override + public Iterator> iterator() { + return new ConcatenatedIterable<>(records.values()).iterator(); + } + + private static class ConcatenatedIterable implements Iterable> { + + private final Iterable>> iterables; + + public ConcatenatedIterable(Iterable>> iterables) { + this.iterables = iterables; + } + + @Override + public Iterator> iterator() { + return new AbstractIterator>() { + final Iterator>> iters = iterables.iterator(); + Iterator> current; + + public TypedScanRecord makeNext() { + while (current == null || !current.hasNext()) { + if (iters.hasNext()) { + current = iters.next().iterator(); + } else { + return allDone(); + } + } + return current.next(); + } + }; + } + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java index 6a5b10523d..06a0b694e6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java @@ -32,6 +32,9 @@ public interface Append { // TODO: Add more methods to configure the AppendWriter, such as apply static partitions, // apply overwrites, etc. - /** Create a new {@link AppendWriter} to write data to a Log Table. */ + /** Create a new {@link AppendWriter} to write data to a Log Table using InternalRow. */ AppendWriter createWriter(); + + /** Create a new typed {@link AppendWriter} to write POJOs directly. */ + TypedAppendWriter createTypedWriter(Class pojoClass); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java index 4edcc2471d..1b5cee055f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriter.java @@ -31,10 +31,10 @@ public interface AppendWriter extends TableWriter { /** - * Append row into a Log Table. + * Append a record into a Log Table. * - * @param row the row to append. + * @param record the record to append. * @return A {@link CompletableFuture} that always returns append result when complete normally. */ - CompletableFuture append(InternalRow row); + CompletableFuture append(InternalRow record); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java index 50e416d563..84ccaf3195 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java @@ -38,4 +38,9 @@ public TableAppend(TablePath tablePath, TableInfo tableInfo, WriterClient writer public AppendWriter createWriter() { return new AppendWriterImpl(tablePath, tableInfo, writerClient); } + + @Override + public TypedAppendWriter createTypedWriter(Class pojoClass) { + return new TypedAppendWriterImpl<>(createWriter(), pojoClass, tableInfo); + } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java index 7d90a9ccd3..fe865eac43 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java @@ -95,4 +95,9 @@ public Upsert partialUpdate(String... targetColumnNames) { public UpsertWriter createWriter() { return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient); } + + @Override + public TypedUpsertWriter createTypedWriter(Class pojoClass) { + return new TypedUpsertWriterImpl<>(createWriter(), pojoClass, tableInfo, targetColumns); + } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java new file mode 100644 index 0000000000..bff9039139 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.writer; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.concurrent.CompletableFuture; + +/** + * The typed writer to write data to the log table using POJOs. + * + * @param the type of the record + * @since 0.6 + */ +@PublicEvolving +public interface TypedAppendWriter extends TableWriter { + + /** + * Append a record into a Log Table. + * + * @param record the record to append. + * @return A {@link CompletableFuture} that always returns append result when complete normally. + */ + CompletableFuture append(T record); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java new file mode 100644 index 0000000000..3b964594bb --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriterImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.writer; + +import org.apache.fluss.client.converter.PojoToRowConverter; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.RowType; + +import java.util.concurrent.CompletableFuture; + +/** + * A typed {@link AppendWriter} that converts POJOs to {@link InternalRow} and delegates to the + * existing internal-row based writer implementation. + */ +class TypedAppendWriterImpl implements TypedAppendWriter { + + private final AppendWriter delegate; + private final RowType tableSchema; + private final PojoToRowConverter pojoToRowConverter; + + TypedAppendWriterImpl(AppendWriter delegate, Class pojoClass, TableInfo tableInfo) { + this.delegate = delegate; + this.tableSchema = tableInfo.getRowType(); + this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema); + } + + @Override + public void flush() { + delegate.flush(); + } + + @Override + public CompletableFuture append(T record) { + if (record instanceof InternalRow) { + return delegate.append((InternalRow) record); + } + InternalRow row = pojoToRowConverter.toRow(record); + return delegate.append(row); + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java new file mode 100644 index 0000000000..4d241f5e61 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.writer; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.concurrent.CompletableFuture; + +/** + * The typed writer to write data to the primary key table using POJOs. + * + * @param the type of the record + * @since 0.6 + */ +@PublicEvolving +public interface TypedUpsertWriter extends TableWriter { + + /** + * Inserts a record into Fluss table if it does not already exist, or updates it if it does. + * + * @param record the record to upsert. + * @return A {@link CompletableFuture} that always returns upsert result when complete normally. + */ + CompletableFuture upsert(T record); + + /** + * Delete a certain record from the Fluss table. The input must contain the primary key fields. + * + * @param record the record to delete. + * @return A {@link CompletableFuture} that always delete result when complete normally. + */ + CompletableFuture delete(T record); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java new file mode 100644 index 0000000000..45af9f0ef6 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriterImpl.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.writer; + +import org.apache.fluss.client.converter.PojoToRowConverter; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * A typed {@link UpsertWriter} that converts POJOs to {@link InternalRow} and delegates to the + * existing internal-row based writer implementation. + */ +class TypedUpsertWriterImpl implements TypedUpsertWriter { + + private final UpsertWriter delegate; + private final TableInfo tableInfo; + private final RowType tableSchema; + @Nullable private final int[] targetColumns; + + private final RowType pkProjection; + @Nullable private final RowType targetProjection; + + private final PojoToRowConverter pojoToRowConverter; + private final PojoToRowConverter pkConverter; + @Nullable private final PojoToRowConverter targetConverter; + + TypedUpsertWriterImpl( + UpsertWriter delegate, Class pojoClass, TableInfo tableInfo, int[] targetColumns) { + this.delegate = delegate; + this.tableInfo = tableInfo; + this.tableSchema = tableInfo.getRowType(); + this.targetColumns = targetColumns; + + // Precompute projections + this.pkProjection = this.tableSchema.project(tableInfo.getPhysicalPrimaryKeys()); + this.targetProjection = + (targetColumns == null) ? null : this.tableSchema.project(targetColumns); + + // Initialize reusable converters + this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema); + this.pkConverter = PojoToRowConverter.of(pojoClass, tableSchema, pkProjection); + this.targetConverter = + (targetProjection == null) + ? null + : PojoToRowConverter.of(pojoClass, tableSchema, targetProjection); + } + + @Override + public void flush() { + delegate.flush(); + } + + @Override + public CompletableFuture upsert(T record) { + if (record instanceof InternalRow) { + return delegate.upsert((InternalRow) record); + } + InternalRow row = convertPojo(record, /*forDelete=*/ false); + return delegate.upsert(row); + } + + @Override + public CompletableFuture delete(T record) { + if (record instanceof InternalRow) { + return delegate.delete((InternalRow) record); + } + InternalRow pkOnly = convertPojo(record, /*forDelete=*/ true); + return delegate.delete(pkOnly); + } + + private InternalRow convertPojo(T pojo, boolean forDelete) { + final RowType projection; + final PojoToRowConverter converter; + if (forDelete) { + projection = pkProjection; + converter = pkConverter; + } else if (targetProjection != null && targetConverter != null) { + projection = targetProjection; + converter = targetConverter; + } else { + projection = tableSchema; + converter = pojoToRowConverter; + } + + GenericRow projected = converter.toRow(pojo); + if (projection == tableSchema) { + return projected; + } + // expand projected row to full row if needed + GenericRow full = new GenericRow(tableSchema.getFieldCount()); + if (forDelete) { + // set PK fields, others null + for (String pk : tableInfo.getPhysicalPrimaryKeys()) { + int projIndex = projection.getFieldIndex(pk); + + // TODO: this can be optimized by pre-computing + // the index mapping in the constructor? + int fullIndex = tableSchema.getFieldIndex(pk); + full.setField(fullIndex, projected.getField(projIndex)); + } + } else if (targetColumns != null) { + for (int i = 0; i < projection.getFieldCount(); i++) { + String name = projection.getFieldNames().get(i); + int fullIdx = tableSchema.getFieldIndex(name); + full.setField(fullIdx, projected.getField(i)); + } + } + return full; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java index d3c3608ee8..0843437fee 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java @@ -18,7 +18,6 @@ package org.apache.fluss.client.table.writer; import org.apache.fluss.annotation.PublicEvolving; -import org.apache.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -37,16 +36,14 @@ public interface Upsert { /** * Apply partial update columns and returns a new Upsert instance. * - *

For {@link UpsertWriter#upsert(InternalRow)} operation, only the specified columns will be - * updated and other columns will remain unchanged if the row exists or set to null if the row - * doesn't exist. + *

For upsert operations, only the specified columns will be updated and other columns will + * remain unchanged if the row exists or set to null if the row doesn't exist. * - *

For {@link UpsertWriter#delete(InternalRow)} operation, the entire row will not be - * removed, but only the specified columns except primary key will be set to null. The entire - * row will be removed when all columns except primary key are null after a {@link - * UpsertWriter#delete(InternalRow)} operation. + *

For delete operations, the entire row will not be removed immediately, but only the + * specified columns except primary key will be set to null. The entire row will be removed when + * all columns except primary key are null after a delete operation. * - *

Note: The specified columns must be a contains all columns of primary key, and all columns + *

Note: The specified columns must contain all columns of primary key, and all columns * except primary key should be nullable. * * @param targetColumns the column indexes to partial update @@ -60,8 +57,11 @@ public interface Upsert { Upsert partialUpdate(String... targetColumnNames); /** - * Create a new {@link UpsertWriter} with the optional {@link #partialUpdate(String...)} - * information to upsert and delete data to a Primary Key Table. + * Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link + * #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table. */ UpsertWriter createWriter(); + + /** Create a new typed {@link UpsertWriter} to write POJOs directly. */ + TypedUpsertWriter createTypedWriter(Class pojoClass); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java index 6648eebfc0..e4d751747d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java @@ -31,19 +31,18 @@ public interface UpsertWriter extends TableWriter { /** - * Inserts row into Fluss table if they do not already exist, or updates them if they do exist. + * Inserts a record into Fluss table if it does not already exist, or updates it if it does. * - * @param row the row to upsert. + * @param record the record to upsert. * @return A {@link CompletableFuture} that always returns upsert result when complete normally. */ - CompletableFuture upsert(InternalRow row); + CompletableFuture upsert(InternalRow record); /** - * Delete certain row by the input row in Fluss table, the input row must contain the primary - * key. + * Delete a certain record from the Fluss table. The input must contain the primary key fields. * - * @param row the row to delete. + * @param record the record to delete. * @return A {@link CompletableFuture} that always delete result when complete normally. */ - CompletableFuture delete(InternalRow row); + CompletableFuture delete(InternalRow record); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java index a62c1b70d3..8270ed7882 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java @@ -26,6 +26,7 @@ import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.client.table.scanner.log.TypedLogScanner; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -144,6 +145,13 @@ protected static void subscribeFromBeginning(LogScanner logScanner, Table table) } } + protected static void subscribeFromBeginning(TypedLogScanner logScanner, Table table) { + int bucketCount = table.getTableInfo().getNumBuckets(); + for (int i = 0; i < bucketCount; i++) { + logScanner.subscribeFromBeginning(i); + } + } + protected static void subscribeFromTimestamp( TablePath tablePath, @Nullable String partitionName, diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java new file mode 100644 index 0000000000..c7724799be --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table; + +import org.apache.fluss.client.admin.ClientToServerITCaseBase; +import org.apache.fluss.client.converter.RowToPojoConverter; +import org.apache.fluss.client.lookup.LookupResult; +import org.apache.fluss.client.lookup.Lookuper; +import org.apache.fluss.client.lookup.TypedLookuper; +import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.TypedScanRecord; +import org.apache.fluss.client.table.scanner.log.TypedLogScanner; +import org.apache.fluss.client.table.scanner.log.TypedScanRecords; +import org.apache.fluss.client.table.writer.TypedAppendWriter; +import org.apache.fluss.client.table.writer.TypedUpsertWriter; +import org.apache.fluss.client.table.writer.Upsert; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; + +/** End-to-end tests for writing and scanning POJOs via client API. */ +public class FlussTypedClientITCase extends ClientToServerITCaseBase { + + /** Test POJO containing all supported field types used by converters. */ + public static class AllTypesPojo { + // primary key + public Integer a; + // all supported converter fields + public Boolean bool1; + public Byte tiny; + public Short small; + public Integer intv; + public Long big; + public Float flt; + public Double dbl; + public Character ch; + public String str; + public byte[] bin; + public byte[] bytes; + public BigDecimal dec; + public LocalDate dt; + public LocalTime tm; + public LocalDateTime tsNtz; + public Instant tsLtz; + + public AllTypesPojo() {} + + public AllTypesPojo( + Integer a, + Boolean bool1, + Byte tiny, + Short small, + Integer intv, + Long big, + Float flt, + Double dbl, + Character ch, + String str, + byte[] bin, + byte[] bytes, + BigDecimal dec, + LocalDate dt, + LocalTime tm, + LocalDateTime tsNtz, + Instant tsLtz) { + this.a = a; + this.bool1 = bool1; + this.tiny = tiny; + this.small = small; + this.intv = intv; + this.big = big; + this.flt = flt; + this.dbl = dbl; + this.ch = ch; + this.str = str; + this.bin = bin; + this.bytes = bytes; + this.dec = dec; + this.dt = dt; + this.tm = tm; + this.tsNtz = tsNtz; + this.tsLtz = tsLtz; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AllTypesPojo that = (AllTypesPojo) o; + return Objects.equals(a, that.a) + && Objects.equals(bool1, that.bool1) + && Objects.equals(tiny, that.tiny) + && Objects.equals(small, that.small) + && Objects.equals(intv, that.intv) + && Objects.equals(big, that.big) + && Objects.equals(flt, that.flt) + && Objects.equals(dbl, that.dbl) + && Objects.equals(ch, that.ch) + && Objects.equals(str, that.str) + && Arrays.equals(bin, that.bin) + && Arrays.equals(bytes, that.bytes) + && Objects.equals(dec, that.dec) + && Objects.equals(dt, that.dt) + && Objects.equals(tm, that.tm) + && Objects.equals(tsNtz, that.tsNtz) + && Objects.equals(tsLtz, that.tsLtz); + } + + @Override + public int hashCode() { + int result = + Objects.hash( + a, bool1, tiny, small, intv, big, flt, dbl, ch, str, dec, dt, tm, tsNtz, + tsLtz); + result = 31 * result + Arrays.hashCode(bin); + result = 31 * result + Arrays.hashCode(bytes); + return result; + } + + @Override + public String toString() { + return "AllTypesPojo{" + + "a=" + + a + + ", bool1=" + + bool1 + + ", tiny=" + + tiny + + ", small=" + + small + + ", intv=" + + intv + + ", big=" + + big + + ", flt=" + + flt + + ", dbl=" + + dbl + + ", ch=" + + ch + + ", str='" + + str + + '\'' + + ", bin=" + + Arrays.toString(bin) + + ", bytes=" + + Arrays.toString(bytes) + + ", dec=" + + dec + + ", dt=" + + dt + + ", tm=" + + tm + + ", tsNtz=" + + tsNtz + + ", tsLtz=" + + tsLtz + + '}'; + } + } + + /** Minimal POJO representing the primary key for {@link AllTypesPojo}. */ + public static class PLookupKey { + public Integer a; + + public PLookupKey() {} + + public PLookupKey(Integer a) { + this.a = a; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PLookupKey that = (PLookupKey) o; + return Objects.equals(a, that.a); + } + + @Override + public int hashCode() { + return Objects.hash(a); + } + + @Override + public String toString() { + return "PLookupKey{" + "a=" + a + '}'; + } + } + + private static Schema allTypesLogSchema() { + return Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("bool1", DataTypes.BOOLEAN()) + .column("tiny", DataTypes.TINYINT()) + .column("small", DataTypes.SMALLINT()) + .column("intv", DataTypes.INT()) + .column("big", DataTypes.BIGINT()) + .column("flt", DataTypes.FLOAT()) + .column("dbl", DataTypes.DOUBLE()) + .column("ch", DataTypes.CHAR(1)) + .column("str", DataTypes.STRING()) + .column("bin", DataTypes.BINARY(3)) + .column("bytes", DataTypes.BYTES()) + .column("dec", DataTypes.DECIMAL(10, 2)) + .column("dt", DataTypes.DATE()) + .column("tm", DataTypes.TIME()) + .column("tsNtz", DataTypes.TIMESTAMP(3)) + .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3)) + .build(); + } + + private static Schema allTypesPkSchema() { + // Same columns as log schema but with PK on 'a' + return Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("bool1", DataTypes.BOOLEAN()) + .column("tiny", DataTypes.TINYINT()) + .column("small", DataTypes.SMALLINT()) + .column("intv", DataTypes.INT()) + .column("big", DataTypes.BIGINT()) + .column("flt", DataTypes.FLOAT()) + .column("dbl", DataTypes.DOUBLE()) + .column("ch", DataTypes.CHAR(1)) + .column("str", DataTypes.STRING()) + .column("bin", DataTypes.BINARY(3)) + .column("bytes", DataTypes.BYTES()) + .column("dec", DataTypes.DECIMAL(10, 2)) + .column("dt", DataTypes.DATE()) + .column("tm", DataTypes.TIME()) + .column("tsNtz", DataTypes.TIMESTAMP(3)) + .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3)) + .primaryKey("a") + .build(); + } + + private static AllTypesPojo newAllTypesPojo(int i) { + Integer a = i; + Boolean bool1 = (i % 2) == 0; + Byte tiny = (byte) (i - 5); + Short small = (short) (100 + i); + Integer intv = 1000 + i; + Long big = 100000L + i; + Float flt = 1.5f + i; + Double dbl = 2.5 + i; + Character ch = (char) ('a' + (i % 26)); + String str = "s" + i; + byte[] bin = new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)}; + byte[] bytes = new byte[] {(byte) (10 + i), (byte) (20 + i)}; + BigDecimal dec = new BigDecimal("12345." + (10 + i)).setScale(2, RoundingMode.HALF_UP); + LocalDate dt = LocalDate.of(2024, 1, 1).plusDays(i); + LocalTime tm = LocalTime.of(12, (i * 7) % 60, (i * 11) % 60); + LocalDateTime tsNtz = LocalDateTime.of(2024, 1, 1, 0, 0).plusSeconds(i).withNano(0); + Instant tsLtz = Instant.ofEpochMilli(1700000000000L + (i * 1000L)); + return new AllTypesPojo( + a, bool1, tiny, small, intv, big, flt, dbl, ch, str, bin, bytes, dec, dt, tm, tsNtz, + tsLtz); + } + + @Test + void testTypedAppendWriteAndScan() throws Exception { + // Build all-types log table schema + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("bool1", DataTypes.BOOLEAN()) + .column("tiny", DataTypes.TINYINT()) + .column("small", DataTypes.SMALLINT()) + .column("intv", DataTypes.INT()) + .column("big", DataTypes.BIGINT()) + .column("flt", DataTypes.FLOAT()) + .column("dbl", DataTypes.DOUBLE()) + .column("ch", DataTypes.CHAR(1)) + .column("str", DataTypes.STRING()) + .column("bin", DataTypes.BINARY(3)) + .column("bytes", DataTypes.BYTES()) + .column("dec", DataTypes.DECIMAL(10, 2)) + .column("dt", DataTypes.DATE()) + .column("tm", DataTypes.TIME()) + .column("tsNtz", DataTypes.TIMESTAMP(3)) + .column("tsLtz", DataTypes.TIMESTAMP_LTZ(3)) + .build(); + TablePath path = TablePath.of("pojo_db", "all_types_log"); + TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2).build(); + createTable(path, td, true); + + try (Table table = conn.getTable(path)) { + // write + TypedAppendWriter writer = + table.newAppend().createTypedWriter(AllTypesPojo.class); + List expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + AllTypesPojo u = newAllTypesPojo(i); + expected.add(u); + writer.append(u); + } + writer.flush(); + + // read + Scan scan = table.newScan(); + TypedLogScanner scanner = scan.createTypedLogScanner(AllTypesPojo.class); + subscribeFromBeginning(scanner, table); + + List actual = new ArrayList<>(); + while (actual.size() < expected.size()) { + TypedScanRecords recs = scanner.poll(Duration.ofSeconds(2)); + for (TypedScanRecord r : recs) { + assertThat(r.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); + actual.add(r.getValue()); + } + } + assertThat(actual) + .usingRecursiveFieldByFieldElementComparator() + .containsExactlyInAnyOrderElementsOf(expected); + } + } + + @Test + void testTypedUpsertWriteAndScan() throws Exception { + // Build all-types PK table schema (PK on 'a') + Schema schema = allTypesPkSchema(); + TablePath path = TablePath.of("pojo_db", "all_types_pk"); + TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2, "a").build(); + createTable(path, td, true); + + try (Table table = conn.getTable(path)) { + Upsert upsert = table.newUpsert(); + TypedUpsertWriter writer = upsert.createTypedWriter(AllTypesPojo.class); + + AllTypesPojo p1 = newAllTypesPojo(1); + AllTypesPojo p2 = newAllTypesPojo(2); + writer.upsert(p1).get(); + writer.upsert(p2).get(); + + // update key 1: change a couple of fields + AllTypesPojo p1Updated = newAllTypesPojo(1); + p1Updated.str = "a1"; + p1Updated.dec = new java.math.BigDecimal("42.42"); + writer.upsert(p1Updated).get(); + writer.flush(); + + // scan as POJOs and verify change types and values + TypedLogScanner scanner = + table.newScan().createTypedLogScanner(AllTypesPojo.class); + subscribeFromBeginning(scanner, table); + + List changes = new ArrayList<>(); + List values = new ArrayList<>(); + while (values.size() < 4) { // INSERT 1, INSERT 2, UPDATE_BEFORE 1, UPDATE_AFTER 1 + TypedScanRecords recs = scanner.poll(Duration.ofSeconds(2)); + for (TypedScanRecord r : recs) { + changes.add(r.getChangeType()); + values.add(r.getValue()); + } + } + + assertThat(changes) + .containsExactlyInAnyOrder( + ChangeType.INSERT, + ChangeType.INSERT, + ChangeType.UPDATE_BEFORE, + ChangeType.UPDATE_AFTER); + // ensure the last update_after reflects new value + int lastIdx = changes.lastIndexOf(ChangeType.UPDATE_AFTER); + assertThat(values.get(lastIdx)).isEqualTo(p1Updated); + } + } + + @Test + void testTypedLookups() throws Exception { + Schema schema = allTypesPkSchema(); + TablePath path = TablePath.of("pojo_db", "lookup_pk"); + TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2, "a").build(); + createTable(path, td, true); + + try (Table table = conn.getTable(path)) { + TypedUpsertWriter writer = + table.newUpsert().createTypedWriter(AllTypesPojo.class); + writer.upsert(newAllTypesPojo(1)).get(); + writer.upsert(newAllTypesPojo(2)).get(); + writer.flush(); + + // primary key lookup using Lookuper API with POJO key + TypedLookuper lookuper = + table.newLookup().createTypedLookuper(PLookupKey.class); + RowType tableSchema = table.getTableInfo().getRowType(); + RowToPojoConverter rowConv = + RowToPojoConverter.of(AllTypesPojo.class, tableSchema, tableSchema); + + LookupResult lr = lookuper.lookup(new PLookupKey(1)).get(); + AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow()); + assertThat(one).isEqualTo(newAllTypesPojo(1)); + } + } + + @Test + void testInternalRowLookup() throws Exception { + Schema schema = allTypesPkSchema(); + TablePath path = TablePath.of("pojo_db", "lookup_internalrow"); + TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(2, "a").build(); + createTable(path, td, true); + + try (Table table = conn.getTable(path)) { + // write a couple of rows via POJO writer + TypedUpsertWriter writer = + table.newUpsert().createTypedWriter(AllTypesPojo.class); + writer.upsert(newAllTypesPojo(101)).get(); + writer.upsert(newAllTypesPojo(202)).get(); + writer.flush(); + + // now perform lookup using the raw InternalRow path to ensure it's still supported + Lookuper lookuper = table.newLookup().createLookuper(); + RowType tableSchema = table.getTableInfo().getRowType(); + RowType keyProjection = tableSchema.project(table.getTableInfo().getPrimaryKeys()); + + // Build the key row directly using GenericRow to avoid any POJO conversion + GenericRow keyRow = new GenericRow(keyProjection.getFieldCount()); + keyRow.setField(0, 101); // primary key field 'a' + + LookupResult lr = lookuper.lookup(keyRow).get(); + RowToPojoConverter rowConv = + RowToPojoConverter.of(AllTypesPojo.class, tableSchema, tableSchema); + AllTypesPojo pojo = rowConv.fromRow(lr.getSingletonRow()); + assertThat(pojo).isEqualTo(newAllTypesPojo(101)); + } + } + + @Test + void testTypedProjections() throws Exception { + TablePath path = TablePath.of("pojo_db", "proj_log"); + TableDescriptor td = + TableDescriptor.builder().schema(allTypesLogSchema()).distributedBy(1).build(); + createTable(path, td, true); + + try (Table table = conn.getTable(path)) { + TypedAppendWriter writer = + table.newAppend().createTypedWriter(AllTypesPojo.class); + writer.append(newAllTypesPojo(10)).get(); + writer.append(newAllTypesPojo(11)).get(); + writer.flush(); + + // Project only a subset of fields + TypedLogScanner scanner = + table.newScan() + .project(Arrays.asList("a", "str")) + .createTypedLogScanner(AllTypesPojo.class); + subscribeFromBeginning(scanner, table); + TypedScanRecords recs = scanner.poll(Duration.ofSeconds(2)); + int i = 10; + for (TypedScanRecord r : recs) { + AllTypesPojo u = r.getValue(); + AllTypesPojo expectedPojo = new AllTypesPojo(); + expectedPojo.a = i; + expectedPojo.str = "s" + i; + assertThat(u).isEqualTo(expectedPojo); + i++; + } + } + } + + @Test + void testTypedPartialUpdates() throws Exception { + // Use full PK schema and update a subset of fields + Schema schema = allTypesPkSchema(); + TablePath path = TablePath.of("pojo_db", "pk_partial"); + TableDescriptor td = TableDescriptor.builder().schema(schema).distributedBy(1, "a").build(); + createTable(path, td, true); + + try (Table table = conn.getTable(path)) { + // 1. initial full row + TypedUpsertWriter fullWriter = + table.newUpsert().createTypedWriter(AllTypesPojo.class); + fullWriter.upsert(newAllTypesPojo(1)).get(); + fullWriter.flush(); + + // 2. partial update: only PK + subset fields + Upsert upsert = table.newUpsert().partialUpdate("a", "str", "dec"); + TypedUpsertWriter writer = upsert.createTypedWriter(AllTypesPojo.class); + + AllTypesPojo patch = new AllTypesPojo(); + patch.a = 1; + patch.str = "second"; + patch.dec = new BigDecimal("99.99"); + writer.upsert(patch).get(); + writer.flush(); + + // verify via lookup and scan using Lookuper + POJO key + TypedLookuper lookuper = + table.newLookup().createTypedLookuper(PLookupKey.class); + RowType tableSchema = table.getTableInfo().getRowType(); + RowToPojoConverter rowConv = + RowToPojoConverter.of(AllTypesPojo.class, tableSchema, tableSchema); + AllTypesPojo lookedUp = + rowConv.fromRow(lookuper.lookup(new PLookupKey(1)).get().getSingletonRow()); + AllTypesPojo expected = newAllTypesPojo(1); + expected.str = "second"; + expected.dec = new BigDecimal("99.99"); + assertThat(lookedUp).isEqualTo(expected); + + TypedLogScanner scanner = + table.newScan().createTypedLogScanner(AllTypesPojo.class); + subscribeFromBeginning(scanner, table); + boolean sawUpdateAfter = false; + while (!sawUpdateAfter) { + TypedScanRecords recs = scanner.poll(Duration.ofSeconds(2)); + for (TypedScanRecord r : recs) { + if (r.getChangeType() == ChangeType.UPDATE_AFTER) { + assertThat(r.getValue()).isEqualTo(expected); + sawUpdateAfter = true; + } + } + } + } + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 24f264c926..efebb32782 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -562,6 +562,8 @@ private Cluster updateCluster(List bucketLocations) { Map tableIdByPath = new HashMap<>(); tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID); + Map tableInfoByPath = new HashMap<>(); + tableInfoByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO); return new Cluster( aliveTabletServersById, new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),