99import com .alibaba .hologres .client .model .TableSchema ;
1010import org .postgresql .core .BaseConnection ;
1111import org .postgresql .jdbc .ArrayUtil ;
12+ import org .postgresql .jdbc .TimestampUtil ;
1213
1314import java .io .IOException ;
1415import java .io .OutputStream ;
1516import java .math .BigDecimal ;
1617import java .math .RoundingMode ;
18+ import java .sql .Array ;
1719import java .sql .Date ;
1820import java .sql .SQLException ;
19- import java .sql .Timestamp ;
2021import java .sql .Types ;
21- import java .time .OffsetDateTime ;
22- import java .time .format .DateTimeFormatter ;
23- import java .time .format .DateTimeFormatterBuilder ;
24- import java .time .temporal .ChronoField ;
25- import java .util .TimeZone ;
26- import java .util .concurrent .TimeUnit ;
2722
2823/**
2924 * Record转pg binary流.
@@ -46,6 +41,16 @@ private void fillHeader() throws IOException {
4641 writeInt (0 );
4742 }
4843
44+ @ Override
45+ public void close () throws IOException {
46+ if (!fillHeader ) {
47+ fillHeader = true ;
48+ fillHeader ();
49+ writeCellBuffer ();
50+ }
51+ super .close ();
52+ }
53+
4954 @ Override
5055 protected void fillByteBuffer (Record record ) throws IOException {
5156 if (!fillHeader ) {
@@ -183,50 +188,8 @@ private void fillByteBuffer(Object obj, Column column)
183188 }
184189 break ;
185190 case Types .TIMESTAMP :
186- if (obj instanceof Timestamp ) {
187- Timestamp ts = (Timestamp ) obj ;
188- long seconds = javaEpochToPg (ts .getTime () / 1000 , TimeUnit .SECONDS );
189- // Convert to micros rounding nanoseconds
190- long micros =
191- TimeUnit .SECONDS .toMicros (seconds )
192- + TimeUnit .NANOSECONDS .toMicros (ts .getNanos () + 500 );
193- if ("timestamp" .equals (typeName )) {
194- micros += TimeZone .getDefault ().getRawOffset () * 1000L ;
195- }
196- writeInt (8 );
197- writeLong (micros );
198- } else if (obj instanceof String ) {
199- OffsetDateTime dateTime =
200- OffsetDateTime .parse ((String ) obj , DATE_TIME_FORMATTER );
201- long seconds = javaEpochToPg (dateTime .toEpochSecond (), TimeUnit .SECONDS );
202- // Convert to micros rounding nanoseconds
203- long micros =
204- TimeUnit .SECONDS .toMicros (seconds )
205- + TimeUnit .NANOSECONDS .toMicros (dateTime .getNano () + 500 );
206- writeInt (8 );
207- writeLong (micros );
208- } else if (obj instanceof Number ) {
209- long ms = ((Number ) obj ).longValue ();
210- long seconds = javaEpochToPg (ms / 1000L , TimeUnit .SECONDS );
211- // Convert to micros rounding nanoseconds
212- long micros =
213- TimeUnit .SECONDS .toMicros (seconds )
214- + TimeUnit .NANOSECONDS .toMicros ((ms % 1000 ) * 1000000L + 500 );
215- writeInt (8 );
216- writeLong (micros );
217- } else if (obj instanceof java .util .Date ) {
218- long ms = ((java .util .Date ) obj ).getTime ();
219- long seconds = javaEpochToPg (ms / 1000L , TimeUnit .SECONDS );
220- // Convert to micros rounding nanoseconds
221- long micros =
222- TimeUnit .SECONDS .toMicros (seconds )
223- + TimeUnit .NANOSECONDS .toMicros ((ms % 1000 ) * 1000000L + 500 );
224- writeInt (8 );
225- writeLong (micros );
226- } else {
227- throw new RuntimeException (
228- "unsupported type for timestamp " + obj .getClass ().getName ());
229- }
191+ writeInt (8 );
192+ writeLong (TimestampUtil .timestampToPgEpochMicroSecond (obj , typeName ));
230193 break ;
231194 case Types .BINARY :
232195 if (obj instanceof byte []) {
@@ -296,7 +259,9 @@ private void fillByteBuffer(Object obj, Column column)
296259 throw new IOException ("unsupported type:" + typeName + "(" + type + "). Please call RecordBinaryOutputSteam constructor with BaseConnection Param" );
297260 }
298261 try {
299- byte [] arrayBytes = ArrayUtil .arrayToBinary (conn , obj , column .getTypeName ());
262+ // obj如果是List<>或Object[],都尝试转成Array
263+ Array array = ArrayUtil .objectToArray (conn , obj , column .getTypeName ());
264+ byte [] arrayBytes = ArrayUtil .arrayToBinary (conn , array != null ? ArrayUtil .objectToArray (conn , obj , column .getTypeName ()) : obj , column .getTypeName ());
300265 writeInt (arrayBytes .length );
301266 write (arrayBytes );
302267 } catch (SQLException e ) {
@@ -387,29 +352,4 @@ private static short[] encodeFromString(String num, short[] info) {
387352 info [2 ] = displayScale ;
388353 return digits ;
389354 }
390-
391- public static final DateTimeFormatter DATE_TIME_FORMATTER =
392- new DateTimeFormatterBuilder ()
393- .optionalStart ()
394- .append (DateTimeFormatter .ISO_LOCAL_DATE )
395- .optionalEnd ()
396- .optionalStart ()
397- .appendLiteral (' ' )
398- .optionalEnd ()
399- .optionalStart ()
400- .appendLiteral ('T' )
401- .optionalEnd ()
402- .appendPattern ("HH:mm:ss" )
403- .appendFraction (ChronoField .NANO_OF_SECOND , 0 , 9 , true )
404- .optionalStart ()
405- .appendOffset ("+HH" , "Z" )
406- .optionalEnd ()
407- .toFormatter ();
408-
409- private static final long PG_EPOCH_SECS = 946684800L ;
410-
411- static long javaEpochToPg (long value , TimeUnit timeUnit ) {
412-
413- return value - timeUnit .convert (PG_EPOCH_SECS , TimeUnit .SECONDS );
414- }
415355}
0 commit comments