Skip to content

Commit aefb78c

Browse files
committed
[common] Ensure consistency among CompactedRowWriter, IndexedRowWriter and ArrowTimeWriter when writing to the TIME field.
1 parent ec14e1b commit aefb78c

File tree

3 files changed

+81
-2
lines changed

3 files changed

+81
-2
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.fluss.row.BinaryString;
4848
import org.apache.fluss.row.GenericRow;
4949
import org.apache.fluss.row.InternalRow;
50+
import org.apache.fluss.row.ProjectedRow;
5051
import org.apache.fluss.row.indexed.IndexedRow;
5152
import org.apache.fluss.types.BigIntType;
5253
import org.apache.fluss.types.DataTypes;
@@ -57,17 +58,21 @@
5758
import org.junit.jupiter.api.Disabled;
5859
import org.junit.jupiter.api.Test;
5960
import org.junit.jupiter.params.ParameterizedTest;
61+
import org.junit.jupiter.params.provider.Arguments;
6062
import org.junit.jupiter.params.provider.CsvSource;
63+
import org.junit.jupiter.params.provider.MethodSource;
6164
import org.junit.jupiter.params.provider.ValueSource;
6265

6366
import javax.annotation.Nullable;
6467

6568
import java.time.Duration;
69+
import java.time.LocalTime;
6670
import java.util.ArrayList;
6771
import java.util.Collections;
6872
import java.util.Iterator;
6973
import java.util.List;
7074
import java.util.concurrent.CompletableFuture;
75+
import java.util.stream.Stream;
7176

7277
import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
7378
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -113,6 +118,58 @@ void testAppendOnly() throws Exception {
113118
}
114119
}
115120

121+
private static Stream<Arguments> upsertTimeValueForAllPrecisionsArgs() {
122+
return Stream.of(Arguments.of(KvFormat.INDEXED), Arguments.of(KvFormat.COMPACTED));
123+
}
124+
125+
@ParameterizedTest
126+
@MethodSource("upsertTimeValueForAllPrecisionsArgs")
127+
void testUpsertTimeValueForAllPrecisions(KvFormat kvFormat) throws Exception {
128+
LocalTime time = LocalTime.of(10, 10, 10, 123000000);
129+
long mill = time.toNanoOfDay() / 1_000_000;
130+
131+
// check for all precisions
132+
for (int i = 0; i <= 9; i++) {
133+
Schema schema =
134+
Schema.newBuilder()
135+
.column("a", DataTypes.TIME(i))
136+
.column("b", DataTypes.INT())
137+
.primaryKey("a", "b")
138+
.build();
139+
TablePath tablePath = TablePath.of("test_db_" + i, "test_pk_table_" + i);
140+
TableDescriptor tableDescriptor =
141+
TableDescriptor.builder()
142+
.schema(schema)
143+
.property(ConfigOptions.TABLE_KV_FORMAT, kvFormat)
144+
.distributedBy(1)
145+
.build();
146+
createTable(tablePath, tableDescriptor, false);
147+
try (Table table = conn.getTable(tablePath)) {
148+
UpsertWriter upsertWriter = table.newUpsert().createWriter();
149+
upsertWriter.upsert(row((int) mill, 1)).get();
150+
upsertWriter.flush();
151+
152+
LogScanner logScanner = createLogScanner(table);
153+
subscribeFromBeginning(logScanner, table);
154+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
155+
Integer scanResult = null;
156+
for (ScanRecord scanRecord : scanRecords) {
157+
InternalRow row = scanRecord.getRow();
158+
scanResult = row.getInt(0);
159+
}
160+
161+
Lookuper lookuper = table.newLookup().createLookuper();
162+
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
163+
keyRow.replaceRow(row((int) mill, 1));
164+
InternalRow singletonRow = lookuper.lookup(keyRow).get().getSingletonRow();
165+
assertThat(singletonRow).isNotNull();
166+
Integer lookupResult = singletonRow.getInt(0);
167+
168+
assertThat(scanResult).isEqualTo(lookupResult);
169+
}
170+
}
171+
}
172+
116173
@ParameterizedTest
117174
@ValueSource(booleans = {true, false})
118175
@Disabled("TODO, fix me in #116")

fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
332332
break;
333333
case INTEGER:
334334
case DATE:
335-
case TIME_WITHOUT_TIME_ZONE:
336335
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
337336
break;
337+
case TIME_WITHOUT_TIME_ZONE:
338+
final int timePrecision = getPrecision(fieldType);
339+
fieldWriter =
340+
(writer, pos, value) -> {
341+
if (timePrecision == 0) {
342+
// truncate to seconds to keep consistence with ArrowTimeWriter
343+
writer.writeInt((int) value / 1000 * 1000);
344+
} else {
345+
writer.writeInt((int) value);
346+
}
347+
};
348+
break;
338349
case BIGINT:
339350
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
340351
break;

fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
301301
break;
302302
case INTEGER:
303303
case DATE:
304-
case TIME_WITHOUT_TIME_ZONE:
305304
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
306305
break;
306+
case TIME_WITHOUT_TIME_ZONE:
307+
final int timePrecision = getPrecision(fieldType);
308+
fieldWriter =
309+
(writer, pos, value) -> {
310+
if (timePrecision == 0) {
311+
// truncate to seconds to keep consistence with ArrowTimeWriter
312+
writer.writeInt((int) value / 1000 * 1000);
313+
} else {
314+
writer.writeInt((int) value);
315+
}
316+
};
317+
break;
307318
case BIGINT:
308319
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
309320
break;

0 commit comments

Comments
 (0)