Skip to content

Commit 73c807d

Browse files
committed
[common] Ensure consistency read and write among Compacted, Indexed and Arrow fields
1 parent 7ffbae3 commit 73c807d

File tree

8 files changed

+358
-45
lines changed

8 files changed

+358
-45
lines changed

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private long readLongSlowPath() {
258258
*
259259
* @param fieldType the element type of the row
260260
*/
261-
static FieldReader createFieldReader(DataType fieldType) {
261+
public static FieldReader createFieldReader(DataType fieldType) {
262262
final FieldReader fieldReader;
263263
// ordered by type root definition
264264
switch (fieldType.getTypeRoot()) {
@@ -325,7 +325,7 @@ static FieldReader createFieldReader(DataType fieldType) {
325325
*
326326
* @see #createFieldReader(DataType)
327327
*/
328-
interface FieldReader extends Serializable {
328+
public interface FieldReader extends Serializable {
329329
Object readField(CompactedRowReader reader, int pos);
330330
}
331331
}

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ private void writeDouble(double value) {
251251
}
252252

253253
private void writeTimestampNtz(TimestampNtz value, int precision) {
254-
if (TimestampNtz.isCompact(precision)) {
254+
if (precision == 0) {
255+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
256+
writeLong(value.getMillisecond() / 1000 * 1000);
257+
} else if (TimestampNtz.isCompact(precision)) {
255258
writeLong(value.getMillisecond());
256259
} else {
257260
writeLong(value.getMillisecond());
@@ -260,7 +263,10 @@ private void writeTimestampNtz(TimestampNtz value, int precision) {
260263
}
261264

262265
private void writeTimestampLtz(TimestampLtz value, int precision) {
263-
if (TimestampLtz.isCompact(precision)) {
266+
if (precision == 0) {
267+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
268+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
269+
} else if (TimestampLtz.isCompact(precision)) {
264270
writeLong(value.getEpochMillisecond());
265271
} else {
266272
writeLong(value.getEpochMillisecond());
@@ -332,9 +338,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
332338
break;
333339
case INTEGER:
334340
case DATE:
335-
case TIME_WITHOUT_TIME_ZONE:
336341
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
337342
break;
343+
case TIME_WITHOUT_TIME_ZONE:
344+
final int timePrecision = getPrecision(fieldType);
345+
fieldWriter =
346+
(writer, pos, value) -> {
347+
if (timePrecision == 0) {
348+
// truncate to seconds to keep consistence with ArrowTimeWriter
349+
writer.writeInt((int) value / 1000 * 1000);
350+
} else {
351+
writer.writeInt((int) value);
352+
}
353+
};
354+
break;
338355
case BIGINT:
339356
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
340357
break;

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.apache.fluss.types.StringType;
3737
import org.apache.fluss.utils.MurmurHashUtils;
3838

39-
import java.util.Arrays;
40-
4139
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
4240

4341
/**
@@ -252,11 +250,11 @@ public static boolean isFixedLength(DataType dataType) {
252250
case DATE:
253251
case TIME_WITHOUT_TIME_ZONE:
254252
case CHAR:
255-
case BINARY:
256253
case TIMESTAMP_WITHOUT_TIME_ZONE:
257254
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
258255
return true;
259256
case STRING:
257+
case BINARY:
260258
case BYTES:
261259
return false;
262260
case DECIMAL:
@@ -414,17 +412,7 @@ public TimestampLtz getTimestampLtz(int pos, int precision) {
414412

415413
@Override
416414
public byte[] getBinary(int pos, int length) {
417-
byte[] bytes = new byte[length];
418-
segment.get(getFieldOffset(pos), bytes, 0, length);
419-
420-
int newLen = 0;
421-
for (int i = length - 1; i >= 0; i--) {
422-
if (bytes[i] != (byte) 0) {
423-
newLen = i + 1;
424-
break;
425-
}
426-
}
427-
return Arrays.copyOfRange(bytes, 0, newLen);
415+
return getBytes(pos);
428416
}
429417

430418
@Override

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.fluss.types.DataType;
2828

2929
import java.io.Serializable;
30-
import java.util.Arrays;
3130

3231
import static org.apache.fluss.types.DataTypeChecks.getLength;
3332
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
@@ -157,8 +156,8 @@ public TimestampNtz readTimestampNtz(int precision) {
157156
return TimestampNtz.fromMillis(milliseconds, nanosOfMillisecond);
158157
}
159158

160-
public byte[] readBinary(int length) {
161-
return readBytesInternal(length);
159+
public byte[] readBinary() {
160+
return readBytes();
162161
}
163162

164163
public byte[] readBytes() {
@@ -186,25 +185,16 @@ private BinaryString readStringInternal(int length) {
186185
private byte[] readBytesInternal(int length) {
187186
byte[] bytes = new byte[length];
188187
segment.get(position, bytes, 0, length);
189-
190-
int newLen = 0;
191-
for (int i = length - 1; i >= 0; i--) {
192-
if (bytes[i] != (byte) 0) {
193-
newLen = i + 1;
194-
break;
195-
}
196-
}
197-
198188
position += length;
199-
return Arrays.copyOfRange(bytes, 0, newLen);
189+
return bytes;
200190
}
201191

202192
/**
203193
* Creates an accessor for reading elements.
204194
*
205195
* @param fieldType the element type of the row
206196
*/
207-
static FieldReader createFieldReader(DataType fieldType) {
197+
public static FieldReader createFieldReader(DataType fieldType) {
208198
final FieldReader fieldReader;
209199
// ordered by type root definition
210200
switch (fieldType.getTypeRoot()) {
@@ -219,8 +209,7 @@ static FieldReader createFieldReader(DataType fieldType) {
219209
fieldReader = (reader, pos) -> reader.readBoolean();
220210
break;
221211
case BINARY:
222-
final int binaryLength = getLength(fieldType);
223-
fieldReader = (reader, pos) -> reader.readBinary(binaryLength);
212+
fieldReader = (reader, pos) -> reader.readBinary();
224213
break;
225214
case BYTES:
226215
fieldReader = (reader, pos) -> reader.readBytes();
@@ -277,7 +266,7 @@ static FieldReader createFieldReader(DataType fieldType) {
277266
*
278267
* @see #createFieldReader(DataType)
279268
*/
280-
interface FieldReader extends Serializable {
269+
public interface FieldReader extends Serializable {
281270
Object readField(IndexedRowReader reader, int pos);
282271
}
283272
}

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ public void writeBinary(byte[] value, int length) {
154154
if (value.length > length) {
155155
throw new IllegalArgumentException();
156156
}
157-
byte[] newByte = new byte[length];
158-
System.arraycopy(value, 0, newByte, 0, value.length);
159-
write(newByte, 0, length);
157+
writeVarLengthToVarLengthList(value.length);
158+
write(value, 0, value.length);
160159
}
161160

162161
public void writeBytes(byte[] value) {
@@ -173,7 +172,10 @@ public void writeDecimal(Decimal value, int precision) {
173172
}
174173

175174
public void writeTimestampNtz(TimestampNtz value, int precision) {
176-
if (TimestampNtz.isCompact(precision)) {
175+
if (precision == 0) {
176+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
177+
writeLong(value.getMillisecond() / 1000 * 1000);
178+
} else if (TimestampNtz.isCompact(precision)) {
177179
writeLong(value.getMillisecond());
178180
} else {
179181
writeLong(value.getMillisecond());
@@ -182,7 +184,10 @@ public void writeTimestampNtz(TimestampNtz value, int precision) {
182184
}
183185

184186
public void writeTimestampLtz(TimestampLtz value, int precision) {
185-
if (TimestampLtz.isCompact(precision)) {
187+
if (precision == 0) {
188+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
189+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
190+
} else if (TimestampLtz.isCompact(precision)) {
186191
writeLong(value.getEpochMillisecond());
187192
} else {
188193
writeLong(value.getEpochMillisecond());
@@ -301,9 +306,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
301306
break;
302307
case INTEGER:
303308
case DATE:
304-
case TIME_WITHOUT_TIME_ZONE:
305309
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
306310
break;
311+
case TIME_WITHOUT_TIME_ZONE:
312+
final int timePrecision = getPrecision(fieldType);
313+
fieldWriter =
314+
(writer, pos, value) -> {
315+
if (timePrecision == 0) {
316+
// truncate to seconds to keep consistence with ArrowTimeWriter
317+
writer.writeInt((int) value / 1000 * 1000);
318+
} else {
319+
writer.writeInt((int) value);
320+
}
321+
};
322+
break;
307323
case BIGINT:
308324
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
309325
break;

0 commit comments

Comments
 (0)