Skip to content

Commit 8b8a33d

Browse files
committed
add TypedScanRecords
1 parent 7dc4879 commit 8b8a33d

25 files changed

+371
-208
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/ScanRecord.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,52 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.record.ChangeType;
22+
import org.apache.fluss.record.LogRecord;
23+
import org.apache.fluss.row.InternalRow;
2224

2325
import java.util.Objects;
2426

2527
/** one scan record. */
28+
// TODO: replace this with GenericRecord in the future
2629
@Internal
27-
public class ScanRecord<T> {
30+
public class ScanRecord implements LogRecord {
2831
private static final long INVALID = -1L;
2932

3033
private final long offset;
3134
private final long timestamp;
3235
private final ChangeType changeType;
33-
private final T value;
36+
private final InternalRow row;
3437

35-
public ScanRecord(T value) {
36-
this(INVALID, INVALID, ChangeType.INSERT, value);
38+
public ScanRecord(InternalRow row) {
39+
this(INVALID, INVALID, ChangeType.INSERT, row);
3740
}
3841

39-
public ScanRecord(long offset, long timestamp, ChangeType changeType, T value) {
42+
public ScanRecord(long offset, long timestamp, ChangeType changeType, InternalRow row) {
4043
this.offset = offset;
4144
this.timestamp = timestamp;
4245
this.changeType = changeType;
43-
this.value = value;
46+
this.row = row;
4447
}
4548

4649
/** The position of this record in the corresponding fluss table bucket. */
50+
@Override
4751
public long logOffset() {
4852
return offset;
4953
}
5054

55+
@Override
5156
public long timestamp() {
5257
return timestamp;
5358
}
5459

60+
@Override
5561
public ChangeType getChangeType() {
5662
return changeType;
5763
}
5864

59-
/** Returns the carried record as InternalRow for backward compatibility. */
60-
public org.apache.fluss.row.InternalRow getRow() {
61-
return (org.apache.fluss.row.InternalRow) value;
62-
}
63-
64-
/** Returns the carried record value. */
65-
public T getValue() {
66-
return value;
65+
@Override
66+
public InternalRow getRow() {
67+
return row;
6768
}
6869

6970
@Override
@@ -74,19 +75,19 @@ public boolean equals(Object o) {
7475
if (o == null || getClass() != o.getClass()) {
7576
return false;
7677
}
77-
ScanRecord<?> that = (ScanRecord<?>) o;
78+
ScanRecord that = (ScanRecord) o;
7879
return offset == that.offset
7980
&& changeType == that.changeType
80-
&& Objects.equals(value, that.value);
81+
&& Objects.equals(row, that.row);
8182
}
8283

8384
@Override
8485
public int hashCode() {
85-
return Objects.hash(offset, changeType, value);
86+
return Objects.hash(offset, changeType, row);
8687
}
8788

8889
@Override
8990
public String toString() {
90-
return changeType.shortString() + value + "@" + offset;
91+
return changeType.shortString() + row.toString() + "@" + offset;
9192
}
9293
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.record.ChangeType;
22+
import org.apache.fluss.row.InternalRow;
23+
24+
import java.util.Objects;
25+
26+
/**
27+
* A record produced by a table scanner which contains a typed value.
28+
*
29+
* @param <T> The type of the value.
30+
*/
31+
@PublicEvolving
32+
public class TypedScanRecord<T> {
33+
34+
private final ScanRecord scanRecord;
35+
private final T value;
36+
37+
public TypedScanRecord(ScanRecord scanRecord, T value) {
38+
this.scanRecord = scanRecord;
39+
this.value = value;
40+
}
41+
42+
/** The position of this record in the corresponding fluss table bucket. */
43+
public long logOffset() {
44+
return scanRecord.logOffset();
45+
}
46+
47+
/** The timestamp of this record. */
48+
public long timestamp() {
49+
return scanRecord.timestamp();
50+
}
51+
52+
/** The change type of this record. */
53+
public ChangeType getChangeType() {
54+
return scanRecord.getChangeType();
55+
}
56+
57+
/** Returns the record value. */
58+
public T getValue() {
59+
return value;
60+
}
61+
62+
/** Returns the internal row of this record. */
63+
public InternalRow getRow() {
64+
return scanRecord.getRow();
65+
}
66+
67+
@Override
68+
public boolean equals(Object o) {
69+
if (this == o) {
70+
return true;
71+
}
72+
if (o == null || getClass() != o.getClass()) {
73+
return false;
74+
}
75+
TypedScanRecord<?> that = (TypedScanRecord<?>) o;
76+
return Objects.equals(scanRecord, that.scanRecord) && Objects.equals(value, that.value);
77+
}
78+
79+
@Override
80+
public int hashCode() {
81+
return Objects.hash(scanRecord, value);
82+
}
83+
84+
@Override
85+
public String toString() {
86+
return scanRecord.getChangeType().shortString() + value + "@" + scanRecord.logOffset();
87+
}
88+
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public CompletedFetch(
9595
// TODO: optimize this to avoid deep copying the record.
9696
// refactor #fetchRecords to return an iterator which lazily deserialize
9797
// from underlying record stream and arrow buffer.
98-
ScanRecord<InternalRow> toScanRecord(LogRecord record) {
98+
ScanRecord toScanRecord(LogRecord record) {
9999
GenericRow newRow = new GenericRow(selectedFieldGetters.length);
100100
InternalRow internalRow = record.getRow();
101101
for (int i = 0; i < selectedFieldGetters.length; i++) {
@@ -148,7 +148,7 @@ void drain() {
148148
* maxRecords}
149149
* @return {@link ScanRecord scan records}
150150
*/
151-
public List<ScanRecord<InternalRow>> fetchRecords(int maxRecords) {
151+
public List<ScanRecord> fetchRecords(int maxRecords) {
152152
if (corruptLastRecord) {
153153
throw new FetchException(
154154
"Received exception when fetching the next record from "
@@ -161,7 +161,7 @@ public List<ScanRecord<InternalRow>> fetchRecords(int maxRecords) {
161161
return Collections.emptyList();
162162
}
163163

164-
List<ScanRecord<InternalRow>> scanRecords = new ArrayList<>();
164+
List<ScanRecord> scanRecords = new ArrayList<>();
165165
try {
166166
for (int i = 0; i < maxRecords; i++) {
167167
// Only move to next record if there was no exception in the last fetch.

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.fluss.metadata.TablePath;
3030
import org.apache.fluss.record.LogRecord;
3131
import org.apache.fluss.record.LogRecordBatch;
32-
import org.apache.fluss.row.InternalRow;
3332
import org.apache.fluss.rpc.protocol.ApiError;
3433
import org.apache.fluss.rpc.protocol.Errors;
3534

@@ -84,9 +83,8 @@ public LogFetchCollector(
8483
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
8584
* the defaultResetPolicy is NONE
8685
*/
87-
public Map<TableBucket, List<ScanRecord<InternalRow>>> collectFetch(
88-
final LogFetchBuffer logFetchBuffer) {
89-
Map<TableBucket, List<ScanRecord<InternalRow>>> fetched = new HashMap<>();
86+
public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logFetchBuffer) {
87+
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
9088
int recordsRemaining = maxPollRecords;
9189

9290
try {
@@ -117,19 +115,18 @@ public Map<TableBucket, List<ScanRecord<InternalRow>>> collectFetch(
117115

118116
logFetchBuffer.poll();
119117
} else {
120-
List<ScanRecord<InternalRow>> records =
121-
fetchRecords(nextInLineFetch, recordsRemaining);
118+
List<ScanRecord> records = fetchRecords(nextInLineFetch, recordsRemaining);
122119
if (!records.isEmpty()) {
123120
TableBucket tableBucket = nextInLineFetch.tableBucket;
124-
List<ScanRecord<InternalRow>> currentRecords = fetched.get(tableBucket);
121+
List<ScanRecord> currentRecords = fetched.get(tableBucket);
125122
if (currentRecords == null) {
126123
fetched.put(tableBucket, records);
127124
} else {
128125
// this case shouldn't usually happen because we only send one fetch at
129126
// a time per bucket, but it might conceivably happen in some rare
130127
// cases (such as bucket leader changes). we have to copy to a new list
131128
// because the old one may be immutable
132-
List<ScanRecord<InternalRow>> newScanRecords =
129+
List<ScanRecord> newScanRecords =
133130
new ArrayList<>(records.size() + currentRecords.size());
134131
newScanRecords.addAll(currentRecords);
135132
newScanRecords.addAll(records);
@@ -149,8 +146,7 @@ public Map<TableBucket, List<ScanRecord<InternalRow>>> collectFetch(
149146
return fetched;
150147
}
151148

152-
private List<ScanRecord<InternalRow>> fetchRecords(
153-
CompletedFetch nextInLineFetch, int maxRecords) {
149+
private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
154150
TableBucket tb = nextInLineFetch.tableBucket;
155151
Long offset = logScannerStatus.getBucketOffset(tb);
156152
if (offset == null) {
@@ -161,7 +157,7 @@ private List<ScanRecord<InternalRow>> fetchRecords(
161157
nextInLineFetch.nextFetchOffset());
162158
} else {
163159
if (nextInLineFetch.nextFetchOffset() == offset) {
164-
List<ScanRecord<InternalRow>> records = nextInLineFetch.fetchRecords(maxRecords);
160+
List<ScanRecord> records = nextInLineFetch.fetchRecords(maxRecords);
165161
LOG.trace(
166162
"Returning {} fetched records at offset {} for assigned bucket {}.",
167163
records.size(),

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.fluss.record.MemoryLogRecords;
4343
import org.apache.fluss.remote.RemoteLogFetchInfo;
4444
import org.apache.fluss.remote.RemoteLogSegment;
45-
import org.apache.fluss.row.InternalRow;
4645
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
4746
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4847
import org.apache.fluss.rpc.messages.FetchLogRequest;
@@ -162,7 +161,7 @@ public boolean hasAvailableFetches() {
162161
return !logFetchBuffer.isEmpty();
163162
}
164163

165-
public Map<TableBucket, List<ScanRecord<InternalRow>>> collectFetch() {
164+
public Map<TableBucket, List<ScanRecord>> collectFetch() {
166165
return logFetchCollector.collectFetch(logFetchBuffer);
167166
}
168167

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.fluss.client.table.scanner.log;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21-
import org.apache.fluss.row.InternalRow;
2221

2322
import java.time.Duration;
2423

@@ -49,7 +48,7 @@ public interface LogScanner extends AutoCloseable {
4948
* @throws java.lang.IllegalStateException if the scanner is not subscribed to any buckets to
5049
* read from.
5150
*/
52-
ScanRecords<InternalRow> poll(Duration timeout);
51+
ScanRecords poll(Duration timeout);
5352

5453
/**
5554
* Subscribe to the given table bucket in given offset dynamically. If the table bucket is

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.fluss.metadata.TableBucket;
2929
import org.apache.fluss.metadata.TableInfo;
3030
import org.apache.fluss.metadata.TablePath;
31-
import org.apache.fluss.row.InternalRow;
3231
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
3332
import org.apache.fluss.types.RowType;
3433
import org.apache.fluss.utils.Projection;
@@ -131,7 +130,7 @@ private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo t
131130
}
132131

133132
@Override
134-
public ScanRecords<InternalRow> poll(Duration timeout) {
133+
public ScanRecords poll(Duration timeout) {
135134
acquireAndEnsureOpen();
136135
try {
137136
if (!logScannerStatus.prepareToPoll()) {
@@ -142,25 +141,25 @@ public ScanRecords<InternalRow> poll(Duration timeout) {
142141
long timeoutNanos = timeout.toNanos();
143142
long startNanos = System.nanoTime();
144143
do {
145-
Map<TableBucket, List<ScanRecord<InternalRow>>> fetchRecords = pollForFetches();
144+
Map<TableBucket, List<ScanRecord>> fetchRecords = pollForFetches();
146145
if (fetchRecords.isEmpty()) {
147146
try {
148147
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
149148
// logFetcher waits for the timeout and no data in buffer,
150149
// so we return empty
151-
return new ScanRecords<>(fetchRecords);
150+
return new ScanRecords(fetchRecords);
152151
}
153152
} catch (WakeupException e) {
154153
// wakeup() is called, we need to return empty
155-
return new ScanRecords<>(fetchRecords);
154+
return new ScanRecords(fetchRecords);
156155
}
157156
} else {
158157
// before returning the fetched records, we can send off the next round of
159158
// fetches and avoid block waiting for their responses to enable pipelining
160159
// while the user is handling the fetched records.
161160
logFetcher.sendFetches();
162161

163-
return new ScanRecords<>(fetchRecords);
162+
return new ScanRecords(fetchRecords);
164163
}
165164
} while (System.nanoTime() - startNanos < timeoutNanos);
166165

@@ -231,8 +230,8 @@ public void wakeup() {
231230
logFetcher.wakeup();
232231
}
233232

234-
private Map<TableBucket, List<ScanRecord<InternalRow>>> pollForFetches() {
235-
Map<TableBucket, List<ScanRecord<InternalRow>>> fetchedRecords = logFetcher.collectFetch();
233+
private Map<TableBucket, List<ScanRecord>> pollForFetches() {
234+
Map<TableBucket, List<ScanRecord>> fetchedRecords = logFetcher.collectFetch();
236235
if (!fetchedRecords.isEmpty()) {
237236
return fetchedRecords;
238237
}

0 commit comments

Comments
 (0)