Skip to content

Commit 7dc4879

Browse files
committed
refactor to typed apis
1 parent 3ea717c commit 7dc4879

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+723
-428
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* column [k STRING]:
3737
*
3838
* <pre>{@code
39-
* Lookuper<InternalRow> lookuper = table.newLookup().createLookuper();
39+
* Lookuper lookuper = table.newLookup().createLookuper();
4040
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of("key1"));
4141
* resultFuture.get().getRowList().forEach(System.out::println);
4242
* }</pre>
@@ -45,15 +45,15 @@
4545
* columns [a INT, b STRING, c BIGINT] and bucket key [a, b]:
4646
*
4747
* <pre>{@code
48-
* Lookuper<InternalRow> lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
48+
* Lookuper lookuper = table.newLookup().lookupBy("a", "b").createLookuper();
4949
* CompletableFuture<LookupResult> resultFuture = lookuper.lookup(GenericRow.of(1, "b1"));
5050
* resultFuture.get().getRowList().forEach(System.out::println);
5151
* }</pre>
5252
*
5353
* <p>Example 3: Using a POJO key (conversion handled internally):
5454
*
5555
* <pre>{@code
56-
* Lookuper<MyKeyPojo> lookuper = table.newLookup().createLookuper();
56+
* TypedLookuper<MyKeyPojo> lookuper = table.newLookup().createTypedLookuper(MyKeyPojo.class);
5757
* LookupResult result = lookuper.lookup(new MyKeyPojo(...)).get();
5858
* }</pre>
5959
*
@@ -99,11 +99,16 @@ default Lookup lookupBy(String... lookupColumnNames) {
9999
* lookup columns. By default, the lookup columns are the primary key columns, but can be
100100
* changed with ({@link #lookupBy(List)}) method.
101101
*
102-
* <p>The returned lookuper accepts generic keys of type {@code K}. If a key is a POJO, the
103-
* client implementation will convert it to an {@code InternalRow} based on the table schema and
104-
* the active lookup columns.
105-
*
106102
* @return the lookuper
107103
*/
108-
<K> Lookuper<K> createLookuper();
104+
Lookuper createLookuper();
105+
106+
/**
107+
* Creates a {@link TypedLookuper} instance to lookup rows of a primary key table using POJOs.
108+
*
109+
* @param pojoClass the class of the POJO
110+
* @param <T> the type of the POJO
111+
* @return the typed lookuper
112+
*/
113+
<T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass);
109114
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.lookup;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.row.InternalRow;
2122

2223
import javax.annotation.concurrent.NotThreadSafe;
2324

@@ -27,30 +28,19 @@
2728
* A lookuper performs key-based lookups against a primary key table, using either the full primary
2829
* key or a prefix of the primary key (when configured via {@code Lookup#lookupBy}).
2930
*
30-
* <p>This interface is generic on the key type {@code K}: - When used in row mode, implementations
31-
* are typically declared as {@code Lookuper<InternalRow>} and accept an {@link
32-
* org.apache.fluss.row.InternalRow} containing the key fields in the configured order. - When used
33-
* with POJO keys, client-provided lookuper instances can also accept a POJO key type (for example
34-
* {@code Lookuper<MyKeyPojo>}) and will transparently convert the POJO to an {@code InternalRow}
35-
* using the table schema and active lookup columns.
36-
*
3731
* <p>Usage examples:
3832
*
3933
* <pre>{@code
4034
* // Row-based key (InternalRow)
41-
* Lookuper<InternalRow> lookuper = table.newLookup().createLookuper();
35+
* Lookuper lookuper = table.newLookup().createLookuper();
4236
* LookupResult res = lookuper.lookup(keyRow).get();
43-
*
44-
* // POJO key (converted internally)
45-
* Lookuper<MyKeyPojo> lookuperPojo = table.newLookup().createLookuper();
46-
* LookupResult res2 = lookuperPojo.lookup(new MyKeyPojo(...)).get();
4737
* }</pre>
4838
*
4939
* @since 0.6
5040
*/
5141
@PublicEvolving
5242
@NotThreadSafe
53-
public interface Lookuper<K> {
43+
public interface Lookuper {
5444

5545
/**
5646
* Lookups certain row from the given lookup key.
@@ -59,12 +49,8 @@ public interface Lookuper<K> {
5949
* {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
6050
* Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
6151
*
62-
* <p>The key can be either an {@link org.apache.fluss.row.InternalRow} or a POJO representing
63-
* the lookup key. Client-provided implementations returned by the Fluss client handle POJO-to-
64-
* row conversion internally when necessary.
65-
*
6652
* @param lookupKey the lookup key
6753
* @return the result of lookup.
6854
*/
69-
CompletableFuture<LookupResult> lookup(K lookupKey);
55+
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
7056
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,7 @@
4646
* of the primary key.
4747
*/
4848
@NotThreadSafe
49-
class PrefixKeyLookuper implements Lookuper<InternalRow> {
50-
51-
private final TableInfo tableInfo;
52-
53-
private final MetadataUpdater metadataUpdater;
54-
55-
private final LookupClient lookupClient;
49+
class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {
5650

5751
/** Extract bucket key from prefix lookup key row. */
5852
private final KeyEncoder bucketKeyEncoder;

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,7 @@
4040

4141
/** An implementation of {@link Lookuper} that lookups by primary key. */
4242
@NotThreadSafe
43-
class PrimaryKeyLookuper implements Lookuper<InternalRow> {
44-
45-
private final TableInfo tableInfo;
46-
47-
private final MetadataUpdater metadataUpdater;
48-
49-
private final LookupClient lookupClient;
43+
class PrimaryKeyLookuper extends AbstractLookuper implements Lookuper {
5044

5145
private final KeyEncoder primaryKeyEncoder;
5246

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.client.metadata.MetadataUpdater;
2121
import org.apache.fluss.metadata.SchemaGetter;
2222
import org.apache.fluss.metadata.TableInfo;
23-
import org.apache.fluss.row.InternalRow;
2423

2524
import javax.annotation.Nullable;
2625

@@ -64,14 +63,17 @@ public Lookup lookupBy(List<String> lookupColumnNames) {
6463
}
6564

6665
@Override
67-
public <K> Lookuper<K> createLookuper() {
68-
Lookuper<InternalRow> lookuper;
66+
public Lookuper createLookuper() {
6967
if (lookupColumnNames == null) {
7068
return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient);
7169
} else {
7270
return new PrefixKeyLookuper(
7371
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
7472
}
75-
return new TypedLookuper<K>(lookuper, tableInfo, lookupColumnNames);
7673
}
77-
}
74+
75+
@Override
76+
public <T> TypedLookuper<T> createTypedLookuper(Class<T> pojoClass) {
77+
return new TypedLookuperImpl<>(createLookuper(), tableInfo, lookupColumnNames);
78+
}
79+
}
Lines changed: 16 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
package org.apache.fluss.client.lookup;
2-
31
/*
42
* Licensed to the Apache Software Foundation (ASF) under one or more
53
* contributor license agreements. See the NOTICE file distributed with
@@ -17,57 +15,26 @@
1715
* limitations under the License.
1816
*/
1917

20-
import org.apache.fluss.client.converter.PojoToRowConverter;
21-
import org.apache.fluss.metadata.TableInfo;
22-
import org.apache.fluss.row.InternalRow;
23-
import org.apache.fluss.types.RowType;
18+
package org.apache.fluss.client.lookup;
2419

25-
import javax.annotation.Nullable;
20+
import org.apache.fluss.annotation.PublicEvolving;
2621

27-
import java.util.List;
2822
import java.util.concurrent.CompletableFuture;
2923

3024
/**
31-
* Decorator for {@link Lookuper} that enables generic key lookup via {@link
32-
* Lookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow} using existing converters
33-
* based on table schema and active lookup columns, and directly delegates when the key is already
34-
* an {@link InternalRow}.
25+
* A typed lookuper performs key-based lookups against a primary key table using POJOs.
26+
*
27+
* @param <T> the type of the lookup key
28+
* @since 0.6
3529
*/
36-
final class TypedLookuper<K> implements Lookuper<K> {
37-
38-
private final Lookuper<InternalRow> delegate;
39-
private final TableInfo tableInfo;
40-
@Nullable private final List<String> lookupColumnNames;
41-
42-
TypedLookuper(
43-
Lookuper<InternalRow> delegate,
44-
TableInfo tableInfo,
45-
@Nullable List<String> lookupColumnNames) {
46-
this.delegate = delegate;
47-
this.tableInfo = tableInfo;
48-
this.lookupColumnNames = lookupColumnNames;
49-
}
50-
51-
@Override
52-
public CompletableFuture<LookupResult> lookup(K key) {
53-
if (key == null) {
54-
throw new IllegalArgumentException("key must not be null");
55-
}
56-
// Fast-path: already an InternalRow
57-
if (key instanceof InternalRow) {
58-
return delegate.lookup((InternalRow) key);
59-
}
60-
RowType tableSchema = tableInfo.getRowType();
61-
RowType keyProjection;
62-
if (lookupColumnNames == null) {
63-
keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
64-
} else {
65-
keyProjection = tableSchema.project(lookupColumnNames);
66-
}
67-
@SuppressWarnings("unchecked")
68-
Class<K> keyClass = (Class<K>) key.getClass();
69-
PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
70-
InternalRow keyRow = keyConv.toRow(key);
71-
return delegate.lookup(keyRow);
72-
}
30+
@PublicEvolving
31+
public interface TypedLookuper<T> {
32+
33+
/**
34+
* Lookups certain row from the given lookup key.
35+
*
36+
* @param lookupKey the lookup key
37+
* @return the result of lookup.
38+
*/
39+
CompletableFuture<LookupResult> lookup(T lookupKey);
7340
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package org.apache.fluss.client.lookup;
2+
3+
/*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import org.apache.fluss.client.converter.PojoToRowConverter;
21+
import org.apache.fluss.metadata.TableInfo;
22+
import org.apache.fluss.row.InternalRow;
23+
import org.apache.fluss.types.RowType;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
29+
30+
/**
31+
* Decorator for {@link Lookuper} that enables generic key lookup via {@link
32+
* TypedLookuper#lookup(Object)}. Converts POJO keys to {@link InternalRow} using existing
33+
* converters based on table schema and active lookup columns, and directly delegates when the key
34+
* is already an {@link InternalRow}.
35+
*/
36+
final class TypedLookuperImpl<K> implements TypedLookuper<K> {
37+
38+
private final Lookuper delegate;
39+
private final TableInfo tableInfo;
40+
@Nullable private final List<String> lookupColumnNames;
41+
42+
TypedLookuperImpl(
43+
Lookuper delegate, TableInfo tableInfo, @Nullable List<String> lookupColumnNames) {
44+
this.delegate = delegate;
45+
this.tableInfo = tableInfo;
46+
this.lookupColumnNames = lookupColumnNames;
47+
}
48+
49+
@Override
50+
public CompletableFuture<LookupResult> lookup(K key) {
51+
if (key == null) {
52+
throw new IllegalArgumentException("key must not be null");
53+
}
54+
// Fast-path: already an InternalRow
55+
if (key instanceof InternalRow) {
56+
return delegate.lookup((InternalRow) key);
57+
}
58+
RowType tableSchema = tableInfo.getRowType();
59+
RowType keyProjection;
60+
if (lookupColumnNames == null) {
61+
keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
62+
} else {
63+
keyProjection = tableSchema.project(lookupColumnNames);
64+
}
65+
@SuppressWarnings("unchecked")
66+
Class<K> keyClass = (Class<K>) key.getClass();
67+
PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
68+
InternalRow keyRow = keyConv.toRow(key);
69+
return delegate.lookup(keyRow);
70+
}
71+
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
2222
import org.apache.fluss.client.table.scanner.log.LogScanner;
23+
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
2324
import org.apache.fluss.metadata.TableBucket;
24-
import org.apache.fluss.row.InternalRow;
2525

2626
import javax.annotation.Nullable;
2727

@@ -64,14 +64,14 @@ public interface Scan {
6464
*
6565
* <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
6666
*/
67-
LogScanner<InternalRow> createLogScanner();
67+
LogScanner createLogScanner();
6868

6969
/**
70-
* Creates a typed LogScanner to continuously read log data as POJOs of the given class.
70+
* Creates a {@link TypedLogScanner} to continuously read log data as POJOs of the given class.
7171
*
7272
* <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
7373
*/
74-
<T> LogScanner<T> createLogScanner(Class<T> pojoClass);
74+
<T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass);
7575

7676
/**
7777
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import org.apache.fluss.client.table.scanner.log.LogScanner;
2727
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
2828
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
29+
import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
2930
import org.apache.fluss.config.ConfigOptions;
3031
import org.apache.fluss.exception.FlussRuntimeException;
3132
import org.apache.fluss.metadata.SchemaGetter;
3233
import org.apache.fluss.metadata.TableBucket;
3334
import org.apache.fluss.metadata.TableInfo;
34-
import org.apache.fluss.row.InternalRow;
3535
import org.apache.fluss.types.RowType;
3636

3737
import javax.annotation.Nullable;
@@ -116,9 +116,9 @@ public LogScanner createLogScanner() {
116116
}
117117

118118
@Override
119-
public <T> LogScanner<T> createLogScanner(Class<T> pojoClass) {
120-
LogScanner<InternalRow> base = createLogScanner();
121-
return new TypedLogScanner<>(base, pojoClass, tableInfo, projectedColumns);
119+
public <T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass) {
120+
LogScanner base = createLogScanner();
121+
return new TypedLogScannerImpl<>(base, pojoClass, tableInfo, projectedColumns);
122122
}
123123

124124
@Override

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScanner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.table.scanner.log;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.row.InternalRow;
2122

2223
import java.time.Duration;
2324

@@ -27,7 +28,7 @@
2728
* @since 0.1
2829
*/
2930
@PublicEvolving
30-
public interface LogScanner<T> extends AutoCloseable {
31+
public interface LogScanner extends AutoCloseable {
3132

3233
/**
3334
* The earliest offset to fetch from. Fluss uses "-2" to indicate fetching from log start
@@ -48,7 +49,7 @@ public interface LogScanner<T> extends AutoCloseable {
4849
* @throws java.lang.IllegalStateException if the scanner is not subscribed to any buckets to
4950
* read from.
5051
*/
51-
ScanRecords<T> poll(Duration timeout);
52+
ScanRecords<InternalRow> poll(Duration timeout);
5253

5354
/**
5455
* Subscribe to the given table bucket in given offset dynamically. If the table bucket is

0 commit comments

Comments
 (0)