Skip to content

Commit f56a70c

Browse files
committed
patch tests
1 parent a71ba7c commit f56a70c

File tree

6 files changed

+162
-30
lines changed

6 files changed

+162
-30
lines changed

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.fluss.flink.lake.reader;
1919

20-
import org.apache.fluss.client.table.scanner.ScanRecord;
2120
import org.apache.fluss.record.ChangeType;
2221
import org.apache.fluss.record.LogRecord;
2322
import org.apache.fluss.row.BinaryString;
@@ -153,8 +152,44 @@ private List<LogRecord> createRecords(int startId, int count, boolean isLog) {
153152
startId + i,
154153
BinaryString.fromString(isLog ? "a" + "_updated" : "a"),
155154
BinaryString.fromString(isLog ? "A" + "_updated" : "A"));
156-
logRecords.add(new ScanRecord(i, System.currentTimeMillis(), ChangeType.INSERT, row));
155+
logRecords.add(
156+
new TestLogRecord(i, System.currentTimeMillis(), ChangeType.INSERT, row));
157157
}
158158
return logRecords;
159159
}
160+
161+
/** Simple LogRecord for tests. */
162+
private static final class TestLogRecord implements LogRecord {
163+
private final long offset;
164+
private final long ts;
165+
private final ChangeType ct;
166+
private final InternalRow row;
167+
168+
private TestLogRecord(long offset, long ts, ChangeType ct, InternalRow row) {
169+
this.offset = offset;
170+
this.ts = ts;
171+
this.ct = ct;
172+
this.row = row;
173+
}
174+
175+
@Override
176+
public long logOffset() {
177+
return offset;
178+
}
179+
180+
@Override
181+
public long timestamp() {
182+
return ts;
183+
}
184+
185+
@Override
186+
public ChangeType getChangeType() {
187+
return ct;
188+
}
189+
190+
@Override
191+
public InternalRow getRow() {
192+
return row;
193+
}
194+
}
160195
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,17 +248,17 @@ void testAppendLogWithRoundRobin() throws Exception {
248248
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
249249
try (Connection conn = ConnectionFactory.createConnection(clientConf);
250250
Table table = conn.getTable(TablePath.of(DEFAULT_DB, "sink_test"));
251-
LogScanner logScanner = table.newScan().createLogScanner()) {
251+
LogScanner<InternalRow> logScanner = table.newScan().createLogScanner()) {
252252
logScanner.subscribeFromBeginning(0);
253253
logScanner.subscribeFromBeginning(1);
254254
logScanner.subscribeFromBeginning(2);
255255
long scanned = 0;
256256
while (scanned < 6) {
257-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
257+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
258258
for (TableBucket bucket : scanRecords.buckets()) {
259259
List<String> rowsBucket =
260260
rows.computeIfAbsent(bucket.getBucket(), k -> new ArrayList<>());
261-
for (ScanRecord record : scanRecords.records(bucket)) {
261+
for (ScanRecord<InternalRow> record : scanRecords.records(bucket)) {
262262
InternalRow row = record.getRow();
263263
rowsBucket.add(
264264
Row.of(row.getInt(0), row.getLong(1), row.getString(2).toString())

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void testRowDataTablePKSink() throws Exception {
149149
env.executeAsync("Test RowData Fluss Sink");
150150

151151
Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
152-
LogScanner logScanner = table.newScan().createLogScanner();
152+
LogScanner<InternalRow> logScanner = table.newScan().createLogScanner();
153153

154154
int numBuckets = table.getTableInfo().getNumBuckets();
155155
for (int i = 0; i < numBuckets; i++) {
@@ -159,9 +159,9 @@ public void testRowDataTablePKSink() throws Exception {
159159
List<RowData> rows = new ArrayList<>();
160160

161161
while (rows.size() < inputRows.size()) {
162-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
162+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
163163
for (TableBucket bucket : scanRecords.buckets()) {
164-
for (ScanRecord record : scanRecords.records(bucket)) {
164+
for (ScanRecord<InternalRow> record : scanRecords.records(bucket)) {
165165
RowData row = converter.toFlinkRowData(record.getRow());
166166
row.setRowKind(toFlinkRowKind(record.getChangeType()));
167167
rows.add(row);
@@ -222,7 +222,7 @@ public void testRowDataTableLogSink() throws Exception {
222222
env.executeAsync("Test RowData Fluss Sink");
223223

224224
Table table = conn.getTable(new TablePath(DEFAULT_DB, logTableName));
225-
LogScanner logScanner = table.newScan().createLogScanner();
225+
LogScanner<InternalRow> logScanner = table.newScan().createLogScanner();
226226

227227
int numBuckets = table.getTableInfo().getNumBuckets();
228228
for (int i = 0; i < numBuckets; i++) {
@@ -232,9 +232,9 @@ public void testRowDataTableLogSink() throws Exception {
232232
List<RowData> rows = new ArrayList<>();
233233

234234
while (rows.size() < inputRows.size()) {
235-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
235+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
236236
for (TableBucket bucket : scanRecords.buckets()) {
237-
for (ScanRecord record : scanRecords.records(bucket)) {
237+
for (ScanRecord<InternalRow> record : scanRecords.records(bucket)) {
238238
RowData row = converter.toFlinkRowData(record.getRow());
239239
row.setRowKind(toFlinkRowKind(record.getChangeType()));
240240
rows.add(row);
@@ -288,9 +288,9 @@ public void testOrdersTablePKSink() throws Exception {
288288

289289
List<TestOrder> rows = new ArrayList<>();
290290
while (rows.size() < orders.size()) {
291-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
291+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
292292
for (TableBucket bucket : scanRecords.buckets()) {
293-
for (ScanRecord record : scanRecords.records(bucket)) {
293+
for (ScanRecord<InternalRow> record : scanRecords.records(bucket)) {
294294
InternalRow row = record.getRow();
295295
TestOrder order =
296296
new TestOrder(

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/deserializer/FlussDeserializationSchemaTest.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import org.apache.fluss.flink.source.testutils.Order;
2222
import org.apache.fluss.flink.source.testutils.OrderDeserializationSchema;
2323
import org.apache.fluss.record.ChangeType;
24+
import org.apache.fluss.record.LogRecord;
2425
import org.apache.fluss.row.BinaryString;
2526
import org.apache.fluss.row.Decimal;
2627
import org.apache.fluss.row.GenericRow;
28+
import org.apache.fluss.row.InternalRow;
2729
import org.apache.fluss.row.TimestampLtz;
2830
import org.apache.fluss.row.TimestampNtz;
2931
import org.apache.fluss.types.DataField;
@@ -48,6 +50,36 @@
4850
* conversion from Fluss records to various target formats.
4951
*/
5052
public class FlussDeserializationSchemaTest {
53+
54+
/** Lightweight adapter to view a {@code ScanRecord<InternalRow>} as a {@link LogRecord}. */
55+
private static final class ScanRecordLogRecord implements LogRecord {
56+
private final ScanRecord<InternalRow> delegate;
57+
58+
private ScanRecordLogRecord(ScanRecord<InternalRow> delegate) {
59+
this.delegate = delegate;
60+
}
61+
62+
@Override
63+
public long logOffset() {
64+
return delegate.logOffset();
65+
}
66+
67+
@Override
68+
public long timestamp() {
69+
return delegate.timestamp();
70+
}
71+
72+
@Override
73+
public ChangeType getChangeType() {
74+
return delegate.getChangeType();
75+
}
76+
77+
@Override
78+
public InternalRow getRow() {
79+
return delegate.getRow();
80+
}
81+
}
82+
5183
@Test
5284
public void testDeserialize() throws Exception {
5385
// Create GenericRow with proper types
@@ -57,10 +89,10 @@ public void testDeserialize() throws Exception {
5789
row.setField(2, 3);
5890
row.setField(3, BinaryString.fromString("123 Main St"));
5991

60-
ScanRecord scanRecord = new ScanRecord(row);
92+
ScanRecord<InternalRow> scanRecord = new ScanRecord<>(row);
6193
OrderDeserializationSchema deserializer = new OrderDeserializationSchema();
6294

63-
Order result = deserializer.deserialize(scanRecord);
95+
Order result = deserializer.deserialize(new ScanRecordLogRecord(scanRecord));
6496

6597
assertThat(result.getOrderId()).isEqualTo(1001L);
6698
assertThat(result.getItemId()).isEqualTo(5001L);
@@ -76,10 +108,10 @@ public void testDeserializeWithNumericConversion() throws Exception {
76108
row.setField(2, 4);
77109
row.setField(3, BinaryString.fromString("456 Oak Ave"));
78110

79-
ScanRecord scanRecord = new ScanRecord(row);
111+
ScanRecord<InternalRow> scanRecord = new ScanRecord<>(row);
80112
OrderDeserializationSchema schema = new OrderDeserializationSchema();
81113

82-
Order result = schema.deserialize(scanRecord);
114+
Order result = schema.deserialize(new ScanRecordLogRecord(scanRecord));
83115

84116
assertThat(result.getOrderId()).isEqualTo(1002L);
85117
assertThat(result.getItemId()).isEqualTo(5002L);
@@ -95,10 +127,10 @@ public void testDeserializeWithNullValues() throws Exception {
95127
row.setField(2, 5);
96128
row.setField(3, null);
97129

98-
ScanRecord scanRecord = new ScanRecord(row);
130+
ScanRecord<InternalRow> scanRecord = new ScanRecord<>(row);
99131
OrderDeserializationSchema schema = new OrderDeserializationSchema();
100132

101-
Order result = schema.deserialize(scanRecord);
133+
Order result = schema.deserialize(new ScanRecordLogRecord(scanRecord));
102134

103135
assertThat(result.getOrderId()).isEqualTo(1003L);
104136
assertThat(result.getItemId()).isEqualTo(5003L);
@@ -169,13 +201,13 @@ public void testJsonStringDeserialize() throws Exception {
169201
row.setField(14, TimestampNtz.fromMillis(testTimestampInSeconds * 1000));
170202
row.setField(15, TimestampLtz.fromEpochMillis(testTimestampInSeconds * 1000));
171203
row.setField(16, null);
172-
ScanRecord scanRecord = new ScanRecord(row);
204+
ScanRecord<InternalRow> scanRecord = new ScanRecord<>(row);
173205

174206
// Create deserializer
175207
JsonStringDeserializationSchema deserializer = new JsonStringDeserializationSchema();
176208
// Test deserialization
177209
deserializer.open(new DeserializerInitContextImpl(null, null, rowType));
178-
String result = deserializer.deserialize(scanRecord);
210+
String result = deserializer.deserialize(new ScanRecordLogRecord(scanRecord));
179211

180212
String rowJson =
181213
"{"
@@ -206,8 +238,9 @@ public void testJsonStringDeserialize() throws Exception {
206238
+ "}");
207239

208240
// Verify with offset and timestamp
209-
ScanRecord scanRecord2 = new ScanRecord(1001, 1743261788400L, ChangeType.DELETE, row);
210-
String result2 = deserializer.deserialize(scanRecord2);
241+
ScanRecord<InternalRow> scanRecord2 =
242+
new ScanRecord<>(1001, 1743261788400L, ChangeType.DELETE, row);
243+
String result2 = deserializer.deserialize(new ScanRecordLogRecord(scanRecord2));
211244
assertThat(result2).isNotNull();
212245
assertThat(result2)
213246
.isEqualTo(
@@ -220,8 +253,8 @@ public void testJsonStringDeserialize() throws Exception {
220253
row.setField(2, true);
221254
row.setField(8, 512);
222255
row.setField(13, 72000000);
223-
ScanRecord changedRecord = new ScanRecord(row);
224-
String changedResult = deserializer.deserialize(changedRecord);
256+
ScanRecord<InternalRow> changedRecord = new ScanRecord<>(row);
257+
String changedResult = deserializer.deserialize(new ScanRecordLogRecord(changedRecord));
225258
String changedRowJson =
226259
"{"
227260
+ "\"char\":\"b\","

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/deserializer/RowDataDeserializationSchemaTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.fluss.flink.source.deserializer;
1919

2020
import org.apache.fluss.client.table.scanner.ScanRecord;
21+
import org.apache.fluss.record.ChangeType;
22+
import org.apache.fluss.record.LogRecord;
2123
import org.apache.fluss.row.BinaryString;
2224
import org.apache.fluss.row.GenericRow;
25+
import org.apache.fluss.row.InternalRow;
2326
import org.apache.fluss.types.DataField;
2427
import org.apache.fluss.types.DataTypes;
2528
import org.apache.fluss.types.RowType;
@@ -77,10 +80,10 @@ public void testDeserialize() throws Exception {
7780
row.setField(2, 45);
7881
row.setField(3, BinaryString.fromString("Test addr"));
7982

80-
ScanRecord scanRecord = new ScanRecord(row);
83+
ScanRecord<InternalRow> scanRecord = new ScanRecord<>(row);
8184

8285
RowDataDeserializationSchema deserializer = getRowDataDeserializationSchema(rowType);
83-
RowData result = deserializer.deserialize(scanRecord);
86+
RowData result = deserializer.deserialize(new ScanRecordLogRecord(scanRecord));
8487

8588
assertThat(result.getArity()).isEqualTo(4);
8689
assertThat(result.getLong(0)).isEqualTo(100L);
@@ -89,6 +92,35 @@ public void testDeserialize() throws Exception {
8992
assertThat(result.getString(3).toString()).isEqualTo("Test addr");
9093
}
9194

95+
/** Lightweight adapter to view a {@code ScanRecord<InternalRow>} as a {@link LogRecord}. */
96+
private static final class ScanRecordLogRecord implements LogRecord {
97+
private final ScanRecord<InternalRow> delegate;
98+
99+
private ScanRecordLogRecord(ScanRecord<InternalRow> delegate) {
100+
this.delegate = delegate;
101+
}
102+
103+
@Override
104+
public long logOffset() {
105+
return delegate.logOffset();
106+
}
107+
108+
@Override
109+
public long timestamp() {
110+
return delegate.timestamp();
111+
}
112+
113+
@Override
114+
public ChangeType getChangeType() {
115+
return delegate.getChangeType();
116+
}
117+
118+
@Override
119+
public InternalRow getRow() {
120+
return delegate.getRow();
121+
}
122+
}
123+
92124
private @NotNull RowDataDeserializationSchema getRowDataDeserializationSchema(RowType rowType)
93125
throws Exception {
94126
RowDataDeserializationSchema deserializationSchema = new RowDataDeserializationSchema();

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.fluss.client.table.scanner.ScanRecord;
2121
import org.apache.fluss.record.ChangeType;
22+
import org.apache.fluss.record.LogRecord;
23+
import org.apache.fluss.row.InternalRow;
2224
import org.apache.fluss.row.indexed.IndexedRow;
2325
import org.apache.fluss.row.indexed.IndexedRowWriter;
2426
import org.apache.fluss.types.DataType;
@@ -52,9 +54,10 @@ void testConverter() throws Exception {
5254
try (IndexedRowWriter writer = genRecordForAllTypes(createAllTypes())) {
5355
row.pointTo(writer.segment(), 0, writer.position());
5456

55-
ScanRecord scanRecord = new ScanRecord(0, 1L, ChangeType.UPDATE_BEFORE, row);
56-
57-
RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(scanRecord);
57+
ScanRecord<InternalRow> scanRecord =
58+
new ScanRecord<>(0, 1L, ChangeType.UPDATE_BEFORE, row);
59+
RowData flinkRow =
60+
flussRowToFlinkRowConverter.toFlinkRowData(new ScanRecordLogRecord(scanRecord));
5861
assertThat(flinkRow.getArity()).isEqualTo(rowType.getFieldCount());
5962
assertThat(flinkRow.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
6063

@@ -93,4 +96,33 @@ void testConverter() throws Exception {
9396
assertThat(flinkRow.isNullAt(18)).isTrue();
9497
}
9598
}
99+
100+
/** Lightweight adapter to view a {@code ScanRecord<InternalRow>} as a {@link LogRecord}. */
101+
private static final class ScanRecordLogRecord implements LogRecord {
102+
private final ScanRecord<InternalRow> delegate;
103+
104+
private ScanRecordLogRecord(ScanRecord<InternalRow> delegate) {
105+
this.delegate = delegate;
106+
}
107+
108+
@Override
109+
public long logOffset() {
110+
return delegate.logOffset();
111+
}
112+
113+
@Override
114+
public long timestamp() {
115+
return delegate.timestamp();
116+
}
117+
118+
@Override
119+
public ChangeType getChangeType() {
120+
return delegate.getChangeType();
121+
}
122+
123+
@Override
124+
public InternalRow getRow() {
125+
return delegate.getRow();
126+
}
127+
}
96128
}

0 commit comments

Comments
 (0)