Skip to content

Commit 3c3db0d

Browse files
committed
[common] Ensure consistency read and write among Compacted, Indexed and Arrow fields
1 parent 7ffbae3 commit 3c3db0d

File tree

7 files changed

+355
-29
lines changed

7 files changed

+355
-29
lines changed

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private long readLongSlowPath() {
258258
*
259259
* @param fieldType the element type of the row
260260
*/
261-
static FieldReader createFieldReader(DataType fieldType) {
261+
public static FieldReader createFieldReader(DataType fieldType) {
262262
final FieldReader fieldReader;
263263
// ordered by type root definition
264264
switch (fieldType.getTypeRoot()) {
@@ -325,7 +325,7 @@ static FieldReader createFieldReader(DataType fieldType) {
325325
*
326326
* @see #createFieldReader(DataType)
327327
*/
328-
interface FieldReader extends Serializable {
328+
public interface FieldReader extends Serializable {
329329
Object readField(CompactedRowReader reader, int pos);
330330
}
331331
}

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ private void writeDouble(double value) {
251251
}
252252

253253
private void writeTimestampNtz(TimestampNtz value, int precision) {
254-
if (TimestampNtz.isCompact(precision)) {
254+
if (precision == 0) {
255+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
256+
writeLong(value.getMillisecond() / 1000 * 1000);
257+
} else if (TimestampNtz.isCompact(precision)) {
255258
writeLong(value.getMillisecond());
256259
} else {
257260
writeLong(value.getMillisecond());
@@ -260,7 +263,10 @@ private void writeTimestampNtz(TimestampNtz value, int precision) {
260263
}
261264

262265
private void writeTimestampLtz(TimestampLtz value, int precision) {
263-
if (TimestampLtz.isCompact(precision)) {
266+
if (precision == 0) {
267+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
268+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
269+
} else if (TimestampLtz.isCompact(precision)) {
264270
writeLong(value.getEpochMillisecond());
265271
} else {
266272
writeLong(value.getEpochMillisecond());
@@ -332,9 +338,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
332338
break;
333339
case INTEGER:
334340
case DATE:
335-
case TIME_WITHOUT_TIME_ZONE:
336341
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
337342
break;
343+
case TIME_WITHOUT_TIME_ZONE:
344+
final int timePrecision = getPrecision(fieldType);
345+
fieldWriter =
346+
(writer, pos, value) -> {
347+
if (timePrecision == 0) {
348+
// truncate to seconds to keep consistence with ArrowTimeWriter
349+
writer.writeInt((int) value / 1000 * 1000);
350+
} else {
351+
writer.writeInt((int) value);
352+
}
353+
};
354+
break;
338355
case BIGINT:
339356
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
340357
break;

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@ public static boolean isFixedLength(DataType dataType) {
252252
case DATE:
253253
case TIME_WITHOUT_TIME_ZONE:
254254
case CHAR:
255-
case BINARY:
256255
case TIMESTAMP_WITHOUT_TIME_ZONE:
257256
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
258257
return true;
259258
case STRING:
259+
case BINARY:
260260
case BYTES:
261261
return false;
262262
case DECIMAL:

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.fluss.types.DataType;
2828

2929
import java.io.Serializable;
30-
import java.util.Arrays;
3130

3231
import static org.apache.fluss.types.DataTypeChecks.getLength;
3332
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
@@ -157,7 +156,8 @@ public TimestampNtz readTimestampNtz(int precision) {
157156
return TimestampNtz.fromMillis(milliseconds, nanosOfMillisecond);
158157
}
159158

160-
public byte[] readBinary(int length) {
159+
public byte[] readBinary() {
160+
int length = readVarLengthFromVarLengthList();
161161
return readBytesInternal(length);
162162
}
163163

@@ -186,25 +186,16 @@ private BinaryString readStringInternal(int length) {
186186
private byte[] readBytesInternal(int length) {
187187
byte[] bytes = new byte[length];
188188
segment.get(position, bytes, 0, length);
189-
190-
int newLen = 0;
191-
for (int i = length - 1; i >= 0; i--) {
192-
if (bytes[i] != (byte) 0) {
193-
newLen = i + 1;
194-
break;
195-
}
196-
}
197-
198189
position += length;
199-
return Arrays.copyOfRange(bytes, 0, newLen);
190+
return bytes;
200191
}
201192

202193
/**
203194
* Creates an accessor for reading elements.
204195
*
205196
* @param fieldType the element type of the row
206197
*/
207-
static FieldReader createFieldReader(DataType fieldType) {
198+
public static FieldReader createFieldReader(DataType fieldType) {
208199
final FieldReader fieldReader;
209200
// ordered by type root definition
210201
switch (fieldType.getTypeRoot()) {
@@ -219,8 +210,7 @@ static FieldReader createFieldReader(DataType fieldType) {
219210
fieldReader = (reader, pos) -> reader.readBoolean();
220211
break;
221212
case BINARY:
222-
final int binaryLength = getLength(fieldType);
223-
fieldReader = (reader, pos) -> reader.readBinary(binaryLength);
213+
fieldReader = (reader, pos) -> reader.readBinary();
224214
break;
225215
case BYTES:
226216
fieldReader = (reader, pos) -> reader.readBytes();
@@ -277,7 +267,7 @@ static FieldReader createFieldReader(DataType fieldType) {
277267
*
278268
* @see #createFieldReader(DataType)
279269
*/
280-
interface FieldReader extends Serializable {
270+
public interface FieldReader extends Serializable {
281271
Object readField(IndexedRowReader reader, int pos);
282272
}
283273
}

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ public void writeBinary(byte[] value, int length) {
154154
if (value.length > length) {
155155
throw new IllegalArgumentException();
156156
}
157-
byte[] newByte = new byte[length];
158-
System.arraycopy(value, 0, newByte, 0, value.length);
159-
write(newByte, 0, length);
157+
writeVarLengthToVarLengthList(value.length);
158+
write(value, 0, value.length);
160159
}
161160

162161
public void writeBytes(byte[] value) {
@@ -173,7 +172,10 @@ public void writeDecimal(Decimal value, int precision) {
173172
}
174173

175174
public void writeTimestampNtz(TimestampNtz value, int precision) {
176-
if (TimestampNtz.isCompact(precision)) {
175+
if (precision == 0) {
176+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
177+
writeLong(value.getMillisecond() / 1000 * 1000);
178+
} else if (TimestampNtz.isCompact(precision)) {
177179
writeLong(value.getMillisecond());
178180
} else {
179181
writeLong(value.getMillisecond());
@@ -182,7 +184,10 @@ public void writeTimestampNtz(TimestampNtz value, int precision) {
182184
}
183185

184186
public void writeTimestampLtz(TimestampLtz value, int precision) {
185-
if (TimestampLtz.isCompact(precision)) {
187+
if (precision == 0) {
188+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
189+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
190+
} else if (TimestampLtz.isCompact(precision)) {
186191
writeLong(value.getEpochMillisecond());
187192
} else {
188193
writeLong(value.getEpochMillisecond());
@@ -301,9 +306,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
301306
break;
302307
case INTEGER:
303308
case DATE:
304-
case TIME_WITHOUT_TIME_ZONE:
305309
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
306310
break;
311+
case TIME_WITHOUT_TIME_ZONE:
312+
final int timePrecision = getPrecision(fieldType);
313+
fieldWriter =
314+
(writer, pos, value) -> {
315+
if (timePrecision == 0) {
316+
// truncate to seconds to keep consistence with ArrowTimeWriter
317+
writer.writeInt((int) value / 1000 * 1000);
318+
} else {
319+
writer.writeInt((int) value);
320+
}
321+
};
322+
break;
307323
case BIGINT:
308324
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
309325
break;

0 commit comments

Comments
 (0)