Skip to content

Commit f082dd7

Browse files
committed
[common] Ensure consistency read and write among Compacted, Indexed and Arrow fields.
1 parent 7ffbae3 commit f082dd7

File tree

6 files changed

+404
-10
lines changed

6 files changed

+404
-10
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.fluss.row.BinaryString;
4848
import org.apache.fluss.row.GenericRow;
4949
import org.apache.fluss.row.InternalRow;
50+
import org.apache.fluss.row.ProjectedRow;
5051
import org.apache.fluss.row.indexed.IndexedRow;
5152
import org.apache.fluss.types.BigIntType;
5253
import org.apache.fluss.types.DataTypes;
@@ -57,17 +58,21 @@
5758
import org.junit.jupiter.api.Disabled;
5859
import org.junit.jupiter.api.Test;
5960
import org.junit.jupiter.params.ParameterizedTest;
61+
import org.junit.jupiter.params.provider.Arguments;
6062
import org.junit.jupiter.params.provider.CsvSource;
63+
import org.junit.jupiter.params.provider.MethodSource;
6164
import org.junit.jupiter.params.provider.ValueSource;
6265

6366
import javax.annotation.Nullable;
6467

6568
import java.time.Duration;
69+
import java.time.LocalTime;
6670
import java.util.ArrayList;
6771
import java.util.Collections;
6872
import java.util.Iterator;
6973
import java.util.List;
7074
import java.util.concurrent.CompletableFuture;
75+
import java.util.stream.Stream;
7176

7277
import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
7378
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -113,6 +118,58 @@ void testAppendOnly() throws Exception {
113118
}
114119
}
115120

121+
private static Stream<Arguments> upsertTimeValueForAllPrecisionsArgs() {
122+
return Stream.of(Arguments.of(KvFormat.INDEXED), Arguments.of(KvFormat.COMPACTED));
123+
}
124+
125+
@ParameterizedTest
126+
@MethodSource("upsertTimeValueForAllPrecisionsArgs")
127+
void testUpsertTimeValueForAllPrecisions(KvFormat kvFormat) throws Exception {
128+
LocalTime time = LocalTime.of(10, 10, 10, 123000000);
129+
long mill = time.toNanoOfDay() / 1_000_000;
130+
131+
// check for all precisions
132+
for (int i = 0; i <= 9; i++) {
133+
Schema schema =
134+
Schema.newBuilder()
135+
.column("a", DataTypes.TIME(i))
136+
.column("b", DataTypes.INT())
137+
.primaryKey("a", "b")
138+
.build();
139+
TablePath tablePath = TablePath.of("test_db_" + i, "test_pk_table_" + i);
140+
TableDescriptor tableDescriptor =
141+
TableDescriptor.builder()
142+
.schema(schema)
143+
.property(ConfigOptions.TABLE_KV_FORMAT, kvFormat)
144+
.distributedBy(1)
145+
.build();
146+
createTable(tablePath, tableDescriptor, false);
147+
try (Table table = conn.getTable(tablePath)) {
148+
UpsertWriter upsertWriter = table.newUpsert().createWriter();
149+
upsertWriter.upsert(row((int) mill, 1)).get();
150+
upsertWriter.flush();
151+
152+
LogScanner logScanner = createLogScanner(table);
153+
subscribeFromBeginning(logScanner, table);
154+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
155+
Integer scanResult = null;
156+
for (ScanRecord scanRecord : scanRecords) {
157+
InternalRow row = scanRecord.getRow();
158+
scanResult = row.getInt(0);
159+
}
160+
161+
Lookuper lookuper = table.newLookup().createLookuper();
162+
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
163+
keyRow.replaceRow(row((int) mill, 1));
164+
InternalRow singletonRow = lookuper.lookup(keyRow).get().getSingletonRow();
165+
assertThat(singletonRow).isNotNull();
166+
Integer lookupResult = singletonRow.getInt(0);
167+
168+
assertThat(scanResult).isEqualTo(lookupResult);
169+
}
170+
}
171+
}
172+
116173
@ParameterizedTest
117174
@ValueSource(booleans = {true, false})
118175
@Disabled("TODO, fix me in #116")

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private long readLongSlowPath() {
258258
*
259259
* @param fieldType the element type of the row
260260
*/
261-
static FieldReader createFieldReader(DataType fieldType) {
261+
public static FieldReader createFieldReader(DataType fieldType) {
262262
final FieldReader fieldReader;
263263
// ordered by type root definition
264264
switch (fieldType.getTypeRoot()) {
@@ -325,7 +325,7 @@ static FieldReader createFieldReader(DataType fieldType) {
325325
*
326326
* @see #createFieldReader(DataType)
327327
*/
328-
interface FieldReader extends Serializable {
328+
public interface FieldReader extends Serializable {
329329
Object readField(CompactedRowReader reader, int pos);
330330
}
331331
}

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ private void writeDouble(double value) {
251251
}
252252

253253
private void writeTimestampNtz(TimestampNtz value, int precision) {
254-
if (TimestampNtz.isCompact(precision)) {
254+
if (precision == 0) {
255+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
256+
writeLong(value.getMillisecond() / 1000 * 1000);
257+
} else if (TimestampNtz.isCompact(precision)) {
255258
writeLong(value.getMillisecond());
256259
} else {
257260
writeLong(value.getMillisecond());
@@ -260,7 +263,10 @@ private void writeTimestampNtz(TimestampNtz value, int precision) {
260263
}
261264

262265
private void writeTimestampLtz(TimestampLtz value, int precision) {
263-
if (TimestampLtz.isCompact(precision)) {
266+
if (precision == 0) {
267+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
268+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
269+
} else if (TimestampLtz.isCompact(precision)) {
264270
writeLong(value.getEpochMillisecond());
265271
} else {
266272
writeLong(value.getEpochMillisecond());
@@ -332,9 +338,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
332338
break;
333339
case INTEGER:
334340
case DATE:
335-
case TIME_WITHOUT_TIME_ZONE:
336341
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
337342
break;
343+
case TIME_WITHOUT_TIME_ZONE:
344+
final int timePrecision = getPrecision(fieldType);
345+
fieldWriter =
346+
(writer, pos, value) -> {
347+
if (timePrecision == 0) {
348+
// truncate to seconds to keep consistence with ArrowTimeWriter
349+
writer.writeInt((int) value / 1000 * 1000);
350+
} else {
351+
writer.writeInt((int) value);
352+
}
353+
};
354+
break;
338355
case BIGINT:
339356
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
340357
break;

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ private byte[] readBytesInternal(int length) {
204204
*
205205
* @param fieldType the element type of the row
206206
*/
207-
static FieldReader createFieldReader(DataType fieldType) {
207+
public static FieldReader createFieldReader(DataType fieldType) {
208208
final FieldReader fieldReader;
209209
// ordered by type root definition
210210
switch (fieldType.getTypeRoot()) {
@@ -277,7 +277,7 @@ static FieldReader createFieldReader(DataType fieldType) {
277277
*
278278
* @see #createFieldReader(DataType)
279279
*/
280-
interface FieldReader extends Serializable {
280+
public interface FieldReader extends Serializable {
281281
Object readField(IndexedRowReader reader, int pos);
282282
}
283283
}

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ public void writeDecimal(Decimal value, int precision) {
173173
}
174174

175175
public void writeTimestampNtz(TimestampNtz value, int precision) {
176-
if (TimestampNtz.isCompact(precision)) {
176+
if (precision == 0) {
177+
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
178+
writeLong(value.getMillisecond() / 1000 * 1000);
179+
} else if (TimestampNtz.isCompact(precision)) {
177180
writeLong(value.getMillisecond());
178181
} else {
179182
writeLong(value.getMillisecond());
@@ -182,7 +185,10 @@ public void writeTimestampNtz(TimestampNtz value, int precision) {
182185
}
183186

184187
public void writeTimestampLtz(TimestampLtz value, int precision) {
185-
if (TimestampLtz.isCompact(precision)) {
188+
if (precision == 0) {
189+
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
190+
writeLong(value.getEpochMillisecond() / 1000 * 1000);
191+
} else if (TimestampLtz.isCompact(precision)) {
186192
writeLong(value.getEpochMillisecond());
187193
} else {
188194
writeLong(value.getEpochMillisecond());
@@ -301,9 +307,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
301307
break;
302308
case INTEGER:
303309
case DATE:
304-
case TIME_WITHOUT_TIME_ZONE:
305310
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
306311
break;
312+
case TIME_WITHOUT_TIME_ZONE:
313+
final int timePrecision = getPrecision(fieldType);
314+
fieldWriter =
315+
(writer, pos, value) -> {
316+
if (timePrecision == 0) {
317+
// truncate to seconds to keep consistence with ArrowTimeWriter
318+
writer.writeInt((int) value / 1000 * 1000);
319+
} else {
320+
writer.writeInt((int) value);
321+
}
322+
};
323+
break;
307324
case BIGINT:
308325
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
309326
break;

0 commit comments

Comments
 (0)