Skip to content

Commit e602fd8

Browse files
feat(java): row encoder supports custom types and collections
1 parent 6613de0 commit e602fd8

17 files changed

+1172
-79
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fury.type;
21+
22+
import org.apache.fury.annotation.Internal;
23+
24+
@Internal
25+
public interface CustomTypeRegistry {
26+
CustomTypeRegistry EMPTY =
27+
new CustomTypeRegistry() {
28+
@Override
29+
public boolean hasCodec(final Class<?> beanType, final Class<?> fieldType) {
30+
return false;
31+
}
32+
33+
@Override
34+
public boolean canConstructCollection(
35+
final Class<?> collectionType, final Class<?> elementType) {
36+
return false;
37+
}
38+
};
39+
40+
boolean hasCodec(Class<?> beanType, Class<?> fieldType);
41+
42+
boolean canConstructCollection(Class<?> collectionType, Class<?> elementType);
43+
}

java/fury-core/src/main/java/org/apache/fury/type/TypeUtils.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,10 @@ public static boolean isBean(Class<?> clz) {
568568
return isBean(TypeRef.of(clz));
569569
}
570570

571+
public static boolean isBean(Type type, CustomTypeRegistry customTypes) {
572+
return isBean(TypeRef.of(type), new LinkedHashSet<>(), customTypes);
573+
}
574+
571575
/**
572576
* Returns true if class is not array/iterable/map, and all fields is {@link
573577
* TypeUtils#isSupported(TypeRef)}. Bean class can't be a non-static inner class. Public static
@@ -577,7 +581,14 @@ public static boolean isBean(TypeRef<?> typeRef) {
577581
return isBean(typeRef, new LinkedHashSet<>());
578582
}
579583

580-
private static boolean isBean(TypeRef<?> typeRef, LinkedHashSet<TypeRef> walkedTypePath) {
584+
private static boolean isBean(TypeRef<?> typeRef, LinkedHashSet<TypeRef<?>> walkedTypePath) {
585+
return isBean(typeRef, walkedTypePath, CustomTypeRegistry.EMPTY);
586+
}
587+
588+
private static boolean isBean(
589+
TypeRef<?> typeRef,
590+
LinkedHashSet<TypeRef<?>> walkedTypePath,
591+
CustomTypeRegistry customTypes) {
581592
Class<?> cls = getRawType(typeRef);
582593
if (Modifier.isAbstract(cls.getModifiers()) || Modifier.isInterface(cls.getModifiers())) {
583594
return false;
@@ -589,7 +600,7 @@ private static boolean isBean(TypeRef<?> typeRef, LinkedHashSet<TypeRef> walkedT
589600
if (cls.getEnclosingClass() != null && !Modifier.isStatic(cls.getModifiers())) {
590601
return false;
591602
}
592-
LinkedHashSet<TypeRef> newTypePath = new LinkedHashSet<>(walkedTypePath);
603+
LinkedHashSet<TypeRef<?>> newTypePath = new LinkedHashSet<>(walkedTypePath);
593604
newTypePath.add(typeRef);
594605
if (cls == Object.class) {
595606
// return false for typeToken that point to un-specialized generic type.
@@ -602,14 +613,17 @@ private static boolean isBean(TypeRef<?> typeRef, LinkedHashSet<TypeRef> walkedT
602613
&& !ITERABLE_TYPE.isSupertypeOf(typeRef)
603614
&& !MAP_TYPE.isSupertypeOf(typeRef);
604615
if (maybe) {
616+
Class<?> enclosingType = enclosingType(newTypePath);
605617
return Descriptor.getDescriptors(cls).stream()
606618
.allMatch(
607619
d -> {
608620
TypeRef<?> t = d.getTypeRef();
609621
// do field modifiers and getter/setter validation here, not in getDescriptors.
610622
// If Modifier.isFinal(d.getModifiers()), use reflection
611623
// private field that doesn't have getter/setter will be handled by reflection.
612-
return isSupported(t, newTypePath) || isBean(t, newTypePath);
624+
return customTypes.hasCodec(enclosingType, t.getRawType())
625+
|| isSupported(t, newTypePath, customTypes)
626+
|| isBean(t, newTypePath, customTypes);
613627
});
614628
} else {
615629
return false;
@@ -621,10 +635,13 @@ private static boolean isBean(TypeRef<?> typeRef, LinkedHashSet<TypeRef> walkedT
621635

622636
/** Check if <code>typeToken</code> is supported by row-format. */
623637
public static boolean isSupported(TypeRef<?> typeRef) {
624-
return isSupported(typeRef, new LinkedHashSet<>());
638+
return isSupported(typeRef, new LinkedHashSet<>(), CustomTypeRegistry.EMPTY);
625639
}
626640

627-
private static boolean isSupported(TypeRef<?> typeRef, LinkedHashSet<TypeRef> walkedTypePath) {
641+
private static boolean isSupported(
642+
TypeRef<?> typeRef,
643+
LinkedHashSet<TypeRef<?>> walkedTypePath,
644+
CustomTypeRegistry customTypes) {
628645
Class<?> cls = getRawType(typeRef);
629646
if (!Modifier.isPublic(cls.getModifiers())) {
630647
return false;
@@ -639,6 +656,10 @@ private static boolean isSupported(TypeRef<?> typeRef, LinkedHashSet<TypeRef> wa
639656
} else if (typeRef.isArray()) {
640657
return isSupported(Objects.requireNonNull(typeRef.getComponentType()));
641658
} else if (ITERABLE_TYPE.isSupertypeOf(typeRef)) {
659+
TypeRef<?> elementType = getElementType(typeRef);
660+
if (customTypes.canConstructCollection(typeRef.getRawType(), elementType.getRawType())) {
661+
return true;
662+
}
642663
boolean isSuperOfArrayList = cls.isAssignableFrom(ArrayList.class);
643664
boolean isSuperOfHashSet = cls.isAssignableFrom(HashSet.class);
644665
if ((!isSuperOfArrayList && !isSuperOfHashSet)
@@ -658,7 +679,7 @@ private static boolean isSupported(TypeRef<?> typeRef, LinkedHashSet<TypeRef> wa
658679
throw new UnsupportedOperationException(
659680
"cyclic type is not supported. walkedTypePath: " + walkedTypePath);
660681
} else {
661-
LinkedHashSet<TypeRef> newTypePath = new LinkedHashSet<>(walkedTypePath);
682+
LinkedHashSet<TypeRef<?>> newTypePath = new LinkedHashSet<>(walkedTypePath);
662683
newTypePath.add(typeRef);
663684
return isBean(typeRef, newTypePath);
664685
}
@@ -673,13 +694,24 @@ private static boolean isSupported(TypeRef<?> typeRef, LinkedHashSet<TypeRef> wa
673694
* parameters recursively
674695
*/
675696
public static LinkedHashSet<Class<?>> listBeansRecursiveInclusive(Class<?> beanClass) {
676-
return listBeansRecursiveInclusive(beanClass, new LinkedHashSet<>());
697+
return listBeansRecursiveInclusive(beanClass, CustomTypeRegistry.EMPTY);
698+
}
699+
700+
public static LinkedHashSet<Class<?>> listBeansRecursiveInclusive(
701+
Class<?> beanClass, CustomTypeRegistry customTypes) {
702+
return listBeansRecursiveInclusive(beanClass, new LinkedHashSet<>(), customTypes);
677703
}
678704

679705
private static LinkedHashSet<Class<?>> listBeansRecursiveInclusive(
680-
Class<?> beanClass, LinkedHashSet<TypeRef<?>> walkedTypePath) {
706+
Class<?> beanClass,
707+
LinkedHashSet<TypeRef<?>> walkedTypePath,
708+
CustomTypeRegistry customTypes) {
681709
LinkedHashSet<Class<?>> beans = new LinkedHashSet<>();
682-
if (isBean(beanClass)) {
710+
Class<?> enclosingType = enclosingType(walkedTypePath);
711+
if (customTypes.hasCodec(enclosingType, beanClass)) {
712+
return beans;
713+
}
714+
if (isBean(beanClass, customTypes)) {
683715
beans.add(beanClass);
684716
}
685717
LinkedHashSet<TypeRef<?>> typeRefs = new LinkedHashSet<>();
@@ -692,33 +724,34 @@ private static LinkedHashSet<Class<?>> listBeansRecursiveInclusive(
692724

693725
for (TypeRef<?> typeToken : typeRefs) {
694726
Class<?> type = getRawType(typeToken);
695-
if (isBean(type)) {
696-
beans.add(type);
727+
if (isBean(type, customTypes)) {
697728
if (walkedTypePath.contains(typeToken)) {
698729
throw new UnsupportedOperationException(
699730
"cyclic type is not supported. walkedTypePath: " + walkedTypePath);
700731
} else {
701732
LinkedHashSet<TypeRef<?>> newPath = new LinkedHashSet<>(walkedTypePath);
702733
newPath.add(typeToken);
703-
beans.addAll(listBeansRecursiveInclusive(type, newPath));
734+
beans.addAll(listBeansRecursiveInclusive(type, newPath, customTypes));
704735
}
705736
} else if (isCollection(type)) {
706737
TypeRef<?> elementType = getElementType(typeToken);
707738
LinkedHashSet<TypeRef<?>> newPath = new LinkedHashSet<>(walkedTypePath);
708739
newPath.add(elementType);
709-
beans.addAll(listBeansRecursiveInclusive(elementType.getClass(), newPath));
740+
beans.addAll(listBeansRecursiveInclusive(elementType.getClass(), newPath, customTypes));
710741
} else if (isMap(type)) {
711742
Tuple2<TypeRef<?>, TypeRef<?>> mapKeyValueType = getMapKeyValueType(typeToken);
712743
LinkedHashSet<TypeRef<?>> newPath = new LinkedHashSet<>(walkedTypePath);
713744
newPath.add(mapKeyValueType.f0);
714745
newPath.add(mapKeyValueType.f1);
715-
beans.addAll(listBeansRecursiveInclusive(mapKeyValueType.f0.getRawType(), newPath));
716-
beans.addAll(listBeansRecursiveInclusive(mapKeyValueType.f1.getRawType(), newPath));
746+
beans.addAll(
747+
listBeansRecursiveInclusive(mapKeyValueType.f0.getRawType(), newPath, customTypes));
748+
beans.addAll(
749+
listBeansRecursiveInclusive(mapKeyValueType.f1.getRawType(), newPath, customTypes));
717750
} else if (type.isArray()) {
718751
Class<?> arrayComponent = getArrayComponent(type);
719752
LinkedHashSet<TypeRef<?>> newPath = new LinkedHashSet<>(walkedTypePath);
720753
newPath.add(TypeRef.of(arrayComponent));
721-
beans.addAll(listBeansRecursiveInclusive(arrayComponent, newPath));
754+
beans.addAll(listBeansRecursiveInclusive(arrayComponent, newPath, customTypes));
722755
}
723756
}
724757
return beans;
@@ -737,7 +770,7 @@ public static int computeStringHash(String str) {
737770
}
738771

739772
/** Returns generic type arguments of <code>typeToken</code>. */
740-
public static List<TypeRef<?>> getTypeArguments(TypeRef typeRef) {
773+
public static List<TypeRef<?>> getTypeArguments(TypeRef<?> typeRef) {
741774
if (typeRef.getType() instanceof ParameterizedType) {
742775
ParameterizedType parameterizedType = (ParameterizedType) typeRef.getType();
743776
return Arrays.stream(parameterizedType.getActualTypeArguments())
@@ -752,7 +785,7 @@ public static List<TypeRef<?>> getTypeArguments(TypeRef typeRef) {
752785
* Returns generic type arguments of <code>typeToken</code>, includes generic type arguments of
753786
* generic type arguments recursively.
754787
*/
755-
public static List<TypeRef<?>> getAllTypeArguments(TypeRef typeRef) {
788+
public static List<TypeRef<?>> getAllTypeArguments(TypeRef<?> typeRef) {
756789
List<TypeRef<?>> types = getTypeArguments(typeRef);
757790
LinkedHashSet<TypeRef<?>> allTypeArguments = new LinkedHashSet<>(types);
758791
for (TypeRef<?> type : types) {
@@ -776,4 +809,12 @@ public static String qualifiedName(String pkg, String className) {
776809
return pkg + "." + className;
777810
}
778811
}
812+
813+
private static Class<?> enclosingType(LinkedHashSet<TypeRef<?>> newTypePath) {
814+
Class<?> result = Object.class;
815+
for (TypeRef<?> type : newTypePath) {
816+
result = type.getRawType();
817+
}
818+
return result;
819+
}
779820
}

java/fury-format/README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,45 @@ Fury row format is heavily inspired by spark tungsten row format, but with chang
1111
- Support adding fields without breaking compatibility.
1212

1313
The initial fury java row data structure implementation is modified from spark unsafe row/writer.
14+
15+
It is possible to register custom type handling and collection factories for the row format -
16+
see Encoders.registerCustomCodec and Encoders.registerCustomCollectionFactory.
17+
18+
A short example:
19+
20+
```
21+
@Data
22+
public static class UuidType {
23+
public UUID f1;
24+
public UUID[] f2;
25+
public SortedSet<UUID> f3;
26+
}
27+
28+
static class UuidEncoder implements CustomCodec.MemoryBufferCodec<UUID> {
29+
@Override
30+
public MemoryBuffer encode(final UUID value) {
31+
final MemoryBuffer result = MemoryBuffer.newHeapBuffer(16);
32+
result.putInt64(0, value.getMostSignificantBits());
33+
result.putInt64(8, value.getLeastSignificantBits());
34+
return result;
35+
}
36+
37+
@Override
38+
public UUID decode(final MemoryBuffer value) {
39+
return new UUID(value.readInt64(), value.readInt64());
40+
}
41+
}
42+
43+
static class SortedSetOfUuidDecoder implements CustomCollectionFactory<UUID, SortedSet<UUID>> {
44+
@Override
45+
public SortedSet<UUID> newCollection(final int size) {
46+
return new TreeSet<>(UnsignedUuidComparator.INSTANCE);
47+
}
48+
}
49+
50+
Encoders.registerCustomCodec(UUID.class, new UuidEncoder());
51+
Encoders.registerCustomCollectionFactory(
52+
SortedSet.class, UUID.class, new SortedSetOfUuidDecoder());
53+
54+
RowEncoder<UuidType> encoder = Encoders.bean(UuidType.class);
55+
```

java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayDataForEach.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.fury.codegen.Expression.AbstractExpression;
3131
import org.apache.fury.format.row.binary.BinaryArray;
3232
import org.apache.fury.format.row.binary.BinaryUtils;
33+
import org.apache.fury.format.type.CustomTypeEncoderRegistry;
3334
import org.apache.fury.reflect.TypeRef;
3435
import org.apache.fury.type.TypeUtils;
3536
import org.apache.fury.util.Preconditions;
@@ -75,8 +76,17 @@ public ArrayDataForEach(
7576
super(inputArrayData);
7677
Preconditions.checkArgument(getRawType(inputArrayData.type()) == BinaryArray.class);
7778
this.inputArrayData = inputArrayData;
78-
this.accessMethod = BinaryUtils.getElemAccessMethodName(elemType);
79-
this.elemType = BinaryUtils.getElemReturnType(elemType);
79+
TypeRef<?> accessType;
80+
CustomCodec<?, ?> customEncoder =
81+
CustomTypeEncoderRegistry.customTypeHandler()
82+
.findCodec(BinaryArray.class, elemType.getRawType());
83+
if (customEncoder == null) {
84+
accessType = elemType;
85+
} else {
86+
accessType = TypeRef.of(customEncoder.encodedType());
87+
}
88+
this.accessMethod = BinaryUtils.getElemAccessMethodName(accessType);
89+
this.elemType = BinaryUtils.getElemReturnType(accessType);
8090
this.notNullAction = notNullAction;
8191
this.nullAction = nullAction;
8292
}

0 commit comments

Comments
 (0)