Skip to content

Commit 8e01bc3

Browse files
committed
make lookuper generic
1 parent b954cc2 commit 8e01bc3

File tree

9 files changed

+298
-28
lines changed

9 files changed

+298
-28
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,12 @@ default Lookup lookupBy(String... lookupColumnNames) {
9393
* lookup columns. By default, the lookup columns are the primary key columns, but can be
9494
* changed with ({@link #lookupBy(List)}) method.
9595
*
96+
* <p>The returned lookuper accepts generic keys of type {@code K}. If a key is a POJO,
97+
* the client implementation will convert it to an {@code InternalRow} based on the table
98+
* schema and the active lookup columns.
99+
*
96100
* @return the lookuper
97101
*/
98-
Lookuper createLookuper();
102+
<K> Lookuper<K> createLookuper();
99103

100-
/**
101-
* Creates a typed lookuper that accepts a POJO key and returns POJO result(s).
102-
*/
103-
<K, R> TypedLookuper<K, R> createLookuper(Class<K> keyClass, Class<R> resultClass);
104104
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* @since 0.6
2929
*/
3030
@PublicEvolving
31-
public interface Lookuper {
31+
public interface Lookuper<T> {
3232

3333
/**
3434
* Lookups certain row from the given lookup key.
@@ -37,8 +37,12 @@ public interface Lookuper {
3737
* {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
3838
* Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
3939
*
40-
* @param lookupKey the lookup key.
40+
* <p>The key can be either an {@link org.apache.fluss.row.InternalRow} or a POJO representing
41+
* the lookup key. Client-provided implementations returned by the Fluss client will handle POJO
42+
* to row conversion internally when necessary.
43+
*
44+
* @param key the lookup key (InternalRow or POJO)
4145
* @return the result of lookup.
4246
*/
43-
CompletableFuture<LookupResult> lookup(InternalRow lookupKey);
47+
CompletableFuture<LookupResult> lookup(T key);
4448
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.client.converter.PojoToRowConverter;
21+
import org.apache.fluss.metadata.TableInfo;
22+
import org.apache.fluss.row.InternalRow;
23+
import org.apache.fluss.types.RowType;
24+
25+
import javax.annotation.Nullable;
26+
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
28+
29+
/**
30+
* Decorator for {@link Lookuper} that enables generic key lookup via {@link Lookuper#lookup(Object)}.
31+
* Converts POJO keys to {@link InternalRow} using existing converters based on table schema and
32+
* active lookup columns, and directly delegates when the key is already an {@link InternalRow}.
33+
*/
34+
final class PojoConvertingLookuper<K> implements Lookuper<K> {
35+
36+
private final Lookuper<InternalRow> delegate;
37+
private final TableInfo tableInfo;
38+
@Nullable private final List<String> lookupColumnNames;
39+
40+
PojoConvertingLookuper(Lookuper<InternalRow> delegate,
41+
TableInfo tableInfo,
42+
@Nullable List<String> lookupColumnNames) {
43+
this.delegate = delegate;
44+
this.tableInfo = tableInfo;
45+
this.lookupColumnNames = lookupColumnNames;
46+
}
47+
48+
@Override
49+
public CompletableFuture<LookupResult> lookup(K key) {
50+
if (key == null) {
51+
throw new IllegalArgumentException("key must not be null");
52+
}
53+
// Fast-path: already an InternalRow
54+
if (key instanceof InternalRow) {
55+
return delegate.lookup((InternalRow) key);
56+
}
57+
RowType tableSchema = tableInfo.getRowType();
58+
RowType keyProjection;
59+
if (lookupColumnNames == null) {
60+
keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
61+
} else {
62+
keyProjection = tableSchema.project(lookupColumnNames);
63+
}
64+
@SuppressWarnings("unchecked")
65+
Class<K> keyClass = (Class<K>) key.getClass();
66+
PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
67+
InternalRow keyRow = keyConv.toRow(key);
68+
return delegate.lookup(keyRow);
69+
}
70+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import javax.annotation.Nullable;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Objects;
26+
27+
/** POJO-typed version of {@link LookupResult} carrying POJO values. */
28+
@PublicEvolving
29+
public final class PojoLookupResult<R> {
30+
private final List<R> values;
31+
32+
public PojoLookupResult(@Nullable R value) {
33+
this(value == null ? Collections.emptyList() : Collections.singletonList(value));
34+
}
35+
36+
public PojoLookupResult(List<R> values) {
37+
this.values = values;
38+
}
39+
40+
public List<R> getValues() {
41+
return values;
42+
}
43+
44+
public @Nullable R getSingletonValue() {
45+
if (values.isEmpty()) {
46+
return null;
47+
} else if (values.size() == 1) {
48+
return values.get(0);
49+
} else {
50+
throw new IllegalStateException("Expecting exactly one value, but got: " + values.size());
51+
}
52+
}
53+
54+
@Override
55+
public boolean equals(Object o) {
56+
if (this == o) {
57+
return true;
58+
}
59+
if (o == null || getClass() != o.getClass()) {
60+
return false;
61+
}
62+
PojoLookupResult<?> that = (PojoLookupResult<?>) o;
63+
return Objects.equals(values, that.values);
64+
}
65+
66+
@Override
67+
public int hashCode() {
68+
return Objects.hash(values);
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return values.toString();
74+
}
75+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.converter.PojoToRowConverter;
22+
import org.apache.fluss.client.converter.RowToPojoConverter;
23+
import org.apache.fluss.client.table.Table;
24+
import org.apache.fluss.metadata.TableInfo;
25+
import org.apache.fluss.row.InternalRow;
26+
import org.apache.fluss.types.RowType;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Objects;
31+
import java.util.concurrent.CompletableFuture;
32+
33+
/**
34+
* Utility that keeps the original Lookuper API intact while offering POJO-based lookup ergonomics.
35+
*
36+
* <p>This class wraps a {@link Lookuper} and performs POJO-to-row conversion for keys and
37+
* row-to-POJO conversion for results using the existing converters. It does not alter the public
38+
* {@link Lookup} API, preserving backward compatibility.
39+
*/
40+
@PublicEvolving
41+
public final class PojoLookuper<K, R> {
42+
43+
private final Lookuper delegate;
44+
private final PojoToRowConverter<K> keyConverter;
45+
private final RowToPojoConverter<R> resultConverter;
46+
47+
private PojoLookuper(Lookuper delegate,
48+
PojoToRowConverter<K> keyConverter,
49+
RowToPojoConverter<R> resultConverter) {
50+
this.delegate = delegate;
51+
this.keyConverter = keyConverter;
52+
this.resultConverter = resultConverter;
53+
}
54+
55+
/**
56+
* Creates a POJO lookuper for primary-key lookup using the table's full schema as result type
57+
* and the primary key projection for the key.
58+
*/
59+
public static <K, R> PojoLookuper<K, R> forPrimaryKey(Table table,
60+
Class<K> keyClass,
61+
Class<R> resultClass) {
62+
Objects.requireNonNull(table, "table");
63+
TableInfo tableInfo = table.getTableInfo();
64+
RowType tableSchema = tableInfo.getRowType();
65+
RowType keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
66+
PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
67+
RowToPojoConverter<R> resConv = RowToPojoConverter.of(resultClass, tableSchema, tableSchema);
68+
Lookuper base = table.newLookup().createLookuper();
69+
return new PojoLookuper<>(base, keyConv, resConv);
70+
}
71+
72+
/**
73+
* Creates a POJO lookuper for prefix-key lookup using the provided lookup columns.
74+
*/
75+
public static <K, R> PojoLookuper<K, R> forPrefixKey(Table table,
76+
List<String> lookupColumns,
77+
Class<K> keyClass,
78+
Class<R> resultClass) {
79+
Objects.requireNonNull(table, "table");
80+
Objects.requireNonNull(lookupColumns, "lookupColumns");
81+
TableInfo tableInfo = table.getTableInfo();
82+
RowType tableSchema = tableInfo.getRowType();
83+
RowType keyProjection = tableSchema.project(lookupColumns);
84+
PojoToRowConverter<K> keyConv = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
85+
RowToPojoConverter<R> resConv = RowToPojoConverter.of(resultClass, tableSchema, tableSchema);
86+
Lookuper base = table.newLookup().lookupBy(lookupColumns).createLookuper();
87+
return new PojoLookuper<>(base, keyConv, resConv);
88+
}
89+
90+
/**
91+
* Performs a lookup using a POJO key and returns POJO results.
92+
*/
93+
public CompletableFuture<PojoLookupResult<R>> lookup(K lookupKey) {
94+
InternalRow keyRow = keyConverter.toRow(lookupKey);
95+
return delegate
96+
.lookup(keyRow)
97+
.thenApply(result -> {
98+
List<InternalRow> rows = result.getRowList();
99+
if (rows == null || rows.isEmpty()) {
100+
return new PojoLookupResult<R>((R) null);
101+
}
102+
List<R> out = new ArrayList<>(rows.size());
103+
for (InternalRow row : rows) {
104+
out.add(resultConverter.fromRow(row));
105+
}
106+
return new PojoLookupResult<>(out);
107+
});
108+
}
109+
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
* An implementation of {@link Lookuper} that lookups by prefix key. A prefix key is a prefix subset
4747
* of the primary key.
4848
*/
49-
class PrefixKeyLookuper implements Lookuper {
49+
class PrefixKeyLookuper implements Lookuper<org.apache.fluss.row.InternalRow> {
5050

5151
private final TableInfo tableInfo;
5252

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import static org.apache.fluss.utils.Preconditions.checkArgument;
4141

4242
/** An implementation of {@link Lookuper} that lookups by primary key. */
43-
class PrimaryKeyLookuper implements Lookuper {
43+
class PrimaryKeyLookuper implements Lookuper<org.apache.fluss.row.InternalRow> {
4444

4545
private final TableInfo tableInfo;
4646

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
2121
import org.apache.fluss.metadata.TableInfo;
22+
import org.apache.fluss.row.InternalRow;
2223

2324
import javax.annotation.Nullable;
2425

@@ -55,18 +56,17 @@ public Lookup lookupBy(List<String> lookupColumnNames) {
5556
}
5657

5758
@Override
58-
public Lookuper createLookuper() {
59+
public <K> Lookuper<K> createLookuper() {
60+
Lookuper<InternalRow> base;
5961
if (lookupColumnNames == null) {
60-
return new PrimaryKeyLookuper(tableInfo, metadataUpdater, lookupClient);
62+
base = new PrimaryKeyLookuper(tableInfo, metadataUpdater, lookupClient);
6163
} else {
62-
return new PrefixKeyLookuper(
64+
base = new PrefixKeyLookuper(
6365
tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
6466
}
65-
}
66-
67-
@Override
68-
public <K, R> TypedLookuper<K, R> createLookuper(Class<K> keyClass, Class<R> resultClass) {
69-
Lookuper base = createLookuper();
70-
return new ConvertingLookuper<>(base, keyClass, resultClass, tableInfo, lookupColumnNames);
67+
// Wrap with a POJO-converting decorator so callers can use lookup(K) with POJOs as well.
68+
@SuppressWarnings("unchecked")
69+
Lookuper<K> decorated = (Lookuper<K>) new PojoConvertingLookuper<>(base, tableInfo, lookupColumnNames);
70+
return decorated;
7171
}
7272
}

0 commit comments

Comments
 (0)