Skip to content

Commit c1975b3

Browse files
committed
address comment for the lookuper
1 parent 1f2e546 commit c1975b3

File tree

3 files changed

+22
-7
lines changed

3 files changed

+22
-7
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ public Lookuper createLookuper() {
7474

7575
@Override
7676
public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
77-
return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames);
77+
return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames, pojoClass);
7878
}
7979
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuperImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,17 @@ final class TypedLookuperImpl<K> implements TypedLookuper<K> {
3838
private final Lookuper delegate;
3939
private final TableInfo tableInfo;
4040
@Nullable private final List<String> lookupColumnNames;
41+
private final PojoToRowConverter<K> keyConv;
4142

4243
TypedLookuperImpl(
43-
Lookuper delegate, TableInfo tableInfo, @Nullable List<String> lookupColumnNames) {
44+
Lookuper delegate,
45+
TableInfo tableInfo,
46+
@Nullable List<String> lookupColumnNames,
47+
Class<K> keyClass) {
4448
this.delegate = delegate;
4549
this.tableInfo = tableInfo;
4650
this.lookupColumnNames = lookupColumnNames;
51+
this.keyConv = createPojoToRowConverter(keyClass);
4752
}
4853

4954
@Override
@@ -55,17 +60,19 @@ public CompletableFuture<LookupResult> lookup(K key) {
5560
if (key instanceof InternalRow) {
5661
return delegate.lookup((InternalRow) key);
5762
}
63+
64+
InternalRow keyRow = keyConv.toRow(key);
65+
return delegate.lookup(keyRow);
66+
}
67+
68+
private PojoToRowConverter<K> createPojoToRowConverter(Class<K> keyClass) {
5869
RowType tableSchema = tableInfo.getRowType();
5970
RowType keyProjection;
6071
if (lookupColumnNames == null) {
6172
keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
6273
} else {
6374
keyProjection = tableSchema.project(lookupColumnNames);
6475
}
65-
@SuppressWarnings("unchecked")
66-
Class<K> keyClass = (Class<K>) key.getClass();
67-
PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
68-
InternalRow keyRow = keyConv.toRow(key);
69-
return delegate.lookup(keyRow);
76+
return PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
7077
}
7178
}

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.fluss.client.table.scanner.ScanRecord;
2727
import org.apache.fluss.client.table.scanner.log.LogScanner;
2828
import org.apache.fluss.client.table.scanner.log.ScanRecords;
29+
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
2930
import org.apache.fluss.client.table.writer.UpsertWriter;
3031
import org.apache.fluss.config.ConfigOptions;
3132
import org.apache.fluss.config.Configuration;
@@ -144,6 +145,13 @@ protected static void subscribeFromBeginning(LogScanner logScanner, Table table)
144145
}
145146
}
146147

148+
protected static void subscribeFromBeginning(TypedLogScanner<?> logScanner, Table table) {
149+
int bucketCount = table.getTableInfo().getNumBuckets();
150+
for (int i = 0; i < bucketCount; i++) {
151+
logScanner.subscribeFromBeginning(i);
152+
}
153+
}
154+
147155
protected static void subscribeFromTimestamp(
148156
TablePath tablePath,
149157
@Nullable String partitionName,

0 commit comments

Comments
 (0)