Skip to content

Commit 903deb8

Browse files
committed
[common] Ensure consistency read and write among Compacted, Indexed and Arrow fields
1 parent 7ffbae3 commit 903deb8

File tree

5 files changed

+348
-21
lines changed

5 files changed

+348
-21
lines changed

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private long readLongSlowPath() {
258258
*
259259
* @param fieldType the element type of the row
260260
*/
261-
static FieldReader createFieldReader(DataType fieldType) {
261+
public static FieldReader createFieldReader(DataType fieldType) {
262262
final FieldReader fieldReader;
263263
// ordered by type root definition
264264
switch (fieldType.getTypeRoot()) {
@@ -325,7 +325,7 @@ static FieldReader createFieldReader(DataType fieldType) {
325325
*
326326
* @see #createFieldReader(DataType)
327327
*/
328-
interface FieldReader extends Serializable {
328+
public interface FieldReader extends Serializable {
329329
Object readField(CompactedRowReader reader, int pos);
330330
}
331331
}

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ private void writeDouble(double value) {
251251
}
252252

253253
private void writeTimestampNtz(TimestampNtz value, int precision) {
254-
if (TimestampNtz.isCompact(precision)) {
254+
if (precision == 0) {
255+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
256+
writeLong(value.getMillisecond() / 1000 * 1000);
257+
} else if (TimestampNtz.isCompact(precision)) {
255258
writeLong(value.getMillisecond());
256259
} else {
257260
writeLong(value.getMillisecond());
@@ -260,7 +263,10 @@ private void writeTimestampNtz(TimestampNtz value, int precision) {
260263
}
261264

262265
private void writeTimestampLtz(TimestampLtz value, int precision) {
263-
if (TimestampLtz.isCompact(precision)) {
266+
if (precision == 0) {
267+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
268+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
269+
} else if (TimestampLtz.isCompact(precision)) {
264270
writeLong(value.getEpochMillisecond());
265271
} else {
266272
writeLong(value.getEpochMillisecond());
@@ -332,9 +338,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
332338
break;
333339
case INTEGER:
334340
case DATE:
335-
case TIME_WITHOUT_TIME_ZONE:
336341
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
337342
break;
343+
case TIME_WITHOUT_TIME_ZONE:
344+
final int timePrecision = getPrecision(fieldType);
345+
fieldWriter =
346+
(writer, pos, value) -> {
347+
if (timePrecision == 0) {
348+
// truncate to seconds to keep consistence with ArrowTimeWriter
349+
writer.writeInt((int) value / 1000 * 1000);
350+
} else {
351+
writer.writeInt((int) value);
352+
}
353+
};
354+
break;
338355
case BIGINT:
339356
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
340357
break;

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.fluss.types.DataType;
2828

2929
import java.io.Serializable;
30-
import java.util.Arrays;
3130

3231
import static org.apache.fluss.types.DataTypeChecks.getLength;
3332
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
@@ -186,25 +185,16 @@ private BinaryString readStringInternal(int length) {
186185
private byte[] readBytesInternal(int length) {
187186
byte[] bytes = new byte[length];
188187
segment.get(position, bytes, 0, length);
189-
190-
int newLen = 0;
191-
for (int i = length - 1; i >= 0; i--) {
192-
if (bytes[i] != (byte) 0) {
193-
newLen = i + 1;
194-
break;
195-
}
196-
}
197-
198188
position += length;
199-
return Arrays.copyOfRange(bytes, 0, newLen);
189+
return bytes;
200190
}
201191

202192
/**
203193
* Creates an accessor for reading elements.
204194
*
205195
* @param fieldType the element type of the row
206196
*/
207-
static FieldReader createFieldReader(DataType fieldType) {
197+
public static FieldReader createFieldReader(DataType fieldType) {
208198
final FieldReader fieldReader;
209199
// ordered by type root definition
210200
switch (fieldType.getTypeRoot()) {
@@ -277,7 +267,7 @@ static FieldReader createFieldReader(DataType fieldType) {
277267
*
278268
* @see #createFieldReader(DataType)
279269
*/
280-
interface FieldReader extends Serializable {
270+
public interface FieldReader extends Serializable {
281271
Object readField(IndexedRowReader reader, int pos);
282272
}
283273
}

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ public void writeDecimal(Decimal value, int precision) {
173173
}
174174

175175
public void writeTimestampNtz(TimestampNtz value, int precision) {
176-
if (TimestampNtz.isCompact(precision)) {
176+
if (precision == 0) {
177+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
178+
writeLong(value.getMillisecond() / 1000 * 1000);
179+
} else if (TimestampNtz.isCompact(precision)) {
177180
writeLong(value.getMillisecond());
178181
} else {
179182
writeLong(value.getMillisecond());
@@ -182,7 +185,10 @@ public void writeTimestampNtz(TimestampNtz value, int precision) {
182185
}
183186

184187
public void writeTimestampLtz(TimestampLtz value, int precision) {
185-
if (TimestampLtz.isCompact(precision)) {
188+
if (precision == 0) {
189+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
190+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
191+
} else if (TimestampLtz.isCompact(precision)) {
186192
writeLong(value.getEpochMillisecond());
187193
} else {
188194
writeLong(value.getEpochMillisecond());
@@ -301,9 +307,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
301307
break;
302308
case INTEGER:
303309
case DATE:
304-
case TIME_WITHOUT_TIME_ZONE:
305310
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
306311
break;
312+
case TIME_WITHOUT_TIME_ZONE:
313+
final int timePrecision = getPrecision(fieldType);
314+
fieldWriter =
315+
(writer, pos, value) -> {
316+
if (timePrecision == 0) {
317+
// truncate to seconds to keep consistence with ArrowTimeWriter
318+
writer.writeInt((int) value / 1000 * 1000);
319+
} else {
320+
writer.writeInt((int) value);
321+
}
322+
};
323+
break;
307324
case BIGINT:
308325
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
309326
break;

0 commit comments

Comments
 (0)