3131import javax .annotation .Nullable ;
3232
3333import java .lang .reflect .InvocationTargetException ;
34+ import java .math .BigDecimal ;
3435import java .time .Instant ;
3536import java .time .LocalDate ;
3637import java .time .LocalTime ;
@@ -121,89 +122,35 @@ private PojoType.Property requireProperty(String fieldName) {
121122 private static RowToField createRowReader (DataType fieldType , PojoType .Property prop ) {
122123 switch (fieldType .getTypeRoot ()) {
123124 case BOOLEAN :
124- return ( row , pos ) -> row . getBoolean ( pos ) ;
125+ return InternalRow :: getBoolean ;
125126 case TINYINT :
126- return ( row , pos ) -> row . getByte ( pos ) ;
127+ return InternalRow :: getByte ;
127128 case SMALLINT :
128- return ( row , pos ) -> row . getShort ( pos ) ;
129+ return InternalRow :: getShort ;
129130 case INTEGER :
130- return ( row , pos ) -> row . getInt ( pos ) ;
131+ return InternalRow :: getInt ;
131132 case BIGINT :
132- return ( row , pos ) -> row . getLong ( pos ) ;
133+ return InternalRow :: getLong ;
133134 case FLOAT :
134- return ( row , pos ) -> row . getFloat ( pos ) ;
135+ return InternalRow :: getFloat ;
135136 case DOUBLE :
136- return ( row , pos ) -> row . getDouble ( pos ) ;
137+ return InternalRow :: getDouble ;
137138 case CHAR :
138139 case STRING :
139- return (row , pos ) -> {
140- BinaryString s = row .getString (pos );
141- String v = s .toString ();
142- if (prop .type == String .class ) {
143- if (fieldType .getTypeRoot () == DataTypeRoot .CHAR && v .length () != 1 ) {
144- throw new IllegalArgumentException (
145- String .format (
146- "Field %s expects exactly one character for CHAR type, got length %d." ,
147- prop .name , v .length ()));
148- }
149- return v ;
150- } else if (prop .type == Character .class ) {
151- if (v .isEmpty ()) {
152- throw new IllegalArgumentException (
153- String .format (
154- "Field %s expects Character, but the string value is empty." ,
155- prop .name ));
156- }
157- if (fieldType .getTypeRoot () == DataTypeRoot .CHAR && v .length () != 1 ) {
158- throw new IllegalArgumentException (
159- String .format (
160- "Field %s expects exactly one character for CHAR type, got length %d." ,
161- prop .name , v .length ()));
162- }
163- return v .charAt (0 );
164- }
165- throw new IllegalArgumentException (
166- String .format (
167- "Field %s is not a String or Character. Cannot convert from string." ,
168- prop .name ));
169- };
140+ return (row , pos ) -> convertTextValue (fieldType , prop , row .getString (pos ));
170141 case BINARY :
171142 case BYTES :
172- return ( row , pos ) -> row . getBytes ( pos ) ;
143+ return InternalRow :: getBytes ;
173144 case DECIMAL :
174- return (row , pos ) -> {
175- DecimalType dt = (DecimalType ) fieldType ;
176- Decimal d = row .getDecimal (pos , dt .getPrecision (), dt .getScale ());
177- return d .toBigDecimal ();
178- };
145+ return (row , pos ) -> convertDecimalValue ((DecimalType ) fieldType , row , pos );
179146 case DATE :
180- return ( row , pos ) -> LocalDate . ofEpochDay ( row . getInt ( pos )) ;
147+ return RowToPojoConverter :: convertDateValue ;
181148 case TIME_WITHOUT_TIME_ZONE :
182- return ( row , pos ) -> LocalTime . ofNanoOfDay ( row . getInt ( pos ) * 1_000_000L ) ;
149+ return RowToPojoConverter :: convertTimeValue ;
183150 case TIMESTAMP_WITHOUT_TIME_ZONE :
184- {
185- final int precision = DataTypeChecks .getPrecision (fieldType );
186- return (row , pos ) -> {
187- TimestampNtz t = row .getTimestampNtz (pos , precision );
188- return t .toLocalDateTime ();
189- };
190- }
151+ return (row , pos ) -> convertTimestampNtzValue (fieldType , row , pos );
191152 case TIMESTAMP_WITH_LOCAL_TIME_ZONE :
192- {
193- final int precision = DataTypeChecks .getPrecision (fieldType );
194- return (row , pos ) -> {
195- TimestampLtz t = row .getTimestampLtz (pos , precision );
196- if (prop .type == Instant .class ) {
197- return t .toInstant ();
198- } else if (prop .type == OffsetDateTime .class ) {
199- return OffsetDateTime .ofInstant (t .toInstant (), ZoneOffset .UTC );
200- }
201- throw new IllegalArgumentException (
202- String .format (
203- "Field %s is not an Instant or OffsetDateTime. Cannot convert from TimestampData." ,
204- prop .name ));
205- };
206- }
153+ return (row , pos ) -> convertTimestampLtzValue (fieldType , prop , row , pos );
207154 default :
208155 throw new UnsupportedOperationException (
209156 String .format (
@@ -212,6 +159,96 @@ private static RowToField createRowReader(DataType fieldType, PojoType.Property
212159 }
213160 }
214161
162+ /**
163+ * Converts a text value (CHAR/STRING) read from an InternalRow into the target Java type
164+ * declared by the POJO property.
165+ *
166+ * <p>Supported target types are String and Character. For CHAR columns, this enforces that the
167+ * value has exactly one character. For Character targets, empty strings are rejected.
168+ *
169+ * @param fieldType Fluss column DataType (must be CHAR or STRING)
170+ * @param prop The target POJO property (used for type and error messages)
171+ * @param s The BinaryString read from the row
172+ * @return Converted Java value (String or Character)
173+ * @throws IllegalArgumentException if the target type is unsupported or constraints are
174+ * violated
175+ */
176+ private static Object convertTextValue (
177+ DataType fieldType , PojoType .Property prop , BinaryString s ) {
178+ String v = s .toString ();
179+ String formattedMessage =
180+ String .format (
181+ "Field %s expects exactly one character for CHAR type, got length %d." ,
182+ prop .name , v .length ());
183+ if (prop .type == String .class ) {
184+ if (fieldType .getTypeRoot () == DataTypeRoot .CHAR && v .length () != 1 ) {
185+ throw new IllegalArgumentException (formattedMessage );
186+ }
187+ return v ;
188+ } else if (prop .type == Character .class ) {
189+ if (v .isEmpty ()) {
190+ throw new IllegalArgumentException (
191+ String .format (
192+ "Field %s expects Character, but the string value is empty." ,
193+ prop .name ));
194+ }
195+ if (fieldType .getTypeRoot () == DataTypeRoot .CHAR && v .length () != 1 ) {
196+ throw new IllegalArgumentException (formattedMessage );
197+ }
198+ return v .charAt (0 );
199+ }
200+ throw new IllegalArgumentException (
201+ String .format (
202+ "Field %s is not a String or Character. Cannot convert from string." ,
203+ prop .name ));
204+ }
205+
206+ /**
207+ * Converts a DECIMAL value from an InternalRow into a BigDecimal using the column's precision
208+ * and scale. The row position is assumed non-null (caller checks), so this never returns null.
209+ */
210+ private static BigDecimal convertDecimalValue (
211+ DecimalType decimalType , InternalRow row , int pos ) {
212+ Decimal d = row .getDecimal (pos , decimalType .getPrecision (), decimalType .getScale ());
213+ return d .toBigDecimal ();
214+ }
215+
216+ /** Converts a DATE value stored as int days since epoch to a LocalDate. */
217+ private static LocalDate convertDateValue (InternalRow row , int pos ) {
218+ return LocalDate .ofEpochDay (row .getInt (pos ));
219+ }
220+
221+ /** Converts a TIME_WITHOUT_TIME_ZONE value stored as int millis of day to a LocalTime. */
222+ private static LocalTime convertTimeValue (InternalRow row , int pos ) {
223+ return LocalTime .ofNanoOfDay (row .getInt (pos ) * 1_000_000L );
224+ }
225+
226+ /** Converts a TIMESTAMP_WITHOUT_TIME_ZONE value to a LocalDateTime honoring precision. */
227+ private static Object convertTimestampNtzValue (DataType fieldType , InternalRow row , int pos ) {
228+ final int precision = DataTypeChecks .getPrecision (fieldType );
229+ TimestampNtz t = row .getTimestampNtz (pos , precision );
230+ return t .toLocalDateTime ();
231+ }
232+
233+ /**
234+ * Converts a TIMESTAMP_WITH_LOCAL_TIME_ZONE value to either Instant or OffsetDateTime in UTC,
235+ * depending on the target POJO property type.
236+ */
237+ private static Object convertTimestampLtzValue (
238+ DataType fieldType , PojoType .Property prop , InternalRow row , int pos ) {
239+ final int precision = DataTypeChecks .getPrecision (fieldType );
240+ TimestampLtz t = row .getTimestampLtz (pos , precision );
241+ if (prop .type == Instant .class ) {
242+ return t .toInstant ();
243+ } else if (prop .type == OffsetDateTime .class ) {
244+ return OffsetDateTime .ofInstant (t .toInstant (), ZoneOffset .UTC );
245+ }
246+ throw new IllegalArgumentException (
247+ String .format (
248+ "Field %s is not an Instant or OffsetDateTime. Cannot convert from TimestampData." ,
249+ prop .name ));
250+ }
251+
215252 private interface RowToField {
216253 Object convert (InternalRow row , int pos ) throws Exception ;
217254 }
0 commit comments