Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<msgpack.version>0.6.12</msgpack.version>
<protostuff.version>1.5.9</protostuff.version>
<fury.version>0.4.1</fury.version>
<fory.version>0.16.0</fory.version>
<grpc.version>1.53.0</grpc.version>

<!--common-->
Expand Down Expand Up @@ -519,6 +520,13 @@



<!-- Apache Fory -->
<dependency>
<groupId>org.apache.fory</groupId>
<artifactId>fory-core</artifactId>
<version>${fory.version}</version>
</dependency>

<!-- Test libs -->
<dependency>
<groupId>org.apache.curator</groupId>
Expand Down
46 changes: 46 additions & 0 deletions codec/codec-sofa-fory/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-codec</artifactId>
<version>${revision}</version>
</parent>

<artifactId>sofa-rpc-codec-sofa-fory</artifactId>

<dependencies>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-codec-api</artifactId>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-api</artifactId>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-log</artifactId>
</dependency>

<!-- Apache Fory dependency (replaces legacy org.furyio:fury-core) -->
<dependency>
<groupId>org.apache.fory</groupId>
<artifactId>fory-core</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.codec.fory;

import com.alipay.sofa.common.config.SofaConfigs;
import com.alipay.sofa.rpc.codec.AbstractSerializer;
import com.alipay.sofa.rpc.codec.CustomSerializer;
import com.alipay.sofa.rpc.codec.common.BlackAndWhiteListFileLoader;
import com.alipay.sofa.rpc.codec.common.SerializeCheckStatus;
import com.alipay.sofa.rpc.codec.fory.serialize.SofaRequestForySerializer;
import com.alipay.sofa.rpc.codec.fory.serialize.SofaResponseForySerializer;
import com.alipay.sofa.rpc.common.config.RpcConfigKeys;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.transport.AbstractByteBuf;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import org.apache.fory.Fory;
import org.apache.fory.ThreadLocalFory;
import org.apache.fory.ThreadSafeFory;
import org.apache.fory.config.Language;
import org.apache.fory.memory.MemoryBuffer;
import org.apache.fory.resolver.AllowListChecker;
import org.apache.fory.resolver.ClassResolver;

import java.util.List;
import java.util.Map;

import static org.apache.fory.config.CompatibleMode.COMPATIBLE;

/**
* Apache Fory serializer for SOFARPC.
* Uses the new Apache Fory dependency (org.apache.fory:fory-core)
* instead of the legacy org.furyio:fury-core used by FurySerializer.
*
* @author <a href="mailto:sunhailin.shl@antgroup.com">sunhailin-Leo</a>
* @see com.alipay.sofa.rpc.codec.fury.FurySerializer
*/
@Extension(value = "fory", code = 23)
public class ForySerializer extends AbstractSerializer {

protected final ThreadSafeFory fory;

private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE);

public ForySerializer() {
fory = new ThreadLocalFory(classLoader -> {
Fory foryInstance = Fory.builder()
.withLanguage(Language.JAVA)
.withRefTracking(true)
.withCodegen(true)
.withNumberCompressed(true)
.withCompatibleMode(COMPATIBLE)
.requireClassRegistration(false)
.withClassLoader(classLoader)
.withAsyncCompilation(true)
.build();

// In Apache Fory, the security checker is set via
// classResolver.setClassChecker(ClassChecker).
ClassResolver classResolver = (ClassResolver) foryInstance.getTypeResolver();

if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.DISABLE.name())) {
AllowListChecker noChecker = new AllowListChecker(AllowListChecker.CheckLevel.DISABLE);
classResolver.setTypeChecker(noChecker);
} else if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.WARN.name())) {
AllowListChecker blackListChecker = new AllowListChecker(AllowListChecker.CheckLevel.WARN);
classResolver.setTypeChecker(blackListChecker);
blackListChecker.addListener(classResolver);
List<String> blackList = BlackAndWhiteListFileLoader.SOFA_SERIALIZE_BLACK_LIST;
for (String key : blackList) {
blackListChecker.disallowClass(key + "*");
}
} else if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.STRICT.name())) {
// STRICT mode: apply both whitelist and blacklist
AllowListChecker blackAndWhiteListChecker = new AllowListChecker(AllowListChecker.CheckLevel.STRICT);
classResolver.setTypeChecker(blackAndWhiteListChecker);
blackAndWhiteListChecker.addListener(classResolver);
List<String> whiteList = BlackAndWhiteListFileLoader.SOFA_SERIALIZER_WHITE_LIST;
for (String key : whiteList) {
blackAndWhiteListChecker.allowClass(key + "*");
}
List<String> blackList = BlackAndWhiteListFileLoader.SOFA_SERIALIZE_BLACK_LIST;
for (String key : blackList) {
blackAndWhiteListChecker.disallowClass(key + "*");
}
}

foryInstance.register(SofaRequest.class);
foryInstance.register(SofaResponse.class);
foryInstance.register(SofaRpcException.class);
return foryInstance;
});
addCustomSerializer(SofaRequest.class, new SofaRequestForySerializer(fory));
addCustomSerializer(SofaResponse.class, new SofaResponseForySerializer(fory));
}

@Override
public AbstractByteBuf encode(final Object object, final Map<String, String> context) throws SofaRpcException {
if (object == null) {
throw buildSerializeError("Unsupported null message!");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fory.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getObjCustomSerializer(object);
if (customSerializer != null) {
return customSerializer.encodeObject(object, context);
} else {
MemoryBuffer writeBuffer = MemoryBuffer.newHeapBuffer(32);
writeBuffer.writerIndex(0);
fory.serialize(writeBuffer, object);
return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex()));
}
} catch (SofaRpcException e) {
throw e;
} catch (Exception e) {
throw buildSerializeError(e.getMessage(), e);
} finally {
fory.clearClassLoader(contextClassLoader);
}
}

@Override
public Object decode(final AbstractByteBuf data, final Class clazz, final Map<String, String> context)
throws SofaRpcException {
if (clazz == null) {
throw buildDeserializeError("Target class is null!");
}
if (data.readableBytes() <= 0) {
throw buildDeserializeError("Deserialized array is empty.");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fory.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getCustomSerializer(clazz);
if (customSerializer != null) {
return customSerializer.decodeObject(data, context);
} else {
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
return fory.deserialize(readBuffer);
}
} catch (SofaRpcException e) {
throw e;
} catch (Exception e) {
throw buildDeserializeError(e.getMessage(), e);
} finally {
fory.clearClassLoader(contextClassLoader);
}
}

@Override
public void decode(final AbstractByteBuf data, final Object template, final Map<String, String> context)
throws SofaRpcException {
if (template == null) {
throw buildDeserializeError("template is null!");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fory.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getObjCustomSerializer(template);
if (customSerializer != null) {
customSerializer.decodeObjectByTemplate(data, context, template);
} else {
throw buildDeserializeError("Only support decode from SofaRequest and SofaResponse template");
}
} catch (SofaRpcException e) {
throw e;
} catch (Exception e) {
throw buildDeserializeError(e.getMessage(), e);
} finally {
fory.clearClassLoader(contextClassLoader);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.codec.fory.serialize;

import com.alipay.sofa.rpc.codec.CustomSerializer;
import com.alipay.sofa.rpc.common.RemotingConstants;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.transport.AbstractByteBuf;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import org.apache.fory.ThreadSafeFory;
import org.apache.fory.memory.MemoryBuffer;

import java.util.Map;

/**
* Custom serializer for {@link SofaRequest} using Apache Fory.
*
* @author <a href="mailto:sunhailin.shl@antgroup.com">sunhailin-Leo</a>
*/
public class SofaRequestForySerializer implements CustomSerializer<SofaRequest> {

private final ThreadSafeFory fory;

public SofaRequestForySerializer(ThreadSafeFory fory) {
this.fory = fory;
}

@Override
public AbstractByteBuf encodeObject(SofaRequest object, Map<String, String> context) throws SofaRpcException {
try {
MemoryBuffer writeBuffer = MemoryBuffer.newHeapBuffer(32);
writeBuffer.writerIndex(0);

boolean genericSerialize = context != null &&
isGenericRequest(context.get(RemotingConstants.HEAD_GENERIC_TYPE));
if (genericSerialize) {
// TODO support generic call
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now.");
}
fory.serialize(writeBuffer, object);
final Object[] args = object.getMethodArgs();
fory.serialize(writeBuffer, args);

return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex()));
} catch (SofaRpcException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e);
}
}

@Override
public SofaRequest decodeObject(AbstractByteBuf data, Map<String, String> context) throws SofaRpcException {
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
try {
SofaRequest sofaRequest = (SofaRequest) fory.deserialize(readBuffer);
String targetServiceName = sofaRequest.getTargetServiceUniqueName();
if (targetServiceName == null) {
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Target service name of request is null!");
}
String interfaceName = ConfigUniqueNameGenerator.getInterfaceName(targetServiceName);
sofaRequest.setInterfaceName(interfaceName);
final Object[] args = (Object[]) fory.deserialize(readBuffer);
sofaRequest.setMethodArgs(args);
return sofaRequest;
} catch (SofaRpcException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e);
}
}

@Override
public void decodeObjectByTemplate(AbstractByteBuf data, Map<String, String> context, SofaRequest template)
throws SofaRpcException {
if (data.readableBytes() <= 0) {
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Deserialized array is empty.");
}
try {
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
SofaRequest tmp = (SofaRequest) fory.deserialize(readBuffer);
String targetServiceName = tmp.getTargetServiceUniqueName();
if (targetServiceName == null) {
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Target service name of request is null!");
}
template.setMethodName(tmp.getMethodName());
template.setMethodArgSigs(tmp.getMethodArgSigs());
template.setTargetServiceUniqueName(tmp.getTargetServiceUniqueName());
template.setTargetAppName(tmp.getTargetAppName());
template.addRequestProps(tmp.getRequestProps());
String interfaceName = ConfigUniqueNameGenerator.getInterfaceName(targetServiceName);
template.setInterfaceName(interfaceName);
final Object[] args = (Object[]) fory.deserialize(readBuffer);
template.setMethodArgs(args);
} catch (SofaRpcException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e);
}
}

protected boolean isGenericRequest(String serializeType) {
return serializeType != null && !serializeType.equals(RemotingConstants.SERIALIZE_FACTORY_NORMAL);
}
}
Loading
Loading