Skip to content

Commit b954cc2

Browse files
committed
poc
1 parent fb49c4a commit b954cc2

33 files changed

+1258
-102
lines changed

fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,23 @@ static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema)
8383
}
8484
}
8585

86+
static void validatePojoMatchesProjection(PojoType<?> pojoType, RowType projection) {
87+
Set<String> pojoNames = pojoType.getProperties().keySet();
88+
List<String> fieldNames = projection.getFieldNames();
89+
if (!pojoNames.containsAll(fieldNames)) {
90+
throw new IllegalArgumentException(
91+
String.format(
92+
"POJO fields %s must contain all projection fields %s.",
93+
pojoNames, fieldNames));
94+
}
95+
for (int i = 0; i < projection.getFieldCount(); i++) {
96+
String name = fieldNames.get(i);
97+
DataType dt = projection.getTypeAt(i);
98+
PojoType.Property prop = pojoType.getProperty(name);
99+
validateCompatibility(dt, prop);
100+
}
101+
}
102+
86103
static void validateProjectionSubset(RowType projection, RowType tableSchema) {
87104
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
88105
for (String n : projection.getFieldNames()) {

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ private PojoToRowConverter(PojoType<T> pojoType, RowType tableSchema, RowType pr
5454
this.tableSchema = tableSchema;
5555
this.projection = projection;
5656
this.projectionFieldNames = projection.getFieldNames();
57-
ConverterCommons.validatePojoMatchesTable(pojoType, tableSchema);
57+
// For writer path, allow POJO to be a superset of the projection. It must contain all projected fields.
58+
ConverterCommons.validatePojoMatchesProjection(pojoType, projection);
5859
ConverterCommons.validateProjectionSubset(projection, tableSchema);
5960
this.fieldConverters = createFieldConverters();
6061
}

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,8 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
7373
for (Map.Entry<String, Field> e : allFields.entrySet()) {
7474
String name = e.getKey();
7575
Field field = e.getValue();
76-
if (field.getType().isPrimitive()) {
77-
throw new IllegalArgumentException(
78-
String.format(
79-
"POJO class %s has primitive field '%s' of type %s. Primitive types are not allowed; all fields must be nullable (use wrapper types).",
80-
pojoClass.getName(), name, field.getType().getName()));
81-
}
76+
// Allow primitive types by treating them as their boxed counterparts for compatibility
77+
Class<?> effectiveType = boxIfPrimitive(field.getType());
8278
boolean publicField = Modifier.isPublic(field.getModifiers());
8379
Method getter = getters.get(name);
8480
Method setter = setters.get(name);
@@ -95,7 +91,7 @@ static <T> PojoType<T> of(Class<T> pojoClass) {
9591
props.put(
9692
name,
9793
new Property(
98-
name, field.getType(), publicField ? field : null, getter, setter));
94+
name, effectiveType, publicField ? field : null, getter, setter));
9995
}
10096

10197
return new PojoType<>(pojoClass, ctor, props);
@@ -235,6 +231,41 @@ private static String decapitalize(String s) {
235231
return s.substring(0, 1).toLowerCase(Locale.ROOT) + s.substring(1);
236232
}
237233

234+
private static Class<?> boxIfPrimitive(Class<?> type) {
235+
if (!type.isPrimitive()) {
236+
return type;
237+
}
238+
if (type == boolean.class) {
239+
return Boolean.class;
240+
}
241+
if (type == byte.class) {
242+
return Byte.class;
243+
}
244+
if (type == short.class) {
245+
return Short.class;
246+
}
247+
if (type == int.class) {
248+
return Integer.class;
249+
}
250+
if (type == long.class) {
251+
return Long.class;
252+
}
253+
if (type == float.class) {
254+
return Float.class;
255+
}
256+
if (type == double.class) {
257+
return Double.class;
258+
}
259+
if (type == char.class) {
260+
return Character.class;
261+
}
262+
// void shouldn't appear as a field type, but handle defensively
263+
if (type == void.class) {
264+
return Void.class;
265+
}
266+
return type;
267+
}
268+
238269
static final class Property {
239270
final String name;
240271
final Class<?> type;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.client.converter.PojoToRowConverter;
21+
import org.apache.fluss.client.converter.RowToPojoConverter;
22+
import org.apache.fluss.metadata.TableInfo;
23+
import org.apache.fluss.row.InternalRow;
24+
import org.apache.fluss.types.RowType;
25+
26+
import javax.annotation.Nullable;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.CompletableFuture;
30+
31+
/** Adapter that allows using POJOs for key and result in lookups. */
32+
class ConvertingLookuper<K, R> implements TypedLookuper<K, R> {
33+
34+
private final Lookuper delegate;
35+
private final PojoToRowConverter<K> keyConverter;
36+
private final RowToPojoConverter<R> resultConverter;
37+
38+
ConvertingLookuper(Lookuper delegate,
39+
Class<K> keyClass,
40+
Class<R> resultClass,
41+
TableInfo tableInfo,
42+
@Nullable List<String> lookupColumnNames) {
43+
this.delegate = delegate;
44+
RowType tableSchema = tableInfo.getRowType();
45+
RowType keyProjection;
46+
if (lookupColumnNames == null) {
47+
keyProjection = tableSchema.project(tableInfo.getPrimaryKeys());
48+
} else {
49+
keyProjection = tableSchema.project(lookupColumnNames);
50+
}
51+
this.keyConverter = PojoToRowConverter.of(keyClass, tableSchema, keyProjection);
52+
this.resultConverter = RowToPojoConverter.of(resultClass, tableSchema, tableSchema);
53+
}
54+
55+
@Override
56+
public CompletableFuture<TypedLookupResult<R>> lookup(K lookupKey) {
57+
InternalRow keyRow = keyConverter.toRow(lookupKey);
58+
return delegate
59+
.lookup(keyRow)
60+
.thenApply(result -> {
61+
List<InternalRow> rows = result.getRowList();
62+
if (rows == null || rows.isEmpty()) {
63+
return new TypedLookupResult<R>((R) null);
64+
}
65+
List<R> out = new ArrayList<>(rows.size());
66+
for (InternalRow row : rows) {
67+
out.add(resultConverter.fromRow(row));
68+
}
69+
return new TypedLookupResult<>(out);
70+
});
71+
}
72+
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,9 @@ default Lookup lookupBy(String... lookupColumnNames) {
9696
* @return the lookuper
9797
*/
9898
Lookuper createLookuper();
99+
100+
/**
101+
* Creates a typed lookuper that accepts a POJO key and returns POJO result(s).
102+
*/
103+
<K, R> TypedLookuper<K, R> createLookuper(Class<K> keyClass, Class<R> resultClass);
99104
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,10 @@ public Lookuper createLookuper() {
6363
tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
6464
}
6565
}
66+
67+
@Override
68+
public <K, R> TypedLookuper<K, R> createLookuper(Class<K> keyClass, Class<R> resultClass) {
69+
Lookuper base = createLookuper();
70+
return new ConvertingLookuper<>(base, keyClass, resultClass, tableInfo, lookupColumnNames);
71+
}
6672
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import javax.annotation.Nullable;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Objects;
26+
27+
/** Typed version of {@link LookupResult} carrying POJO values. */
28+
@PublicEvolving
29+
public final class TypedLookupResult<R> {
30+
private final List<R> values;
31+
32+
public TypedLookupResult(@Nullable R value) {
33+
this(value == null ? Collections.emptyList() : Collections.singletonList(value));
34+
}
35+
36+
public TypedLookupResult(List<R> values) {
37+
this.values = values;
38+
}
39+
40+
public List<R> getValues() {
41+
return values;
42+
}
43+
44+
public @Nullable R getSingletonValue() {
45+
if (values.isEmpty()) {
46+
return null;
47+
} else if (values.size() == 1) {
48+
return values.get(0);
49+
} else {
50+
throw new IllegalStateException("Expecting exactly one value, but got: " + values.size());
51+
}
52+
}
53+
54+
@Override
55+
public boolean equals(Object o) {
56+
if (this == o) {
57+
return true;
58+
}
59+
if (o == null || getClass() != o.getClass()) {
60+
return false;
61+
}
62+
TypedLookupResult<?> that = (TypedLookupResult<?>) o;
63+
return Objects.equals(values, that.values);
64+
}
65+
66+
@Override
67+
public int hashCode() {
68+
return Objects.hash(values);
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return values.toString();
74+
}
75+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
24+
/** Typed lookuper that accepts a POJO key and returns POJO result(s). */
25+
@PublicEvolving
26+
public interface TypedLookuper<K, R> {
27+
28+
CompletableFuture<TypedLookupResult<R>> lookup(K lookupKey);
29+
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ public interface Scan {
6363
*
6464
* <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
6565
*/
66-
LogScanner createLogScanner();
66+
org.apache.fluss.client.table.scanner.log.LogScanner<org.apache.fluss.row.InternalRow> createLogScanner();
67+
68+
/**
69+
* Creates a typed LogScanner to continuously read log data as POJOs of the given class.
70+
*
71+
* <p>Note: this API doesn't support pre-configured with {@link #limit(int)}.
72+
*/
73+
<T> org.apache.fluss.client.table.scanner.log.LogScanner<T> createLogScanner(Class<T> pojoClass);
6774

6875
/**
6976
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.

0 commit comments

Comments
 (0)