1717
1818package org .apache .fluss .client .converter ;
1919
20+ import org .apache .fluss .row .BinaryString ;
2021import org .apache .fluss .row .Decimal ;
2122import org .apache .fluss .row .GenericRow ;
2223import org .apache .fluss .row .TimestampLtz ;
@@ -119,92 +120,20 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
119120 case DOUBLE :
120121 case BINARY :
121122 case BYTES :
122- return ( obj ) -> prop . read ( obj ) ;
123+ return prop :: read ;
123124 case CHAR :
124125 case STRING :
125- return (obj ) -> {
126- Object v = prop .read (obj );
127- if (v == null ) {
128- return null ;
129- }
130- return ConverterCommons .toBinaryStringForText (
131- v , prop .name , fieldType .getTypeRoot ());
132- };
126+ return (obj ) -> convertTextValue (fieldType , prop , prop .read (obj ));
133127 case DECIMAL :
134- return (obj ) -> {
135- Object v = prop .read (obj );
136- if (v == null ) {
137- return null ;
138- }
139- if (!(v instanceof BigDecimal )) {
140- throw new IllegalArgumentException (
141- String .format (
142- "Field %s is not a BigDecimal. Cannot convert to DecimalData." ,
143- prop .name ));
144- }
145- DecimalType decimalType = (DecimalType ) fieldType ;
146- return Decimal .fromBigDecimal (
147- (BigDecimal ) v , decimalType .getPrecision (), decimalType .getScale ());
148- };
128+ return (obj ) -> convertDecimalValue ((DecimalType ) fieldType , prop , prop .read (obj ));
149129 case DATE :
150- return (obj ) -> {
151- Object v = prop .read (obj );
152- if (v == null ) {
153- return null ;
154- }
155- if (!(v instanceof LocalDate )) {
156- throw new IllegalArgumentException (
157- String .format (
158- "Field %s is not a LocalDate. Cannot convert to int days." ,
159- prop .name ));
160- }
161- return (int ) ((LocalDate ) v ).toEpochDay ();
162- };
130+ return (obj ) -> convertDateValue (prop , prop .read (obj ));
163131 case TIME_WITHOUT_TIME_ZONE :
164- return (obj ) -> {
165- Object v = prop .read (obj );
166- if (v == null ) {
167- return null ;
168- }
169- if (!(v instanceof LocalTime )) {
170- throw new IllegalArgumentException (
171- String .format (
172- "Field %s is not a LocalTime. Cannot convert to int millis." ,
173- prop .name ));
174- }
175- LocalTime t = (LocalTime ) v ;
176- return (int ) (t .toNanoOfDay () / 1_000_000 );
177- };
132+ return (obj ) -> convertTimeValue (prop , prop .read (obj ));
178133 case TIMESTAMP_WITHOUT_TIME_ZONE :
179- return (obj ) -> {
180- Object v = prop .read (obj );
181- if (v == null ) {
182- return null ;
183- }
184- if (!(v instanceof LocalDateTime )) {
185- throw new IllegalArgumentException (
186- String .format (
187- "Field %s is not a LocalDateTime. Cannot convert to TimestampNtz." ,
188- prop .name ));
189- }
190- return TimestampNtz .fromLocalDateTime ((LocalDateTime ) v );
191- };
134+ return (obj ) -> convertTimestampNtzValue (prop , prop .read (obj ));
192135 case TIMESTAMP_WITH_LOCAL_TIME_ZONE :
193- return (obj ) -> {
194- Object v = prop .read (obj );
195- if (v == null ) {
196- return null ;
197- }
198- if (v instanceof Instant ) {
199- return TimestampLtz .fromInstant ((Instant ) v );
200- } else if (v instanceof OffsetDateTime ) {
201- return TimestampLtz .fromInstant (((OffsetDateTime ) v ).toInstant ());
202- }
203- throw new IllegalArgumentException (
204- String .format (
205- "Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz." ,
206- prop .name ));
207- };
136+ return (obj ) -> convertTimestampLtzValue (prop , prop .read (obj ));
208137 default :
209138 throw new UnsupportedOperationException (
210139 String .format (
@@ -213,6 +142,95 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
213142 }
214143 }
215144
145+ /**
146+ * Converts a text value (String or Character) from a POJO property to Fluss BinaryString.
147+ *
148+ * <p>For CHAR columns, enforces that the text has exactly one character. Nulls are passed
149+ * through.
150+ */
151+ private static @ Nullable BinaryString convertTextValue (
152+ DataType fieldType , PojoType .Property prop , @ Nullable Object v ) {
153+ if (v == null ) {
154+ return null ;
155+ }
156+ return ConverterCommons .toBinaryStringForText (v , prop .name , fieldType .getTypeRoot ());
157+ }
158+
159+ /** Converts a BigDecimal POJO property to Fluss Decimal respecting precision and scale. */
160+ private static @ Nullable Decimal convertDecimalValue (
161+ DecimalType decimalType , PojoType .Property prop , @ Nullable Object v ) {
162+ if (v == null ) {
163+ return null ;
164+ }
165+ if (!(v instanceof BigDecimal )) {
166+ throw new IllegalArgumentException (
167+ String .format (
168+ "Field %s is not a BigDecimal. Cannot convert to Decimal." , prop .name ));
169+ }
170+ return Decimal .fromBigDecimal (
171+ (BigDecimal ) v , decimalType .getPrecision (), decimalType .getScale ());
172+ }
173+
174+ /** Converts a LocalDate POJO property to number of days since epoch. */
175+ private static @ Nullable Integer convertDateValue (PojoType .Property prop , @ Nullable Object v ) {
176+ if (v == null ) {
177+ return null ;
178+ }
179+ if (!(v instanceof LocalDate )) {
180+ throw new IllegalArgumentException (
181+ String .format (
182+ "Field %s is not a LocalDate. Cannot convert to int days." , prop .name ));
183+ }
184+ return (int ) ((LocalDate ) v ).toEpochDay ();
185+ }
186+
187+ /** Converts a LocalTime POJO property to milliseconds of day. */
188+ private static @ Nullable Integer convertTimeValue (PojoType .Property prop , @ Nullable Object v ) {
189+ if (v == null ) {
190+ return null ;
191+ }
192+ if (!(v instanceof LocalTime )) {
193+ throw new IllegalArgumentException (
194+ String .format (
195+ "Field %s is not a LocalTime. Cannot convert to int millis." ,
196+ prop .name ));
197+ }
198+ LocalTime t = (LocalTime ) v ;
199+ return (int ) (t .toNanoOfDay () / 1_000_000 );
200+ }
201+
202+ /** Converts a LocalDateTime POJO property to Fluss TimestampNtz. */
203+ private static @ Nullable TimestampNtz convertTimestampNtzValue (
204+ PojoType .Property prop , @ Nullable Object v ) {
205+ if (v == null ) {
206+ return null ;
207+ }
208+ if (!(v instanceof LocalDateTime )) {
209+ throw new IllegalArgumentException (
210+ String .format (
211+ "Field %s is not a LocalDateTime. Cannot convert to TimestampNtz." ,
212+ prop .name ));
213+ }
214+ return TimestampNtz .fromLocalDateTime ((LocalDateTime ) v );
215+ }
216+
217+ /** Converts an Instant or OffsetDateTime POJO property to Fluss TimestampLtz (UTC based). */
218+ private static @ Nullable TimestampLtz convertTimestampLtzValue (
219+ PojoType .Property prop , @ Nullable Object v ) {
220+ if (v == null ) {
221+ return null ;
222+ }
223+ if (v instanceof Instant ) {
224+ return TimestampLtz .fromInstant ((Instant ) v );
225+ } else if (v instanceof OffsetDateTime ) {
226+ return TimestampLtz .fromInstant (((OffsetDateTime ) v ).toInstant ());
227+ }
228+ throw new IllegalArgumentException (
229+ String .format (
230+ "Field %s is not an Instant or OffsetDateTime. Cannot convert to TimestampLtz." ,
231+ prop .name ));
232+ }
233+
216234 private interface FieldToRow {
217235 Object readAndConvert (Object pojo ) throws Exception ;
218236 }
0 commit comments