
Commit a71ba7c

add missing types to flink module
1 parent cf17b1e commit a71ba7c

3 files changed: +46 -11 lines changed


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ public class LakeSnapshotAndLogSplitScanner implements BatchScanner {
     // the sorted logs in memory, mapping from key -> value
     private Map<InternalRow, KeyValueRow> logRows;
 
-    private final LogScanner logScanner;
+    private final LogScanner<InternalRow> logScanner;
     private final long stoppingOffset;
     private boolean logScanFinished;
 
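For context on why this one-line change matters: with the raw LogScanner type, records come back untyped and each use needs an unchecked cast. Below is a minimal, self-contained sketch of that difference; Scanner, Record, and Row are hypothetical stand-ins, not the Fluss API.

import java.util.List;

interface Row {}

interface Record<T> {
    T getRow();
}

interface Scanner<T> {
    List<Record<T>> poll();
}

class RawVsTyped {
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void raw(Scanner scanner) {
        // Raw type: poll() returns a raw List, every element comes back as a
        // raw Record, and the cast to Row is unchecked at compile time.
        List<Record> records = scanner.poll();
        Row row = (Row) records.get(0).getRow();
    }

    static void typed(Scanner<Row> scanner) {
        // Parameterized type: the compiler knows getRow() yields Row,
        // so no cast and no unchecked warning.
        Row row = scanner.poll().get(0).getRow();
    }
}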

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java

Lines changed: 4 additions & 3 deletions
@@ -38,6 +38,7 @@
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.ExceptionUtils;
@@ -407,7 +408,7 @@ private void checkSnapshotSplitOrStartNext() {
         }
     }
 
-    private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
+    private FlinkRecordsWithSplitIds forLogRecords(ScanRecords<InternalRow> scanRecords) {
         // For calculating the currentFetchEventTimeLag
         long fetchTimestamp = System.currentTimeMillis();
         long maxConsumerRecordTimestampInFetch = -1;
@@ -426,7 +427,7 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
             }
             splitIdByTableBucket.put(scanBucket, splitId);
             tableScanBuckets.add(scanBucket);
-            List<ScanRecord> bucketScanRecords = scanRecords.records(scanBucket);
+            List<ScanRecord<InternalRow>> bucketScanRecords = scanRecords.records(scanBucket);
             if (!bucketScanRecords.isEmpty()) {
                 final ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
                 // We keep the maximum message timestamp in the fetch for calculating lags
@@ -478,7 +479,7 @@ public String next() {
         }
 
     private CloseableIterator<RecordAndPos> toRecordAndPos(
-            Iterator<ScanRecord> recordAndPosIterator) {
+            Iterator<ScanRecord<InternalRow>> recordAndPosIterator) {
         return new CloseableIterator<RecordAndPos>() {
 
             @Override
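The same typing now flows through the reader's per-bucket loop: a ScanRecords<InternalRow> yields List<ScanRecord<InternalRow>> from records(bucket), so the element type is known at compile time. A minimal sketch of that shape follows; Row, Bucket, ScanRecord, and ScanRecords are hypothetical stand-ins that mirror only the method shapes visible in this diff.

import java.util.List;
import java.util.Set;

interface Row {}

interface Bucket {}

interface ScanRecord<T> {
    long logOffset();
    T getRow();
}

interface ScanRecords<T> {
    Set<Bucket> buckets();
    List<ScanRecord<T>> records(Bucket bucket);
}

class TypedIteration {
    static void forLogRecords(ScanRecords<Row> scanRecords) {
        for (Bucket bucket : scanRecords.buckets()) {
            // With ScanRecords<Row>, this list is List<ScanRecord<Row>>;
            // with the raw type it would be a raw List of raw ScanRecords.
            List<ScanRecord<Row>> bucketRecords = scanRecords.records(bucket);
            if (!bucketRecords.isEmpty()) {
                ScanRecord<Row> last = bucketRecords.get(bucketRecords.size() - 1);
                long lastOffset = last.logOffset(); // e.g. for lag/offset bookkeeping
                Row row = last.getRow();            // no cast needed
            }
        }
    }
}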

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 41 additions & 7 deletions
@@ -32,6 +32,9 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.utils.CloseableIterator;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -249,11 +252,12 @@ private void mayCreateLogScanner() {
     }
 
     private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
-            ScanRecords scanRecords) throws IOException {
+            ScanRecords<org.apache.fluss.row.InternalRow> scanRecords) throws IOException {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
         for (TableBucket bucket : scanRecords.buckets()) {
-            List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
+            List<ScanRecord<org.apache.fluss.row.InternalRow>> bucketScanRecords =
+                    scanRecords.records(bucket);
             if (bucketScanRecords.isEmpty()) {
                 continue;
             }
@@ -265,13 +269,14 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
             LakeWriter<WriteResult> lakeWriter =
                     getOrCreateLakeWriter(
                             bucket, currentTableSplitsByBucket.get(bucket).getPartitionName());
-            for (ScanRecord record : bucketScanRecords) {
+            for (ScanRecord<InternalRow> record : bucketScanRecords) {
                 // if record is less than stopping offset
                 if (record.logOffset() < stoppingOffset) {
-                    lakeWriter.write(record);
+                    lakeWriter.write(new ScanRecordLogRecord(record));
                 }
             }
-            ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
+            ScanRecord<InternalRow> lastRecord =
+                    bucketScanRecords.get(bucketScanRecords.size() - 1);
             // has arrived into the end of the split,
             if (lastRecord.logOffset() >= stoppingOffset - 1) {
                 currentTableStoppingOffsets.remove(bucket);
@@ -390,8 +395,8 @@ private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords(
                 getOrCreateLakeWriter(
                         bucket, checkNotNull(currentSnapshotSplit).getPartitionName());
         while (recordIterator.hasNext()) {
-            ScanRecord scanRecord = recordIterator.next().record();
-            lakeWriter.write(scanRecord);
+            ScanRecord<InternalRow> scanRecord = recordIterator.next().record();
+            lakeWriter.write(new ScanRecordLogRecord(scanRecord));
         }
         recordIterator.close();
         return emptyTableBucketWriteResultWithSplitIds();
@@ -460,6 +465,35 @@ public void close() throws Exception {
         // don't need to close connection, will be closed by TieringSourceReader
     }
 
+    /** Lightweight adapter to view a {@code ScanRecord<InternalRow>} as a {@link LogRecord}. */
+    private static final class ScanRecordLogRecord implements LogRecord {
+        private final ScanRecord<InternalRow> delegate;
+
+        private ScanRecordLogRecord(ScanRecord<InternalRow> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public long logOffset() {
+            return delegate.logOffset();
+        }
+
+        @Override
+        public long timestamp() {
+            return delegate.timestamp();
+        }
+
+        @Override
+        public ChangeType getChangeType() {
+            return delegate.getChangeType();
+        }
+
+        @Override
+        public InternalRow getRow() {
+            return delegate.getRow();
+        }
+    }
+
     private void subscribeLog(TieringLogSplit logSplit) {
         // assign bucket offset dynamically
         TableBucket tableBucket = logSplit.getTableBucket();
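The new ScanRecordLogRecord class is a classic object adapter: it wraps a ScanRecord<InternalRow> and forwards each LogRecord method to the wrapped record, so the writer's LogRecord-typed API can consume scan records without copying. Below is a minimal, runnable sketch of the same pattern; SourceRecord and SinkRecord are hypothetical stand-ins for ScanRecord<InternalRow> and LogRecord.

// A record type produced by one component (stand-in for ScanRecord<InternalRow>).
interface SourceRecord {
    long logOffset();
    String getRow();
}

// The record type a downstream writer accepts (stand-in for LogRecord).
interface SinkRecord {
    long logOffset();
    String getRow();
}

// The adapter: holds a reference to the source record and forwards every call.
final class SourceAsSink implements SinkRecord {
    private final SourceRecord delegate;

    SourceAsSink(SourceRecord delegate) {
        this.delegate = delegate;
    }

    @Override
    public long logOffset() {
        return delegate.logOffset();
    }

    @Override
    public String getRow() {
        return delegate.getRow();
    }
}

class AdapterDemo {
    // A writer that only accepts SinkRecord, like LakeWriter accepting LogRecord.
    static void write(SinkRecord record) {
        System.out.println(record.logOffset() + ": " + record.getRow());
    }

    public static void main(String[] args) {
        SourceRecord src = new SourceRecord() {
            public long logOffset() { return 42L; }
            public String getRow() { return "row-data"; }
        };
        write(new SourceAsSink(src)); // adapt at the call site, no data copied
    }
}

Since the adapter holds only a reference, the cost is one small wrapper object per write call; the row data itself is never duplicated.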
