Skip to content

Commit 38344b2

Browse files
Merge branch 'main' into numeric-type-widening-converters
2 parents 5c615c4 + e41f8b4 commit 38344b2

File tree

104 files changed

+3874
-732
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

104 files changed

+3874
-732
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ body:
3939
description: What Fluss version are you using?
4040
multiple: false
4141
options:
42-
- "0.7.0 (latest release)"
43-
- "0.6.0"
44-
- "0.5.0"
42+
- "0.8.0 (latest release)"
43+
- "0.7.0"
4544
- "main (development)"
4645
validations:
4746
required: true

LICENSE

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,3 +401,7 @@ LightProto
401401
./fluss-protogen/fluss-protogen-generator/src/main/java/org/apache/fluss/protogen/generator/generator/ProtobufMessage.java
402402
./fluss-protogen/fluss-protogen-generator/src/main/java/org/apache/fluss/protogen/generator/generator/ProtobufNumberField.java
403403
./fluss-protogen/fluss-protogen-maven-plugin/src/main/java/org/apache/fluss/protogen/maven/plugin/ProtoGenMojo.java
404+
405+
Apache Maven Wrapper
406+
./mvnw
407+
./mvnw.cmd

NOTICE

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,19 @@ with the following copyright notice:
2121
LightProto
2222
Copyright 2020 Splunk Inc.
2323

24-
----------------------------------------------------------
24+
----------------------------------------------------------
25+
26+
This product contains code from the Apache Maven Wrapper Project:
27+
28+
Apache Maven Wrapper
29+
Copyright 2013-2025 The Apache Software Foundation
30+
31+
This product includes software developed at
32+
The Apache Software Foundation (http://www.apache.org/).
33+
34+
The original idea and initial implementation of the maven-wrapper module is derived
35+
from the Gradle Wrapper which was written originally by Hans Dockter and Adam Murdoch.
36+
Copyright 2007 the original author or authors.
37+
38+
----------------------------------------------------------
39+

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
119
<p align="center">
220
<picture>
321
<source media="(prefers-color-scheme: dark)" srcset="website/static/img/logo/svg/white_color_logo.svg">

docker/quickstart-flink/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
119
# Fluss Quickstart Flink Docker
220

321
This directory contains the Docker setup for Fluss Quickstart with Flink integration.

fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424

2525
import java.math.BigDecimal;
2626
import java.util.Arrays;
27-
import java.util.HashMap;
27+
import java.util.EnumMap;
2828
import java.util.HashSet;
29+
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Set;
3132

@@ -43,7 +44,7 @@ final class ConverterCommons {
4344
static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = createSupportedTypes();
4445

4546
private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
46-
Map<DataTypeRoot, Set<Class<?>>> map = new HashMap<>();
47+
Map<DataTypeRoot, Set<Class<?>>> map = new EnumMap<>(DataTypeRoot.class);
4748
map.put(DataTypeRoot.BOOLEAN, setOf(Boolean.class));
4849

4950
// Numeric types with widening support
@@ -94,15 +95,15 @@ private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
9495

9596
static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema) {
9697
Set<String> pojoNames = pojoType.getProperties().keySet();
97-
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
98-
if (!pojoNames.equals(tableNames)) {
98+
List<String> fieldNames = tableSchema.getFieldNames();
99+
if (!pojoNames.containsAll(fieldNames)) {
99100
throw new IllegalArgumentException(
100101
String.format(
101102
"POJO fields %s must exactly match table schema fields %s.",
102-
pojoNames, tableNames));
103+
pojoNames, fieldNames));
103104
}
104105
for (int i = 0; i < tableSchema.getFieldCount(); i++) {
105-
String name = tableSchema.getFieldNames().get(i);
106+
String name = fieldNames.get(i);
106107
DataType dt = tableSchema.getTypeAt(i);
107108
PojoType.Property prop = pojoType.getProperty(name);
108109
validateCompatibility(dt, prop);

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.row.TimestampLtz;
2424
import org.apache.fluss.row.TimestampNtz;
2525
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.types.DataTypeChecks;
2627
import org.apache.fluss.types.DecimalType;
2728
import org.apache.fluss.types.RowType;
2829

@@ -142,9 +143,15 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
142143
case TIME_WITHOUT_TIME_ZONE:
143144
return (obj) -> convertTimeValue(prop, prop.read(obj));
144145
case TIMESTAMP_WITHOUT_TIME_ZONE:
145-
return (obj) -> convertTimestampNtzValue(prop, prop.read(obj));
146+
{
147+
final int precision = DataTypeChecks.getPrecision(fieldType);
148+
return (obj) -> convertTimestampNtzValue(precision, prop, prop.read(obj));
149+
}
146150
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
147-
return (obj) -> convertTimestampLtzValue(prop, prop.read(obj));
151+
{
152+
final int precision = DataTypeChecks.getPrecision(fieldType);
153+
return (obj) -> convertTimestampLtzValue(precision, prop, prop.read(obj));
154+
}
148155
default:
149156
throw new UnsupportedOperationException(
150157
String.format(
@@ -210,9 +217,17 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
210217
return (int) (t.toNanoOfDay() / 1_000_000);
211218
}
212219

213-
/** Converts a LocalDateTime POJO property to Fluss TimestampNtz. */
220+
/**
221+
* Converts a LocalDateTime POJO property to Fluss TimestampNtz, respecting the specified
222+
* precision.
223+
*
224+
* @param precision the timestamp precision (0-9)
225+
* @param prop the POJO property metadata
226+
* @param v the value to convert
227+
* @return TimestampNtz with precision applied, or null if v is null
228+
*/
214229
private static @Nullable TimestampNtz convertTimestampNtzValue(
215-
PojoType.Property prop, @Nullable Object v) {
230+
int precision, PojoType.Property prop, @Nullable Object v) {
216231
if (v == null) {
217232
return null;
218233
}
@@ -222,24 +237,82 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
222237
"Field %s is not a LocalDateTime. Cannot convert to TimestampNtz.",
223238
prop.name));
224239
}
225-
return TimestampNtz.fromLocalDateTime((LocalDateTime) v);
240+
LocalDateTime ldt = (LocalDateTime) v;
241+
LocalDateTime truncated = truncateToTimestampPrecision(ldt, precision);
242+
return TimestampNtz.fromLocalDateTime(truncated);
226243
}
227244

228-
/** Converts an Instant or OffsetDateTime POJO property to Fluss TimestampLtz (UTC based). */
245+
/**
246+
* Converts an Instant or OffsetDateTime POJO property to Fluss TimestampLtz (UTC based),
247+
* respecting the specified precision.
248+
*
249+
* @param precision the timestamp precision (0-9)
250+
* @param prop the POJO property metadata
251+
* @param v the value to convert
252+
* @return TimestampLtz with precision applied, or null if v is null
253+
*/
229254
private static @Nullable TimestampLtz convertTimestampLtzValue(
230-
PojoType.Property prop, @Nullable Object v) {
255+
int precision, PojoType.Property prop, @Nullable Object v) {
231256
if (v == null) {
232257
return null;
233258
}
259+
Instant instant;
234260
if (v instanceof Instant) {
235-
return TimestampLtz.fromInstant((Instant) v);
261+
instant = (Instant) v;
236262
} else if (v instanceof OffsetDateTime) {
237-
return TimestampLtz.fromInstant(((OffsetDateTime) v).toInstant());
263+
instant = ((OffsetDateTime) v).toInstant();
264+
} else {
265+
throw new IllegalArgumentException(
266+
String.format(
267+
"Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz.",
268+
prop.name));
238269
}
239-
throw new IllegalArgumentException(
240-
String.format(
241-
"Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz.",
242-
prop.name));
270+
Instant truncated = truncateToTimestampPrecision(instant, precision);
271+
return TimestampLtz.fromInstant(truncated);
272+
}
273+
274+
/**
275+
* Truncates a LocalDateTime to the specified timestamp precision.
276+
*
277+
* @param ldt the LocalDateTime to truncate
278+
* @param precision the precision (0-9)
279+
* @return truncated LocalDateTime
280+
*/
281+
private static LocalDateTime truncateToTimestampPrecision(LocalDateTime ldt, int precision) {
282+
if (precision >= 9) {
283+
return ldt;
284+
}
285+
int nanos = ldt.getNano();
286+
int truncatedNanos = truncateNanos(nanos, precision);
287+
return ldt.withNano(truncatedNanos);
288+
}
289+
290+
/**
291+
* Truncates an Instant to the specified timestamp precision.
292+
*
293+
* @param instant the Instant to truncate
294+
* @param precision the precision (0-9)
295+
* @return truncated Instant
296+
*/
297+
private static Instant truncateToTimestampPrecision(Instant instant, int precision) {
298+
if (precision >= 9) {
299+
return instant;
300+
}
301+
int nanos = instant.getNano();
302+
int truncatedNanos = truncateNanos(nanos, precision);
303+
return Instant.ofEpochSecond(instant.getEpochSecond(), truncatedNanos);
304+
}
305+
306+
/**
307+
* Truncates nanoseconds to the specified precision.
308+
*
309+
* @param nanos the nanoseconds value (0-999,999,999)
310+
* @param precision the precision (0-9)
311+
* @return truncated nanoseconds
312+
*/
313+
private static int truncateNanos(int nanos, int precision) {
314+
int divisor = (int) Math.pow(10, 9 - precision);
315+
return (nanos / divisor) * divisor;
243316
}
244317

245318
/** Converts a numeric POJO property to TINYINT (Byte). */

0 commit comments

Comments
 (0)