Skip to content

Commit a956aa5

Browse files
authored
Fix timestamp precision (#1931)
* fix:Enhance the timestamp conversion function, support precision parameters and add truncation methods * fix:Precision test file * [client][hotfix] Add Javadoc for TimestampPojo test class
1 parent 69d03e1 commit a956aa5

File tree

2 files changed

+244
-13
lines changed

2 files changed

+244
-13
lines changed

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.row.TimestampLtz;
2424
import org.apache.fluss.row.TimestampNtz;
2525
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.types.DataTypeChecks;
2627
import org.apache.fluss.types.DecimalType;
2728
import org.apache.fluss.types.RowType;
2829

@@ -134,9 +135,15 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
134135
case TIME_WITHOUT_TIME_ZONE:
135136
return (obj) -> convertTimeValue(prop, prop.read(obj));
136137
case TIMESTAMP_WITHOUT_TIME_ZONE:
137-
return (obj) -> convertTimestampNtzValue(prop, prop.read(obj));
138+
{
139+
final int precision = DataTypeChecks.getPrecision(fieldType);
140+
return (obj) -> convertTimestampNtzValue(precision, prop, prop.read(obj));
141+
}
138142
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
139-
return (obj) -> convertTimestampLtzValue(prop, prop.read(obj));
143+
{
144+
final int precision = DataTypeChecks.getPrecision(fieldType);
145+
return (obj) -> convertTimestampLtzValue(precision, prop, prop.read(obj));
146+
}
140147
default:
141148
throw new UnsupportedOperationException(
142149
String.format(
@@ -202,9 +209,17 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
202209
return (int) (t.toNanoOfDay() / 1_000_000);
203210
}
204211

205-
/** Converts a LocalDateTime POJO property to Fluss TimestampNtz. */
212+
/**
213+
* Converts a LocalDateTime POJO property to Fluss TimestampNtz, respecting the specified
214+
* precision.
215+
*
216+
* @param precision the timestamp precision (0-9)
217+
* @param prop the POJO property metadata
218+
* @param v the value to convert
219+
* @return TimestampNtz with precision applied, or null if v is null
220+
*/
206221
private static @Nullable TimestampNtz convertTimestampNtzValue(
207-
PojoType.Property prop, @Nullable Object v) {
222+
int precision, PojoType.Property prop, @Nullable Object v) {
208223
if (v == null) {
209224
return null;
210225
}
@@ -214,24 +229,82 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
214229
"Field %s is not a LocalDateTime. Cannot convert to TimestampNtz.",
215230
prop.name));
216231
}
217-
return TimestampNtz.fromLocalDateTime((LocalDateTime) v);
232+
LocalDateTime ldt = (LocalDateTime) v;
233+
LocalDateTime truncated = truncateToTimestampPrecision(ldt, precision);
234+
return TimestampNtz.fromLocalDateTime(truncated);
218235
}
219236

220-
/** Converts an Instant or OffsetDateTime POJO property to Fluss TimestampLtz (UTC based). */
237+
/**
238+
* Converts an Instant or OffsetDateTime POJO property to Fluss TimestampLtz (UTC based),
239+
* respecting the specified precision.
240+
*
241+
* @param precision the timestamp precision (0-9)
242+
* @param prop the POJO property metadata
243+
* @param v the value to convert
244+
* @return TimestampLtz with precision applied, or null if v is null
245+
*/
221246
private static @Nullable TimestampLtz convertTimestampLtzValue(
222-
PojoType.Property prop, @Nullable Object v) {
247+
int precision, PojoType.Property prop, @Nullable Object v) {
223248
if (v == null) {
224249
return null;
225250
}
251+
Instant instant;
226252
if (v instanceof Instant) {
227-
return TimestampLtz.fromInstant((Instant) v);
253+
instant = (Instant) v;
228254
} else if (v instanceof OffsetDateTime) {
229-
return TimestampLtz.fromInstant(((OffsetDateTime) v).toInstant());
255+
instant = ((OffsetDateTime) v).toInstant();
256+
} else {
257+
throw new IllegalArgumentException(
258+
String.format(
259+
"Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz.",
260+
prop.name));
261+
}
262+
Instant truncated = truncateToTimestampPrecision(instant, precision);
263+
return TimestampLtz.fromInstant(truncated);
264+
}
265+
266+
/**
267+
* Truncates a LocalDateTime to the specified timestamp precision.
268+
*
269+
* @param ldt the LocalDateTime to truncate
270+
* @param precision the precision (0-9)
271+
* @return truncated LocalDateTime
272+
*/
273+
private static LocalDateTime truncateToTimestampPrecision(LocalDateTime ldt, int precision) {
274+
if (precision >= 9) {
275+
return ldt;
230276
}
231-
throw new IllegalArgumentException(
232-
String.format(
233-
"Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz.",
234-
prop.name));
277+
int nanos = ldt.getNano();
278+
int truncatedNanos = truncateNanos(nanos, precision);
279+
return ldt.withNano(truncatedNanos);
280+
}
281+
282+
/**
283+
* Truncates an Instant to the specified timestamp precision.
284+
*
285+
* @param instant the Instant to truncate
286+
* @param precision the precision (0-9)
287+
* @return truncated Instant
288+
*/
289+
private static Instant truncateToTimestampPrecision(Instant instant, int precision) {
290+
if (precision >= 9) {
291+
return instant;
292+
}
293+
int nanos = instant.getNano();
294+
int truncatedNanos = truncateNanos(nanos, precision);
295+
return Instant.ofEpochSecond(instant.getEpochSecond(), truncatedNanos);
296+
}
297+
298+
/**
299+
* Truncates nanoseconds to the specified precision.
300+
*
301+
* @param nanos the nanoseconds value (0-999,999,999)
302+
* @param precision the precision (0-9)
303+
* @return truncated nanoseconds
304+
*/
305+
private static int truncateNanos(int nanos, int precision) {
306+
int divisor = (int) Math.pow(10, 9 - precision);
307+
return (nanos / divisor) * divisor;
235308
}
236309

237310
private interface FieldToRow {

fluss-client/src/test/java/org/apache/fluss/client/converter/PojoToRowConverterTest.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818
package org.apache.fluss.client.converter;
1919

2020
import org.apache.fluss.row.GenericRow;
21+
import org.apache.fluss.row.TimestampLtz;
22+
import org.apache.fluss.row.TimestampNtz;
2123
import org.apache.fluss.types.DataTypes;
2224
import org.apache.fluss.types.RowType;
2325

2426
import org.junit.jupiter.api.Test;
2527

28+
import java.time.Instant;
29+
import java.time.LocalDateTime;
30+
2631
import static org.assertj.core.api.Assertions.assertThat;
2732
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2833

@@ -168,4 +173,157 @@ public void testUnsupportedSchemaFieldTypeThrows() {
168173
.hasMessageContaining("MAP")
169174
.hasMessageContaining("mapField");
170175
}
176+
177+
@Test
178+
public void testTimestampPrecision3() {
179+
// Test with precision 3 milliseconds
180+
RowType table =
181+
RowType.builder()
182+
.field("timestampNtzField", DataTypes.TIMESTAMP(3))
183+
.field("timestampLtzField", DataTypes.TIMESTAMP_LTZ(3))
184+
.build();
185+
186+
PojoToRowConverter<TimestampPojo> writer =
187+
PojoToRowConverter.of(TimestampPojo.class, table, table);
188+
189+
// 123.456789
190+
LocalDateTime ldt = LocalDateTime.of(2025, 7, 23, 15, 1, 30, 123456789);
191+
Instant instant = Instant.parse("2025-07-23T15:01:30.123456789Z");
192+
193+
TimestampPojo pojo = new TimestampPojo(ldt, instant);
194+
GenericRow row = writer.toRow(pojo);
195+
196+
// truncate to 123.000000
197+
TimestampNtz expectedNtz = TimestampNtz.fromLocalDateTime(ldt.withNano(123000000));
198+
TimestampLtz expectedLtz =
199+
TimestampLtz.fromInstant(
200+
Instant.ofEpochSecond(instant.getEpochSecond(), 123000000));
201+
202+
assertThat(row.getTimestampNtz(0, 3)).isEqualTo(expectedNtz);
203+
assertThat(row.getTimestampLtz(1, 3)).isEqualTo(expectedLtz);
204+
}
205+
206+
@Test
207+
public void testTimestampPrecision6() {
208+
// Test with precision 6 microseconds
209+
RowType table =
210+
RowType.builder()
211+
.field("timestampNtzField", DataTypes.TIMESTAMP(6))
212+
.field("timestampLtzField", DataTypes.TIMESTAMP_LTZ(6))
213+
.build();
214+
215+
PojoToRowConverter<TimestampPojo> writer =
216+
PojoToRowConverter.of(TimestampPojo.class, table, table);
217+
218+
// 123.456789
219+
LocalDateTime ldt = LocalDateTime.of(2025, 7, 23, 15, 1, 30, 123456789);
220+
Instant instant = Instant.parse("2025-07-23T15:01:30.123456789Z");
221+
222+
TimestampPojo pojo = new TimestampPojo(ldt, instant);
223+
GenericRow row = writer.toRow(pojo);
224+
225+
// truncate to 123.456000
226+
TimestampNtz expectedNtz = TimestampNtz.fromLocalDateTime(ldt.withNano(123456000));
227+
TimestampLtz expectedLtz =
228+
TimestampLtz.fromInstant(
229+
Instant.ofEpochSecond(instant.getEpochSecond(), 123456000));
230+
231+
assertThat(row.getTimestampNtz(0, 6)).isEqualTo(expectedNtz);
232+
assertThat(row.getTimestampLtz(1, 6)).isEqualTo(expectedLtz);
233+
}
234+
235+
@Test
236+
public void testTimestampPrecision9() {
237+
// Test with precision 9 nanoseconds
238+
RowType table =
239+
RowType.builder()
240+
.field("timestampNtzField", DataTypes.TIMESTAMP(9))
241+
.field("timestampLtzField", DataTypes.TIMESTAMP_LTZ(9))
242+
.build();
243+
244+
PojoToRowConverter<TimestampPojo> writer =
245+
PojoToRowConverter.of(TimestampPojo.class, table, table);
246+
247+
LocalDateTime ldt = LocalDateTime.of(2025, 7, 23, 15, 1, 30, 123456789);
248+
Instant instant = Instant.parse("2025-07-23T15:01:30.123456789Z");
249+
250+
TimestampPojo pojo = new TimestampPojo(ldt, instant);
251+
GenericRow row = writer.toRow(pojo);
252+
253+
TimestampNtz expectedNtz = TimestampNtz.fromLocalDateTime(ldt);
254+
TimestampLtz expectedLtz = TimestampLtz.fromInstant(instant);
255+
256+
assertThat(row.getTimestampNtz(0, 9)).isEqualTo(expectedNtz);
257+
assertThat(row.getTimestampLtz(1, 9)).isEqualTo(expectedLtz);
258+
}
259+
260+
@Test
261+
public void testTimestampPrecisionRoundTrip() {
262+
testRoundTripWithPrecision(3);
263+
testRoundTripWithPrecision(6);
264+
testRoundTripWithPrecision(9);
265+
}
266+
267+
private void testRoundTripWithPrecision(int precision) {
268+
RowType table =
269+
RowType.builder()
270+
.field("timestampNtzField", DataTypes.TIMESTAMP(precision))
271+
.field("timestampLtzField", DataTypes.TIMESTAMP_LTZ(precision))
272+
.build();
273+
274+
PojoToRowConverter<TimestampPojo> writer =
275+
PojoToRowConverter.of(TimestampPojo.class, table, table);
276+
RowToPojoConverter<TimestampPojo> reader =
277+
RowToPojoConverter.of(TimestampPojo.class, table, table);
278+
279+
LocalDateTime originalLdt = LocalDateTime.of(2025, 7, 23, 15, 1, 30, 123456789);
280+
Instant originalInstant = Instant.parse("2025-07-23T15:01:30.123456789Z");
281+
282+
TimestampPojo originalPojo = new TimestampPojo(originalLdt, originalInstant);
283+
284+
// Convert POJO -> Row -> POJO
285+
GenericRow row = writer.toRow(originalPojo);
286+
TimestampPojo resultPojo = reader.fromRow(row);
287+
288+
LocalDateTime expectedLdt = truncateLocalDateTime(originalLdt, precision);
289+
Instant expectedInstant = truncateInstant(originalInstant, precision);
290+
291+
assertThat(resultPojo.timestampNtzField)
292+
.as("Round-trip LocalDateTime with precision %d", precision)
293+
.isEqualTo(expectedLdt);
294+
assertThat(resultPojo.timestampLtzField)
295+
.as("Round-trip Instant with precision %d", precision)
296+
.isEqualTo(expectedInstant);
297+
}
298+
299+
private LocalDateTime truncateLocalDateTime(LocalDateTime ldt, int precision) {
300+
if (precision >= 9) {
301+
return ldt;
302+
}
303+
int divisor = (int) Math.pow(10, 9 - precision);
304+
int truncatedNanos = (ldt.getNano() / divisor) * divisor;
305+
return ldt.withNano(truncatedNanos);
306+
}
307+
308+
private Instant truncateInstant(Instant instant, int precision) {
309+
if (precision >= 9) {
310+
return instant;
311+
}
312+
int divisor = (int) Math.pow(10, 9 - precision);
313+
int truncatedNanos = (instant.getNano() / divisor) * divisor;
314+
return Instant.ofEpochSecond(instant.getEpochSecond(), truncatedNanos);
315+
}
316+
317+
/** POJO for testing timestamp precision. */
318+
public static class TimestampPojo {
319+
public LocalDateTime timestampNtzField;
320+
public Instant timestampLtzField;
321+
322+
public TimestampPojo() {}
323+
324+
public TimestampPojo(LocalDateTime timestampNtzField, Instant timestampLtzField) {
325+
this.timestampNtzField = timestampNtzField;
326+
this.timestampLtzField = timestampLtzField;
327+
}
328+
}
171329
}

0 commit comments

Comments
 (0)