Skip to content

Commit 0a6233f

Browse files
committed
Merge main branch into fix-2128-datalake-add-column-mvp
- Merged latest main changes including Array type support - Resolved test conflicts in FlussRecordAsPaimonRowTest.java - Retained both Array tests and schema evolution tests
2 parents 6c22ad2 + f577b4f commit 0a6233f

File tree

383 files changed

+19582
-3750
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

383 files changed

+19582
-3750
lines changed

.github/workflows/ci-template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
strategy:
3737
fail-fast: false
3838
matrix:
39-
module: [ core, flink, lake ]
39+
module: [ core, flink, spark3, lake ]
4040
name: "${{ matrix.module }}"
4141
steps:
4242
- name: Checkout code

.github/workflows/stage.sh

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,30 @@
1919

2020
STAGE_CORE="core"
2121
STAGE_FLINK="flink"
22+
STAGE_SPARK="spark3"
2223
STAGE_LAKE="lake"
2324

2425
MODULES_FLINK="\
2526
fluss-flink,\
2627
fluss-flink/fluss-flink-common,\
27-
fluss-flink/fluss-flink-2.1,\
28+
fluss-flink/fluss-flink-2.2,\
2829
fluss-flink/fluss-flink-1.20,\
2930
"
3031

32+
MODULES_COMMON_SPARK="\
33+
fluss-spark,\
34+
fluss-spark/fluss-spark-common,\
35+
fluss-spark/fluss-spark-ut,\
36+
"
37+
38+
MODULES_SPARK3="\
39+
fluss-spark,\
40+
fluss-spark/fluss-spark-common,\
41+
fluss-spark/fluss-spark-ut,\
42+
fluss-spark/fluss-spark-3.5,\
43+
fluss-spark/fluss-spark-3.4,\
44+
"
45+
3146
# we move Flink legacy version tests to "lake" module for balancing testing time
3247
MODULES_LAKE="\
3348
fluss-flink/fluss-flink-1.19,\
@@ -42,10 +57,12 @@ function get_test_modules_for_stage() {
4257
local stage=$1
4358

4459
local modules_flink=$MODULES_FLINK
60+
local modules_spark3=$MODULES_SPARK3
4561
local modules_lake=$MODULES_LAKE
4662
local negated_flink=\!${MODULES_FLINK//,/,\!}
63+
local negated_spark=\!${MODULES_COMMON_SPARK//,/,\!}
4764
local negated_lake=\!${MODULES_LAKE//,/,\!}
48-
local modules_core="$negated_flink,$negated_lake"
65+
local modules_core="$negated_flink,$negated_spark,$negated_lake"
4966

5067
case ${stage} in
5168
(${STAGE_CORE})
@@ -54,6 +71,9 @@ function get_test_modules_for_stage() {
5471
(${STAGE_FLINK})
5572
echo "-pl fluss-test-coverage,$modules_flink"
5673
;;
74+
(${STAGE_SPARK})
75+
echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3"
76+
;;
5777
(${STAGE_LAKE})
5878
echo "-pl fluss-test-coverage,$modules_lake"
5979
;;

.scalafmt.conf

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
runner.dialect = scala212
2+
3+
# Version is required to make sure IntelliJ picks the right version
4+
version = 3.10.2
5+
preset = default
6+
7+
# Max column
8+
maxColumn = 100
9+
10+
# This parameter simply says the .stripMargin method was not redefined by the user to assign
11+
# special meaning to indentation preceding the | character. Hence, that indentation can be modified.
12+
assumeStandardLibraryStripMargin = true
13+
align.stripMargin = true
14+
15+
# Align settings
16+
align.preset = none
17+
align.closeParenSite = false
18+
align.openParenCallSite = false
19+
danglingParentheses.defnSite = false
20+
danglingParentheses.callSite = false
21+
danglingParentheses.ctrlSite = true
22+
danglingParentheses.tupleSite = false
23+
align.openParenCallSite = false
24+
align.openParenDefnSite = false
25+
align.openParenTupleSite = false
26+
27+
# Newlines
28+
newlines.alwaysBeforeElseAfterCurlyIf = false
29+
newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params
30+
newlines.afterCurlyLambdaParams = squash # No newline after lambda params
31+
newlines.inInterpolation = "avoid"
32+
newlines.avoidInResultType = true
33+
optIn.annotationNewlines = true
34+
35+
# Scaladoc
36+
docstrings.style = Asterisk # Javadoc style
37+
docstrings.removeEmpty = true
38+
docstrings.oneline = fold
39+
docstrings.forceBlankLineBefore = true
40+
41+
# Indentation
42+
indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters
43+
44+
# Rewrites
45+
rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers]
46+
47+
# Imports
48+
rewrite.imports.sort = scalastyle
49+
rewrite.imports.groups = [
50+
["org.apache.fluss\\..*"],
51+
["org.apache.fluss.shade\\..*"],
52+
[".*"],
53+
["javax\\..*"],
54+
["java\\..*"],
55+
["scala\\..*"]
56+
]
57+
rewrite.imports.contiguousGroups = no
58+
importSelectors = singleline # Imports in a single line, like IntelliJ
59+
60+
# Remove redundant braces in string interpolation.
61+
rewrite.redundantBraces.stringInterpolation = true
62+
rewrite.redundantBraces.defnBodies = false
63+
rewrite.redundantBraces.generalExpressions = false
64+
rewrite.redundantBraces.ifElseExpressions = false
65+
rewrite.redundantBraces.methodBodies = false
66+
rewrite.redundantBraces.includeUnitMethods = false
67+
rewrite.redundantBraces.maxBreaks = 1
68+
69+
# Remove trailing commas
70+
rewrite.trailingCommas.style = "never"

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java renamed to copyright.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,3 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.api.connector.sink2;
19-
20-
/** Placeholder class to resolve compatibility issues. */
21-
public interface WriterInitContext extends Sink.InitContext {}

fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,24 @@ static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema)
8383
}
8484
}
8585

86+
static void validatePojoMatchesProjection(PojoType<?> pojoType, RowType projection) {
87+
Set<String> pojoNames = pojoType.getProperties().keySet();
88+
List<String> fieldNames = projection.getFieldNames();
89+
if (!pojoNames.containsAll(fieldNames)) {
90+
throw new IllegalArgumentException(
91+
String.format(
92+
"POJO fields %s must contain all projection fields %s. "
93+
+ "For full-table writes, POJO fields must exactly match table schema fields.",
94+
pojoNames, fieldNames));
95+
}
96+
for (int i = 0; i < projection.getFieldCount(); i++) {
97+
String name = fieldNames.get(i);
98+
DataType dt = projection.getTypeAt(i);
99+
PojoType.Property prop = pojoType.getProperty(name);
100+
validateCompatibility(dt, prop);
101+
}
102+
}
103+
86104
static void validateProjectionSubset(RowType projection, RowType tableSchema) {
87105
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
88106
for (String n : projection.getFieldNames()) {

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import javax.annotation.Nullable;
3131

3232
import java.math.BigDecimal;
33+
import java.math.RoundingMode;
3334
import java.time.Instant;
3435
import java.time.LocalDate;
3536
import java.time.LocalDateTime;
@@ -54,7 +55,9 @@ private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, RowType pr
5455
this.tableSchema = tableSchema;
5556
this.projection = projection;
5657
this.projectionFieldNames = projection.getFieldNames();
57-
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
58+
// For writer path, allow POJO to be a superset of the projection. It must contain all
59+
// projected fields.
60+
ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
5861
ConverterCommons.validateProjectionSubset(projection, tableSchema);
5962
this.fieldConverters = createFieldConverters();
6063
}
@@ -177,8 +180,22 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
177180
String.format(
178181
"Field %s is not a BigDecimal. Cannot convert to Decimal.", prop.name));
179182
}
180-
return Decimal.fromBigDecimal(
181-
(BigDecimal) v, decimalType.getPrecision(), decimalType.getScale());
183+
final int precision = decimalType.getPrecision();
184+
final int scale = decimalType.getScale();
185+
186+
// Scale with a deterministic rounding mode to avoid ArithmeticException when rounding is
187+
// needed.
188+
BigDecimal bd = (BigDecimal) v;
189+
BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
190+
191+
if (scaled.precision() > precision) {
192+
throw new IllegalArgumentException(
193+
String.format(
194+
"Decimal value for field %s exceeds precision %d after scaling to %d: %s",
195+
prop.name, precision, scale, scaled));
196+
}
197+
198+
return Decimal.fromBigDecimal(scaled, precision, scale);
182199
}
183200

184201
/** Converts a LocalDate POJO property to number of days since epoch. */

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ final class PojoType<T> {
3737
private final Class<T> pojoClass;
3838
private final Constructor<T> defaultConstructor;
3939
private final Map<String, Property> properties; // property name -> property
40+
private static final Map<Class<?>, Class<?>> PRIMITIVE_TO_BOXED = createPrimitiveToBoxedMap();
4041

4142
private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String, Property> props) {
4243
this.pojoClass = pojoClass;
@@ -73,12 +74,15 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
7374
for (Map.Entry<String, Field> e : allFields.entrySet()) {
7475
String name = e.getKey();
7576
Field field = e.getValue();
77+
// Enforce nullable fields: primitives are not allowed in POJO definitions.
7678
if (field.getType().isPrimitive()) {
7779
throw new IllegalArgumentException(
7880
String.format(
7981
"POJO class %s has primitive field '%s' of type %s. Primitive types are not allowed; all fields must be nullable (use wrapper types).",
8082
pojoClass.getName(), name, field.getType().getName()));
8183
}
84+
// use boxed type as effective type
85+
Class<?> effectiveType = boxIfPrimitive(field.getType());
8286
boolean publicField = Modifier.isPublic(field.getModifiers());
8387
Method getter = getters.get(name);
8488
Method setter = setters.get(name);
@@ -94,8 +98,7 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
9498
}
9599
props.put(
96100
name,
97-
new Property(
98-
name, field.getType(), publicField ? field : null, getter, setter));
101+
new Property(name, effectiveType, publicField ? field : null, getter, setter));
99102
}
100103

101104
return new PojoType<>(pojoClass, ctor, props);
@@ -235,6 +238,29 @@ private static String decapitalize(String s) {
235238
return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
236239
}
237240

241+
private static Map<Class<?>, Class<?>> createPrimitiveToBoxedMap() {
242+
Map<Class<?>, Class<?>> map = new HashMap<>();
243+
map.put(boolean.class, Boolean.class);
244+
map.put(byte.class, Byte.class);
245+
map.put(short.class, Short.class);
246+
map.put(int.class, Integer.class);
247+
map.put(long.class, Long.class);
248+
map.put(float.class, Float.class);
249+
map.put(double.class, Double.class);
250+
map.put(char.class, Character.class);
251+
// void shouldn't appear as a field type, but handle defensively
252+
map.put(void.class, Void.class);
253+
return map;
254+
}
255+
256+
private static Class<?> boxIfPrimitive(Class<?> type) {
257+
if (!type.isPrimitive()) {
258+
return type;
259+
}
260+
Class<?> boxed = PRIMITIVE_TO_BOXED.get(type);
261+
return boxed != null ? boxed : type;
262+
}
263+
238264
static final class Property {
239265
final String name;
240266
final Class<?> type;

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,37 @@
2424

2525
/**
2626
* Used to configure and create a {@link Lookuper} to lookup rows of a primary key table. The built
27-
* Lookuper can be a primary key lookuper that lookups by the primary key, or a prefix key lookup
28-
* that lookups by the prefix key of the primary key.
27+
* lookuper can lookup by the full primary key, or by a prefix of the primary key when configured
28+
* via {@link #lookupBy}.
2929
*
3030
* <p>{@link Lookup} objects are immutable and can be shared between threads. Refinement methods,
31-
* like {@link #lookupBy}, create new Lookup instances.
31+
* like {@link #lookupBy}, create new {@code Lookup} instances.
3232
*
33-
* <p>Example1: Create a Primary Key Lookuper. Given a table with primary key column [k STRING].
33+
* <p>Examples
34+
*
35+
* <p>Example 1: Primary Key Lookuper using an InternalRow key. Given a table with primary key
36+
* column [k STRING]:
3437
*
3538
* <pre>{@code
3639
* Lookuper lookuper = table.newLookup().createLookuper();
3740
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of("key1"));
38-
* resultFuture.get().getRows().forEach(row -> {
39-
* System.out.println(row);
40-
* });
41+
* resultFuture.get().getRowList().forEach(System.out::println);
4142
* }</pre>
4243
*
43-
* <p>Example2: Create a Prefix Key Lookuper. Given a table with primary key column [a INT, b
44-
* STRING, c BIGINT] and bucket key [a, b].
44+
* <p>Example 2: Prefix Key Lookuper using an InternalRow key. Given a table with primary key
45+
* columns [a INT, b STRING, c BIGINT] and bucket key [a, b]:
4546
*
4647
* <pre>{@code
4748
* Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
4849
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of(1, "b1"));
49-
* resultFuture.get().getRows().forEach(row -> {
50-
* System.out.println(row);
51-
* });
50+
* resultFuture.get().getRowList().forEach(System.out::println);
51+
* }</pre>
52+
*
53+
* <p>Example 3: Using a POJO key (conversion handled internally):
54+
*
55+
* <pre>{@code
56+
* TypedLookuper<MyKeyPojo> lookuper = table.newLookup().createTypedLookuper(MyKeyPojo.class);
57+
* LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
5258
* }</pre>
5359
*
5460
* @since 0.6
@@ -96,4 +102,13 @@ default Lookup lookupBy(String... lookupColumnNames) {
96102
* @return the lookuper
97103
*/
98104
Lookuper createLookuper();
105+
106+
/**
107+
* Creates a {@link TypedLookuper} instance to lookup rows of a primary key table using POJOs.
108+
*
109+
* @param pojoClass the class of the POJO
110+
* @param <T> the type of the POJO
111+
* @return the typed lookuper
112+
*/
113+
<T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
99114
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupResult.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.util.Objects;
2828

2929
/**
30-
* The result of {@link Lookuper#lookup(InternalRow)}.
30+
* The result of a lookup operation performed by a {@link Lookuper}. It carries zero, one, or many
31+
* {@link org.apache.fluss.row.InternalRow} values depending on whether the underlying lookup is a
32+
* primary-key lookup (at most one) or a prefix-key lookup (zero or more).
3133
*
3234
* @since 0.1
3335
*/

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@
2525
import java.util.concurrent.CompletableFuture;
2626

2727
/**
28-
* The lookup-er is used to lookup row of a primary key table by primary key or prefix key. The
29-
* lookuper has retriable ability to handle transient errors during lookup operations which is
30-
* configured by {@link org.apache.fluss.config.ConfigOptions#CLIENT_LOOKUP_MAX_RETRIES}.
28+
* A lookuper performs key-based lookups against a primary key table, using either the full primary
29+
* key or a prefix of the primary key (when configured via {@code Lookup#lookupBy}).
3130
*
32-
* <p>Note: Lookuper instances are not thread-safe.
31+
* <p>Usage examples:
32+
*
33+
* <pre>{@code
34+
* // Row-based key (InternalRow)
35+
* Lookuper lookuper = table.newLookup().createLookuper();
36+
* LookupResult res = lookuper.lookup(keyRow).get();
37+
* }</pre>
3338
*
3439
* @since 0.6
3540
*/
@@ -44,7 +49,7 @@ public interface Lookuper {
4449
* {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
4550
* Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
4651
*
47-
* @param lookupKey the lookup key.
52+
* @param lookupKey the lookup key
4853
* @return the result of lookup.
4954
*/
5055
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);

0 commit comments

Comments
 (0)